Skip to content
Snippets Groups Projects
Commit 0119e0d3 authored by Sybren A. Stüvel
Browse files

Worker: run a retry-loop when registration at manager fails.

This allows for any-order deployment of worker & manager.
parent ad98fae2
No related branches found
No related tags found
No related merge requests found
......@@ -10,6 +10,7 @@ from . import upstream
from . import upstream_update_queue
# All durations/delays/etc are in seconds.
REGISTER_AT_MANAGER_FAILED_RETRY_DELAY = 30
FETCH_TASK_FAILED_RETRY_DELAY = 10 # when we failed obtaining a task
FETCH_TASK_EMPTY_RETRY_DELAY = 5 # when there are no tasks to perform
FETCH_TASK_DONE_SCHEDULE_NEW_DELAY = 3 # after a task is completed
......@@ -96,18 +97,18 @@ class FlamencoWorker:
return None
return self.task_id
async def startup(self):
async def startup(self, *, may_retry_register=True):
self._log.info('Starting up')
if not self.worker_id or not self.worker_secret:
await self.register_at_manager()
await self.register_at_manager(may_retry_loop=may_retry_register)
# Once we know our ID and secret, update the manager object so that we
# don't have to pass our authentication info each and every call.
self.manager.auth = (self.worker_id, self.worker_secret)
self.schedule_fetch_task()
async def register_at_manager(self):
async def register_at_manager(self, *, may_retry_loop: bool):
import requests
self._log.info('Registering at manager')
......@@ -115,6 +116,7 @@ class FlamencoWorker:
self.worker_secret = generate_secret()
platform = detect_platform()
while True:
try:
resp = await self.manager.post(
'/register-worker',
......@@ -126,11 +128,18 @@ class FlamencoWorker:
auth=None, # explicitly do not use authentication
loop=self.loop,
)
except requests.ConnectionError:
self._log.error('Unable to register at manager, aborting.')
# TODO Sybren: implement a retry loop instead of aborting immediately.
resp.raise_for_status()
except requests.RequestException as ex:
if not may_retry_loop:
self._log.error('Unable to register at manager: %s', ex)
raise UnableToRegisterError()
self._log.warning('Unable to register at manager, retrying in %i seconds: %s',
REGISTER_AT_MANAGER_FAILED_RETRY_DELAY, ex)
await asyncio.sleep(REGISTER_AT_MANAGER_FAILED_RETRY_DELAY)
else:
break
resp.raise_for_status()
result = resp.json()
......
......@@ -5,13 +5,17 @@ import unittest
class CoroMockTest(unittest.TestCase):
def setUp(self):
from flamenco_worker.cli import construct_asyncio_loop
self.loop = construct_asyncio_loop()
def test_setting_return_value(self):
from mock_responses import CoroMock
cm = CoroMock()
cm.coro.return_value = '123'
result = asyncio.get_event_loop().run_until_complete(cm(3, 4))
result = self.loop.run_until_complete(cm(3, 4))
cm.assert_called_once_with(3, 4)
self.assertEqual('123', result)
......@@ -56,7 +56,7 @@ class WorkerStartupTest(AbstractFWorkerTest):
# Mock merge_with_home_config() so that it doesn't overwrite actual config.
@unittest.mock.patch('flamenco_worker.config.merge_with_home_config')
def test_startup_already_registered(self, mock_merge_with_home_config):
self.asyncio_loop.run_until_complete(self.worker.startup())
self.asyncio_loop.run_until_complete(self.worker.startup(may_retry_register=False))
mock_merge_with_home_config.assert_not_called() # Starting with known ID/secret
self.manager.post.assert_not_called()
self.tuqueue.queue.assert_not_called()
......@@ -72,7 +72,7 @@ class WorkerStartupTest(AbstractFWorkerTest):
'_id': '5555',
}))
self.asyncio_loop.run_until_complete(self.worker.startup())
self.asyncio_loop.run_until_complete(self.worker.startup(may_retry_register=False))
mock_merge_with_home_config.assert_called_once_with(
{'worker_id': '5555',
'worker_secret': self.worker.worker_secret}
......@@ -94,7 +94,7 @@ class WorkerStartupTest(AbstractFWorkerTest):
def test_startup_registration_unhappy(self, mock_merge_with_home_config):
"""Test that startup is aborted when the worker can't register."""
from flamenco_worker.worker import detect_platform
from flamenco_worker.worker import detect_platform, UnableToRegisterError
from mock_responses import JsonResponse, CoroMock
self.worker.worker_id = None
......@@ -104,9 +104,9 @@ class WorkerStartupTest(AbstractFWorkerTest):
}, status_code=500))
# Mock merge_with_home_config() so that it doesn't overwrite actual config.
self.assertRaises(requests.HTTPError,
self.assertRaises(UnableToRegisterError,
self.asyncio_loop.run_until_complete,
self.worker.startup())
self.worker.startup(may_retry_register=False))
mock_merge_with_home_config.assert_not_called()
assert isinstance(self.manager.post, unittest.mock.Mock)
......
0% Loading. Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment