diff --git a/packages/flamenco-worker-python/flamenco_worker/cli.py b/packages/flamenco-worker-python/flamenco_worker/cli.py index 900965bdaa569b2f42228719fa56b3a810045612..90e499c7aad439176186d255c54ada8098e3ad5d 100644 --- a/packages/flamenco-worker-python/flamenco_worker/cli.py +++ b/packages/flamenco-worker-python/flamenco_worker/cli.py @@ -90,7 +90,7 @@ def main(): asyncio.ensure_future(tuqueue.work(loop=loop)) try: - fworker.startup() + loop.run_until_complete(fworker.startup()) fworker.mainloop() except worker.UnableToRegisterError: # The worker will have logged something, we'll just shut down cleanly. diff --git a/packages/flamenco-worker-python/flamenco_worker/upstream.py b/packages/flamenco-worker-python/flamenco_worker/upstream.py index 51544f800a2f079b68f820a2cd549397d3fe2813..2b63aa2177c69502cccbdf88ea888593163660e4 100644 --- a/packages/flamenco-worker-python/flamenco_worker/upstream.py +++ b/packages/flamenco-worker-python/flamenco_worker/upstream.py @@ -1,4 +1,5 @@ import attr +import concurrent.futures import requests from . import attrs_extra @@ -9,51 +10,55 @@ HTTP_TIMEOUT = 3 # in seconds @attr.s class FlamencoManager: - # TODO Sybren: make all functions async manager_url = attr.ib(validator=attr.validators.instance_of(str)) session = attr.ib(default=None, init=False) auth = attr.ib(default=None, init=False) # tuple (worker_id, worker_secret) + # Executor for HTTP requests, so that they can run in separate threads. + _executor = attr.ib(default=attr.Factory(concurrent.futures.ThreadPoolExecutor), + init=False) _log = attrs_extra.log('%s.FlamencoManager' % __name__) - def get(self, *args, **kwargs) -> requests.Response: - return self.client_request('GET', *args, **kwargs) - - def post(self, *args, **kwargs) -> requests.Response: - return self.client_request('POST', *args, **kwargs) - - def put(self, *args, **kwargs) -> requests.Response: - return self.client_request('PUT', *args, **kwargs) - - def delete(self, *args, **kwargs) -> requests.Response: - return self.client_request('DELETE', *args, **kwargs) - - def patch(self, *args, **kwargs) -> requests.Response: - return self.client_request('PATCH', *args, **kwargs) - - def client_request(self, method, url, - params=None, - data=None, - headers=None, - cookies=None, - files=None, - auth=..., - timeout=HTTP_TIMEOUT, - allow_redirects=True, - proxies=None, - hooks=None, - stream=None, - verify=None, - cert=None, - json=None) -> requests.Response: + async def get(self, *args, loop, **kwargs) -> requests.Response: + return await self.client_request('GET', *args, loop=loop, **kwargs) + + async def post(self, *args, loop, **kwargs) -> requests.Response: + return await self.client_request('POST', *args, loop=loop, **kwargs) + + async def put(self, *args, loop, **kwargs) -> requests.Response: + return await self.client_request('PUT', *args, loop=loop, **kwargs) + + async def delete(self, *args, loop, **kwargs) -> requests.Response: + return await self.client_request('DELETE', *args, loop=loop, **kwargs) + + async def patch(self, *args, loop, **kwargs) -> requests.Response: + return await self.client_request('PATCH', *args, loop=loop, **kwargs) + + async def client_request(self, method, url, *, + params=None, + data=None, + headers=None, + cookies=None, + files=None, + auth=..., + timeout=HTTP_TIMEOUT, + allow_redirects=True, + proxies=None, + hooks=None, + stream=None, + verify=None, + cert=None, + json=None, + loop) -> requests.Response: """Performs a HTTP request to the server. Creates and re-uses the HTTP session, to have efficient communication. - if 'auth=...' (the default), self.auth is used. If 'auth=None', no authentication is used. + if 'auth=...' (the async default), self.auth is used. If 'auth=None', no authentication is used. """ import urllib.parse + from functools import partial if not self.session: from requests.adapters import HTTPAdapter @@ -65,21 +70,23 @@ class FlamencoManager: abs_url = urllib.parse.urljoin(self.manager_url, url) self._log.debug('%s %s JSON: %s', method, abs_url, json) - resp = self.session.request( - method, abs_url, - params=params, - data=data, - headers=headers, - cookies=cookies, - files=files, - auth=self.auth if auth is ... else auth, - timeout=timeout, - allow_redirects=allow_redirects, - proxies=proxies, - hooks=hooks, - stream=stream, - verify=verify, - cert=cert, - json=json) + http_req = partial(self.session.request, + method, abs_url, + params=params, + data=data, + headers=headers, + cookies=cookies, + files=files, + auth=self.auth if auth is ... else auth, + timeout=timeout, + allow_redirects=allow_redirects, + proxies=proxies, + hooks=hooks, + stream=stream, + verify=verify, + cert=cert, + json=json) + + resp = await loop.run_in_executor(self._executor, http_req) return resp diff --git a/packages/flamenco-worker-python/flamenco_worker/upstream_update_queue.py b/packages/flamenco-worker-python/flamenco_worker/upstream_update_queue.py index b424621f16cec422b1d09be40de3d9e2c2375478..599e30cb15eca3f8651b3fcca3d5a0f39c374e84 100644 --- a/packages/flamenco-worker-python/flamenco_worker/upstream_update_queue.py +++ b/packages/flamenco-worker-python/flamenco_worker/upstream_update_queue.py @@ -85,7 +85,7 @@ class TaskUpdateQueue: queue_is_empty = False self._log.info('Pushing task update to Manager') - resp = self.manager.post(url, json=payload) + resp = await self.manager.post(url, json=payload, loop=loop) resp.raise_for_status() self._log.debug('Master accepted pushed update.') self._unqueue(rowid) diff --git a/packages/flamenco-worker-python/flamenco_worker/worker.py b/packages/flamenco-worker-python/flamenco_worker/worker.py index 696b423d2c1d9dad1b4056042d0b2eacf505e004..81fd281cb2b3df9204294eade8c4195bd2e8f118 100644 --- a/packages/flamenco-worker-python/flamenco_worker/worker.py +++ b/packages/flamenco-worker-python/flamenco_worker/worker.py @@ -65,18 +65,18 @@ class FlamencoWorker: _log = attrs_extra.log('%s.FlamencoWorker' % __name__) - def startup(self): + async def startup(self): self._log.info('Starting up') if not self.worker_id or not self.worker_secret: - self.register_at_manager() + await self.register_at_manager() # Once we know our ID and secret, update the manager object so that we # don't have to pass our authentication info each and every call. self.manager.auth = (self.worker_id, self.worker_secret) self.schedule_fetch_task() - def register_at_manager(self): + async def register_at_manager(self): import requests self._log.info('Registering at manager') @@ -85,7 +85,7 @@ class FlamencoWorker: platform = detect_platform() try: - resp = self.manager.post( + resp = await self.manager.post( '/register-worker', json={ 'secret': self.worker_secret, @@ -93,6 +93,7 @@ class FlamencoWorker: 'supported_job_types': self.job_types, }, auth=None, # explicitly do not use authentication + loop=self.loop, ) except requests.ConnectionError: self._log.error('Unable to register at manager, aborting.') @@ -173,7 +174,7 @@ class FlamencoWorker: # TODO: use exponential backoff instead of retrying every fixed N seconds. self._log.info('Fetching task') try: - resp = self.manager.post('/task') + resp = await self.manager.post('/task', loop=self.loop) except requests.exceptions.RequestException as ex: self._log.warning('Error fetching new task, will retry in %i seconds: %s', FETCH_TASK_FAILED_RETRY_DELAY, ex) diff --git a/packages/flamenco-worker-python/tests/mock_responses.py b/packages/flamenco-worker-python/tests/mock_responses.py index 1fcaa45704ecc2b44c0f1c95910aff6f5181e6b8..ca9c18e93ff6fd2fbb021cffd47a64cb6ed1c587 100644 --- a/packages/flamenco-worker-python/tests/mock_responses.py +++ b/packages/flamenco-worker-python/tests/mock_responses.py @@ -35,3 +35,24 @@ class EmptyResponse: def raise_for_status(self): pass + + +def CoroMock(return_value=...): + """Corountine mocking object. + + For an example, see test_coro_mock.py. + + Source: http://stackoverflow.com/a/32505333/875379 + """ + + import asyncio + from unittest.mock import Mock + + coro = Mock(name="CoroutineResult") + corofunc = Mock(name="CoroutineFunction", side_effect=asyncio.coroutine(coro)) + corofunc.coro = coro + + if return_value is not ...: + corofunc.coro.return_value = return_value + + return corofunc diff --git a/packages/flamenco-worker-python/tests/test_coro_mock.py b/packages/flamenco-worker-python/tests/test_coro_mock.py new file mode 100644 index 0000000000000000000000000000000000000000..34bd5c1d944aa1ef1e73bf8bcefebf89befcf146 --- /dev/null +++ b/packages/flamenco-worker-python/tests/test_coro_mock.py @@ -0,0 +1,17 @@ +"""Unit test for our CoroMock implementation.""" + +import asyncio +import unittest + + +class CoroMockTest(unittest.TestCase): + def test_setting_return_value(self): + from mock_responses import CoroMock + + cm = CoroMock() + cm.coro.return_value = '123' + + result = asyncio.get_event_loop().run_until_complete(cm(3, 4)) + + cm.assert_called_once_with(3, 4) + self.assertEqual('123', result) diff --git a/packages/flamenco-worker-python/tests/test_upstream_update_queue.py b/packages/flamenco-worker-python/tests/test_upstream_update_queue.py index 22437153db996744fce8c513b5104e038fb9a728..6f7536c271b805feb0b1b692e9e98b9dfbd4e754 100644 --- a/packages/flamenco-worker-python/tests/test_upstream_update_queue.py +++ b/packages/flamenco-worker-python/tests/test_upstream_update_queue.py @@ -13,12 +13,13 @@ class TaskUpdateQueueTest(AbstractWorkerTest): def setUp(self): from flamenco_worker.upstream import FlamencoManager from flamenco_worker.upstream_update_queue import TaskUpdateQueue + from mock_responses import CoroMock self.asyncio_loop = asyncio.get_event_loop() self.shutdown_future = self.asyncio_loop.create_future() self.manager = Mock(spec=FlamencoManager) - self.manager.post = Mock() + self.manager.post = CoroMock() self.tmpdir = tempfile.TemporaryDirectory() self.tuqueue = TaskUpdateQueue( @@ -47,11 +48,13 @@ class TaskUpdateQueueTest(AbstractWorkerTest): tries = 0 received_payload = None received_url = None + received_loop = None - def push_callback(url, *, json): + async def push_callback(url, *, json, loop): nonlocal tries nonlocal received_url nonlocal received_payload + nonlocal received_loop tries += 1 if tries < 3: @@ -66,6 +69,8 @@ class TaskUpdateQueueTest(AbstractWorkerTest): # since the work loop is designed to keep running, even when exceptions are thrown. received_url = url received_payload = copy.deepcopy(json) + received_loop = loop + return EmptyResponse() self.manager.post.side_effect = push_callback @@ -84,6 +89,7 @@ class TaskUpdateQueueTest(AbstractWorkerTest): # Check the payload. self.assertEqual(received_url, '/push/here') self.assertEqual(received_payload, payload) + self.assertEqual(received_loop, self.asyncio_loop) def test_queue_persistence(self): """Check that updates are pushed, even when the process is stopped & restarted.""" @@ -110,16 +116,19 @@ class TaskUpdateQueueTest(AbstractWorkerTest): received_payload = None received_url = None + received_loop = None - def push_callback(url, *, json): + async def push_callback(url, *, json, loop): nonlocal received_url nonlocal received_payload + nonlocal received_loop # Shut down after handling this push. self.shutdown_future.cancel() received_url = url received_payload = copy.deepcopy(json) + received_loop = loop return EmptyResponse() self.manager.post.side_effect = push_callback @@ -135,3 +144,4 @@ class TaskUpdateQueueTest(AbstractWorkerTest): # Check the payload self.assertEqual(received_url, '/push/there') self.assertEqual(received_payload, payload) + self.assertEqual(received_loop, self.asyncio_loop) diff --git a/packages/flamenco-worker-python/tests/test_worker.py b/packages/flamenco-worker-python/tests/test_worker.py index 8ab5c456b2c8407b4489f30cb2ea8d781d4e454f..228bef28e69ad9aa146d4f9f6a212b6bb5a3e548 100644 --- a/packages/flamenco-worker-python/tests/test_worker.py +++ b/packages/flamenco-worker-python/tests/test_worker.py @@ -44,24 +44,30 @@ class AbstractWorkerTest(unittest.TestCase): class WorkerStartupTest(AbstractWorkerTest): - def test_startup_already_registered(self): - self.worker.startup() + # Mock merge_with_home_config() so that it doesn't overwrite actual config. + @unittest.mock.patch('flamenco_worker.config.merge_with_home_config') + def test_startup_already_registered(self, mock_merge_with_home_config): + self.asyncio_loop.run_until_complete(self.worker.startup()) + mock_merge_with_home_config.assert_not_called() # Starting with known ID/secret self.manager.post.assert_not_called() self.tuqueue.queue.assert_not_called() - def test_startup_registration(self): + @unittest.mock.patch('flamenco_worker.config.merge_with_home_config') + def test_startup_registration(self, mock_merge_with_home_config): from flamenco_worker.worker import detect_platform - from mock_responses import JsonResponse + from mock_responses import JsonResponse, CoroMock self.worker.worker_id = None - self.manager.post = Mock(return_value=JsonResponse({ + self.manager.post = CoroMock(return_value=JsonResponse({ '_id': '5555', })) - # Mock merge_with_home_config() so that it doesn't overwrite actual config. - with unittest.mock.patch('flamenco_worker.config.merge_with_home_config'): - self.worker.startup() + self.asyncio_loop.run_until_complete(self.worker.startup()) + mock_merge_with_home_config.assert_called_once_with( + {'worker_id': '5555', + 'worker_secret': self.worker.worker_secret} + ) assert isinstance(self.manager.post, unittest.mock.Mock) self.manager.post.assert_called_once_with( @@ -72,23 +78,27 @@ class WorkerStartupTest(AbstractWorkerTest): 'secret': self.worker.worker_secret, }, auth=None, + loop=self.asyncio_loop, ) - def test_startup_registration_unhappy(self): + @unittest.mock.patch('flamenco_worker.config.merge_with_home_config') + def test_startup_registration_unhappy(self, mock_merge_with_home_config): """Test that startup is aborted when the worker can't register.""" from flamenco_worker.worker import detect_platform - from mock_responses import JsonResponse + from mock_responses import JsonResponse, CoroMock self.worker.worker_id = None - self.manager.post = unittest.mock.Mock(return_value=JsonResponse({ + self.manager.post = CoroMock(return_value=JsonResponse({ '_id': '5555', }, status_code=500)) # Mock merge_with_home_config() so that it doesn't overwrite actual config. - with unittest.mock.patch('flamenco_worker.config.merge_with_home_config'): - self.assertRaises(requests.HTTPError, self.worker.startup) + self.assertRaises(requests.HTTPError, + self.asyncio_loop.run_until_complete, + self.worker.startup()) + mock_merge_with_home_config.assert_not_called() assert isinstance(self.manager.post, unittest.mock.Mock) self.manager.post.assert_called_once_with( @@ -99,6 +109,7 @@ class WorkerStartupTest(AbstractWorkerTest): 'secret': self.worker.worker_secret, }, auth=None, + loop=self.asyncio_loop, ) @@ -119,11 +130,11 @@ class TestWorkerTaskFetch(AbstractWorkerTest): def test_fetch_task_happy(self): from unittest.mock import call - from mock_responses import JsonResponse, EmptyResponse + from mock_responses import JsonResponse, CoroMock - self.manager.post = Mock() + self.manager.post = CoroMock() # response when fetching a task - self.manager.post.return_value = JsonResponse({ + self.manager.post.coro.return_value = JsonResponse({ '_id': '58514d1e9837734f2e71b479', 'job': '58514d1e9837734f2e71b477', 'manager': '585a795698377345814d2f68', @@ -156,12 +167,13 @@ class TestWorkerTaskFetch(AbstractWorkerTest): # Another fetch-task task should have been scheduled. self.assertNotEqual(self.worker.fetch_task_task, interesting_task) - self.manager.post.assert_called_once_with('/task') + self.manager.post.assert_called_once_with('/task', loop=self.asyncio_loop) self.tuqueue.queue.assert_has_calls([ call('/tasks/58514d1e9837734f2e71b479/update', {'task_progress_percentage': 0, 'activity': '', 'command_progress_percentage': 0, 'task_status': 'active', - 'current_command_idx': 0}), + 'current_command_idx': 0} + ), call('/tasks/58514d1e9837734f2e71b479/update', {'task_progress_percentage': 0, 'activity': '', 'command_progress_percentage': 0, 'task_status': 'completed',