diff --git a/CHANGELOG.md b/CHANGELOG.md index 963293c49377ab76eca24ce5d6fdd95b3d1fefbc..473d52686efbfb504d55dcb3d07c10a4477bb077 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ This file logs the changes that are actually interesting to users (new features, changed functionality, fixed bugs). + ## Version 2.3 (in development) - Changed how progressive rendering works. Nonuniform tasks are now supported. This requires @@ -12,6 +13,8 @@ changed functionality, fixed bugs). EXR files to JPEG files. This is used in progressive rendering to get intermediary previews. - Added the `merge_progressive_render_sequence` for sample-merging sequences of EXR files. The already-existing `merge_progressive_renders` command only performed on one frame at a time. +- The Worker now automatically re-registers when the Manager does not accept its credentials. + This makes it easier to handle erasure of the Manager's database. ## Version 2.2.1 (2019-01-14) diff --git a/flamenco_worker/worker.py b/flamenco_worker/worker.py index 65943fd3b354f1763a098224fd51f5a5acfc7e52..ed48966ba45e884009aacc7fce88264eec27b446 100644 --- a/flamenco_worker/worker.py +++ b/flamenco_worker/worker.py @@ -213,9 +213,9 @@ class FlamencoWorker: resp = await self.manager.post(url, **post_kwargs) resp.raise_for_status() except requests.RequestException as ex: - if not may_retry_loop: - self._log.error('Unable to POST to manager %s: %s', url, ex) - raise UnableToRegisterError() + if not may_retry_loop or ex.response.status_code == 401: + self._log.debug('Unable to POST to manager %s: %s', url, ex) + raise self._log.warning('Unable to POST to manager %s, retrying in %i seconds: %s', url, REGISTER_AT_MANAGER_FAILED_RETRY_DELAY, ex) @@ -223,22 +223,42 @@ class FlamencoWorker: else: return resp - async def signon(self, *, may_retry_loop: bool): + async def signon(self, *, may_retry_loop: bool, + autoregister_already_tried: bool=False): """Signs on at the manager. Only needed when we didn't just register. """ self._log.info('Signing on at manager.') - await self._keep_posting_to_manager( - '/sign-on', - json={ - 'supported_task_types': self.task_types, - 'nickname': self.hostname(), - }, - may_retry_loop=may_retry_loop, - ) - self._log.info('Manager accepted sign-on.') + try: + await self._keep_posting_to_manager( + '/sign-on', + json={ + 'supported_task_types': self.task_types, + 'nickname': self.hostname(), + }, + may_retry_loop=may_retry_loop, + ) + except requests.exceptions.HTTPError as ex: + if ex.response.status_code != 401: + self._log.error('Unable to sign on at Manager: %s', ex) + raise UnableToRegisterError() + + if autoregister_already_tried: + self._log.error('Manager did not accept our credentials, and re-registration ' + 'was already attempted. Giving up.') + raise UnableToRegisterError() + + self._log.warning('Manager did not accept our credentials, going to re-register') + await self.register_at_manager(may_retry_loop=may_retry_loop) + + self._log.warning('Re-registration was fine, going to re-try sign-on') + await self.signon(may_retry_loop=may_retry_loop, autoregister_already_tried=True) + else: + # Expected flow: no exception, manager accepts credentials. + self._log.info('Manager accepted sign-on.') + async def register_at_manager(self, *, may_retry_loop: bool): self._log.info('Registering at manager') @@ -261,6 +281,7 @@ class FlamencoWorker: result = resp.json() self._log.info('Response: %s', result) self.worker_id = result['_id'] + self.manager.auth = (self.worker_id, self.worker_secret) self.write_registration_info() diff --git a/tests/mock_responses.py b/tests/mock_responses.py index e8bb6b2e1bb1829e34633b2fe5e8ce0617d58263..6627c6d477d9da7a072d7db1f44ec0db9381d146 100644 --- a/tests/mock_responses.py +++ b/tests/mock_responses.py @@ -55,7 +55,7 @@ class EmptyResponse: pass -def CoroMock(return_value=None): +def CoroMock(return_value=None, side_effect=...): """Corountine mocking object. For an example, see test_coro_mock.py. @@ -63,8 +63,12 @@ def CoroMock(return_value=None): Source: http://stackoverflow.com/a/32505333/875379 :param return_value: whatever you want to have set as return value. - This must always be set. Pass the ellipsis object ... to not set this; in that case - you are responsible yourself to set coromock.coro.return_value. + :param side_effect: whatever you want to have set as mock side-effect. + + Either return_value or side_effect must always be set. Pass the ellipsis + object to either parameter ... to not set them. When passing ellipsis to + both parameters you are responsible yourself to set + coromock.coro.return_value or coromock.coro.side_effect. """ import asyncio @@ -76,5 +80,7 @@ def CoroMock(return_value=None): if return_value is not ...: corofunc.coro.return_value = return_value + if side_effect is not ...: + corofunc.coro.side_effect = side_effect return corofunc diff --git a/tests/test_coro_mock.py b/tests/test_coro_mock.py index 16a1364be8e7ed4dbdb83f08a998eae9e3b5de9b..af8ec947b5cb715c8b1505db8255d9d32a8f14a6 100644 --- a/tests/test_coro_mock.py +++ b/tests/test_coro_mock.py @@ -19,3 +19,21 @@ class CoroMockTest(unittest.TestCase): cm.assert_called_once_with(3, 4) self.assertEqual('123', result) + + def test_setting_side_effect(self): + from tests.mock_responses import CoroMock + + cm = CoroMock() + cm.coro.side_effect = ['123', '456', IOError('oops')] + + self.assertEqual('123', self.loop.run_until_complete(cm(3, 4))) + self.assertEqual('456', self.loop.run_until_complete(cm(3, 4))) + + with self.assertRaises(IOError): + self.loop.run_until_complete(cm(3, 4)) + + # A generator is not allowed to raise StopIteration by itself, + # so the StopIteration caused by side_effect being exhausted + # results in a RuntimeError. + with self.assertRaises(RuntimeError): + self.loop.run_until_complete(cm(3, 4)) diff --git a/tests/test_worker.py b/tests/test_worker.py index 7a8b84b2133b19cce4ec318af80ef8a5489a338b..5efb8db221dbeb6207a0c331c4d06a8b68647c41 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,7 +1,7 @@ import concurrent.futures import unittest import unittest.mock -from unittest.mock import Mock +from unittest.mock import Mock, call import asyncio import requests @@ -157,6 +157,58 @@ class WorkerStartupTest(AbstractFWorkerTest): loop=self.asyncio_loop, ) + # Mock merge_with_home_config() so that it doesn't overwrite actual config. + @unittest.mock.patch('flamenco_worker.config.merge_with_home_config') + def test_reregister_if_forbidden(self, mock_merge_with_home_config): + from tests.mock_responses import CoroMock, EmptyResponse, JsonResponse, TextResponse + from flamenco_worker.worker import detect_platform + + self.manager.post = CoroMock(side_effect=[ + # First sign-on fails: + requests.exceptions.HTTPError( + response=TextResponse(text='401 Unauthorized', status_code=401)), + # Automatic re-register response: + JsonResponse({'_id': '47327'}), + # Subsequent sign-on is OK: + EmptyResponse(), + ]) + + self.assertEqual(self.worker.worker_id, '1234') + old_worker_secret = self.worker.worker_secret + + self.asyncio_loop.run_until_complete(self.worker.startup(may_retry_loop=False)) + + mock_merge_with_home_config.assert_called_once_with( + {'worker_id': '47327', + 'worker_secret': self.worker.worker_secret}) + + self.assertEqual(self.worker.worker_id, '47327') + self.assertNotEqual(old_worker_secret, self.worker.worker_secret) + self.assertEqual(('47327', self.worker.worker_secret), self.worker.manager.auth) + + self.manager.post.assert_has_calls([ + call('/sign-on', + json={ + 'supported_task_types': ['sleep', 'unittest'], + 'nickname': 'ws-unittest', + }, + loop=self.asyncio_loop), + call('/register-worker', + json={'secret': self.worker.worker_secret, + 'platform': detect_platform(), + 'supported_task_types': ['sleep', 'unittest'], + 'nickname': 'ws-unittest'}, + auth=None, + loop=self.asyncio_loop), + call('/sign-on', + json={ + 'supported_task_types': ['sleep', 'unittest'], + 'nickname': 'ws-unittest', + }, + loop=self.asyncio_loop), + ]) + self.tuqueue.queue.assert_not_called() + class TestWorkerTaskExecution(AbstractFWorkerTest): def setUp(self):