diff --git a/flamenco_worker/worker.py b/flamenco_worker/worker.py index 1d67ee8c8fc189c997b32ca2a5a6f8dc04b06a74..f8c46ac12742767defddc031f13da1fd622c7c57 100644 --- a/flamenco_worker/worker.py +++ b/flamenco_worker/worker.py @@ -104,51 +104,89 @@ class FlamencoWorker: return None return self.task_id - async def startup(self, *, may_retry_register=True): + async def startup(self, *, may_retry_loop=True): self._log.info('Starting up') - if not self.worker_id or not self.worker_secret: - await self.register_at_manager(may_retry_loop=may_retry_register) + do_register = not self.worker_id or not self.worker_secret + if do_register: + await self.register_at_manager(may_retry_loop=may_retry_loop) # Once we know our ID and secret, update the manager object so that we # don't have to pass our authentication info each and every call. self.manager.auth = (self.worker_id, self.worker_secret) + + # We only need to sign on if we didn't just register. However, this + # can only happen after setting self.manager.auth. + if not do_register: + await self.signon(may_retry_loop=may_retry_loop) + self.schedule_fetch_task() - async def register_at_manager(self, *, may_retry_loop: bool): - import requests + @staticmethod + def hostname() -> str: import socket + return socket.gethostname() - self._log.info('Registering at manager') + async def _keep_posting_to_manager(self, url: str, json: dict, *, use_auth=True, + may_retry_loop: bool): + import requests - self.worker_secret = generate_secret() - platform = detect_platform() - hostname = socket.gethostname() + post_kwargs = { + 'json': json, + 'loop': self.loop, + } + if not use_auth: + post_kwargs['auth'] = None while True: try: - resp = await self.manager.post( - '/register-worker', - json={ - 'secret': self.worker_secret, - 'platform': platform, - 'supported_task_types': self.task_types, - 'nickname': hostname, - }, - auth=None, # explicitly do not use authentication - loop=self.loop, - ) + resp = await self.manager.post(url, **post_kwargs) resp.raise_for_status() except requests.RequestException as ex: if not may_retry_loop: - self._log.error('Unable to register at manager: %s', ex) + self._log.error('Unable to POST to manager %s: %s', url, ex) raise UnableToRegisterError() - self._log.warning('Unable to register at manager, retrying in %i seconds: %s', - REGISTER_AT_MANAGER_FAILED_RETRY_DELAY, ex) + self._log.warning('Unable to POST to manager %s, retrying in %i seconds: %s', + url, REGISTER_AT_MANAGER_FAILED_RETRY_DELAY, ex) await asyncio.sleep(REGISTER_AT_MANAGER_FAILED_RETRY_DELAY) else: - break + return resp + + async def signon(self, *, may_retry_loop: bool): + """Signs on at the manager. + + Only needed when we didn't just register. + """ + + self._log.info('Signing on at manager.') + await self._keep_posting_to_manager( + '/sign-on', + json={ + 'supported_task_types': self.task_types, + 'nickname': self.hostname(), + }, + may_retry_loop=may_retry_loop, + ) + self._log.info('Manager accepted sign-on.') + + async def register_at_manager(self, *, may_retry_loop: bool): + self._log.info('Registering at manager') + + self.worker_secret = generate_secret() + platform = detect_platform() + + resp = await self._keep_posting_to_manager( + '/register-worker', + json={ + 'secret': self.worker_secret, + 'platform': platform, + 'supported_task_types': self.task_types, + 'nickname': self.hostname(), + }, + use_auth=False, # explicitly do not use authentication + may_retry_loop=may_retry_loop, + ) result = resp.json() self._log.info('Response: %s', result) diff --git a/tests/test_worker.py b/tests/test_worker.py index c9f3b926db7a55ecac0e3392188c462c0e606c65..330d38dd5b7c3e43ca1999dd48112286e3161f84 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -56,11 +56,25 @@ class AbstractFWorkerTest(AbstractWorkerTest): class WorkerStartupTest(AbstractFWorkerTest): # Mock merge_with_home_config() so that it doesn't overwrite actual config. + @unittest.mock.patch('socket.gethostname') @unittest.mock.patch('flamenco_worker.config.merge_with_home_config') - def test_startup_already_registered(self, mock_merge_with_home_config): - self.asyncio_loop.run_until_complete(self.worker.startup(may_retry_register=False)) + def test_startup_already_registered(self, mock_merge_with_home_config, mock_gethostname): + + from mock_responses import EmptyResponse, CoroMock + + mock_gethostname.return_value = 'ws-unittest' + self.manager.post = CoroMock(return_value=EmptyResponse()) + + self.asyncio_loop.run_until_complete(self.worker.startup(may_retry_loop=False)) mock_merge_with_home_config.assert_not_called() # Starting with known ID/secret - self.manager.post.assert_not_called() + self.manager.post.assert_called_once_with( + '/sign-on', + json={ + 'supported_task_types': ['sleep', 'unittest'], + 'nickname': 'ws-unittest', + }, + loop=self.asyncio_loop, + ) self.tuqueue.queue.assert_not_called() @unittest.mock.patch('socket.gethostname') @@ -76,7 +90,7 @@ class WorkerStartupTest(AbstractFWorkerTest): '_id': '5555', })) - self.asyncio_loop.run_until_complete(self.worker.startup(may_retry_register=False)) + self.asyncio_loop.run_until_complete(self.worker.startup(may_retry_loop=False)) mock_merge_with_home_config.assert_called_once_with( {'worker_id': '5555', 'worker_secret': self.worker.worker_secret} @@ -113,7 +127,7 @@ class WorkerStartupTest(AbstractFWorkerTest): # Mock merge_with_home_config() so that it doesn't overwrite actual config. self.assertRaises(UnableToRegisterError, self.asyncio_loop.run_until_complete, - self.worker.startup(may_retry_register=False)) + self.worker.startup(may_retry_loop=False)) mock_merge_with_home_config.assert_not_called() assert isinstance(self.manager.post, unittest.mock.Mock)