diff --git a/.gitignore b/.gitignore
index c4903548bdbb8f62d8bbf93b25b458cef2294c53..c551ed0bb88bc43f74fc0090436749c3a36fb7d6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,3 +19,4 @@
 /packages/flamenco-manager-go/src/flamenco-manager/docker/flamenco-manager-*.docker.tgz
 /packages/flamenco-worker-python/flamenco_worker.egg-info/
+/packages/flamenco-worker-python/flamenco-worker.db
diff --git a/packages/flamenco-worker-python/flamenco-worker.cfg b/packages/flamenco-worker-python/flamenco-worker.cfg
index 2948535875f15593bb0831618551c3ecbcbf4f38..12259f4965e7ca079a3d1ba7e05ba4d18f0c8245 100644
--- a/packages/flamenco-worker-python/flamenco-worker.cfg
+++ b/packages/flamenco-worker-python/flamenco-worker.cfg
@@ -1,3 +1,4 @@
 [flamenco-worker]
 manager_url = http://localhost:8083/
 job_types = sleep blender_render_simple
+task_update_queue_db = flamenco-worker.db
diff --git a/packages/flamenco-worker-python/flamenco_worker/cli.py b/packages/flamenco-worker-python/flamenco_worker/cli.py
index 3b907333f16876ffeb82e9aec53eff34cc9f9ac1..900965bdaa569b2f42228719fa56b3a810045612 100644
--- a/packages/flamenco-worker-python/flamenco_worker/cli.py
+++ b/packages/flamenco-worker-python/flamenco_worker/cli.py
@@ -61,18 +61,24 @@ def main():
     shutdown_future = loop.create_future()
 
     # Piece all the components together.
-    from . import runner, worker, upstream
+    from . import runner, worker, upstream, upstream_update_queue
 
     fmanager = upstream.FlamencoManager(
         manager_url=confparser.get(config.CONFIG_SECTION, 'manager_url'),
     )
 
+    tuqueue = upstream_update_queue.TaskUpdateQueue(
+        db_fname=confparser.get(config.CONFIG_SECTION, 'task_update_queue_db'),
+        manager=fmanager,
+        shutdown_future=shutdown_future,
+    )
     trunner = runner.TaskRunner(
         shutdown_future=shutdown_future)
 
     fworker = worker.FlamencoWorker(
         manager=fmanager,
         trunner=trunner,
+        tuqueue=tuqueue,
         job_types=confparser.get(config.CONFIG_SECTION, 'job_types').split(),
         worker_id=confparser.get(config.CONFIG_SECTION, 'worker_id'),
         worker_secret=confparser.get(config.CONFIG_SECTION, 'worker_secret'),
@@ -80,6 +86,9 @@ def main():
         shutdown_future=shutdown_future,
     )
 
+    # Start the task update queue worker loop.
+    asyncio.ensure_future(tuqueue.work(loop=loop))
+
     try:
         fworker.startup()
         fworker.mainloop()
@@ -95,6 +104,7 @@ def main():
             log.info('Waiting to give tasks the time to stop gracefully')
             await asyncio.sleep(2)
             loop.stop()
+        loop.run_until_complete(stop_loop())
 
     except:
         log.exception('Uncaught exception!')
diff --git a/packages/flamenco-worker-python/flamenco_worker/config.py b/packages/flamenco-worker-python/flamenco_worker/config.py
index 4987ec9fbf55d828f029e39c408042a02884b3a1..9dade823b961599eb29be208c5d4de692b0881d5 100644
--- a/packages/flamenco-worker-python/flamenco_worker/config.py
+++ b/packages/flamenco-worker-python/flamenco_worker/config.py
@@ -13,6 +13,7 @@ DEFAULT_CONFIG = {
     'flamenco-worker': collections.OrderedDict([
         ('manager_url', 'http://flamenco-manager/'),
         ('job_types', 'sleep blender_render_simple'),
+        ('task_update_queue_db', 'flamenco-worker.db'),
         ('worker_id', ''),
         ('worker_secret', ''),
     ])
diff --git a/packages/flamenco-worker-python/flamenco_worker/upstream.py b/packages/flamenco-worker-python/flamenco_worker/upstream.py
index 230ab33b352b862dae77a34084262c89662cd678..51544f800a2f079b68f820a2cd549397d3fe2813 100644
--- a/packages/flamenco-worker-python/flamenco_worker/upstream.py
+++ b/packages/flamenco-worker-python/flamenco_worker/upstream.py
@@ -9,6 +9,7 @@ HTTP_TIMEOUT = 3  # in seconds
 
 @attr.s
 class FlamencoManager:
+    # TODO Sybren: make all functions async
     manager_url = attr.ib(validator=attr.validators.instance_of(str))
     session = attr.ib(default=None, init=False)
     auth = attr.ib(default=None, init=False)  # tuple (worker_id, worker_secret)
diff --git a/packages/flamenco-worker-python/flamenco_worker/upstream_update_queue.py b/packages/flamenco-worker-python/flamenco_worker/upstream_update_queue.py
new file mode 100644
index 0000000000000000000000000000000000000000..b424621f16cec422b1d09be40de3d9e2c2375478
--- /dev/null
+++ b/packages/flamenco-worker-python/flamenco_worker/upstream_update_queue.py
@@ -0,0 +1,132 @@
+"""Queues task updates to Flamenco Manager.
+
+Task updates are pickled and stored in a SQLite database. Pickling allows
+for efficient conversion of Python objects into a binary data blob.
+"""
+
+import asyncio
+import pickle
+import sqlite3
+
+import attr
+
+from . import attrs_extra, upstream
+
+BACKOFF_TIME = 5  # seconds
+SHUTDOWN_RECHECK_TIME = 0.5  # seconds
+
+
+@attr.s
+class TaskUpdateQueue:
+    manager = attr.ib(validator=attr.validators.instance_of(upstream.FlamencoManager))
+    shutdown_future = attr.ib(
+        validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))
+    db_fname = attr.ib(validator=attr.validators.instance_of(str))
+
+    backoff_time = attr.ib(default=BACKOFF_TIME)
+    shutdown_recheck_time = attr.ib(default=SHUTDOWN_RECHECK_TIME)
+
+    _stuff_queued = attr.ib(default=attr.Factory(asyncio.Event), init=False)
+    _db = attr.ib(default=None, init=False)
+    _log = attrs_extra.log('%s.TaskUpdateQueue' % __name__)
+
+    def _connect_db(self):
+        self._log.info('Connecting to database %s', self.db_fname)
+        self._db = sqlite3.connect(self.db_fname)
+
+        # We don't need to create a primary key; we use the implicit 'rowid' column.
+        self._db.execute('CREATE TABLE IF NOT EXISTS fworker_queue(url TEXT, payload BLOB)')
+
+    def _disconnect_db(self):
+        self._log.info('Disconnecting from database %s', self.db_fname)
+        self._db.close()
+        self._db = None
+
+    async def queue(self, url, payload):
+        """Push some payload onto the queue."""
+
+        if self._db is None:
+            self._connect_db()
+
+        # Store the pickled payload in the SQLite database.
+        pickled = pickle.dumps(payload)
+        self._db.execute('INSERT INTO fworker_queue (url, payload) values (?, ?)', (url, pickled))
+        self._db.commit()
+
+        # Notify the work loop that stuff has been queued.
+        self._stuff_queued.set()
+
+    async def work(self, *, loop=None):
+        """Loop that pushes queued payloads to the Flamenco Manager.
+
+        Keeps running until shutdown_future.done() returns True.
+        """
+
+        import requests
+
+        # Always start by inspecting the persisted queue, so act as if something
+        # was just queued.
+        self._stuff_queued.set()
+
+        while not self.shutdown_future.done():
+            try:
+                await asyncio.wait_for(self._stuff_queued.wait(),
+                                       self.shutdown_recheck_time,
+                                       loop=loop)
+            except asyncio.TimeoutError:
+                # This is normal, it just means that there wasn't anything queued within
+                # SHUTDOWN_RECHECK_TIME seconds.
+                continue
+
+            self._log.debug('Inspecting queued task updates.')
+            try:
+                queue_is_empty = True
+                for rowid, url, payload in self._queue():
+                    queue_is_empty = False
+
+                    self._log.info('Pushing task update to Manager')
+                    resp = self.manager.post(url, json=payload)
+                    resp.raise_for_status()
+                    self._log.debug('Manager accepted pushed update.')
+                    self._unqueue(rowid)
+
+                if queue_is_empty:
+                    # Only clear the flag once the queue has really been cleared.
+                    self._stuff_queued.clear()
+            except requests.ConnectionError:
+                self._log.warning('Unable to connect to Manager, will retry later.')
+                await asyncio.sleep(self.backoff_time)
+            except requests.HTTPError as ex:
+                self._log.warning('Manager did not accept our updates (%s), will retry later.',
+                                  ex)
+                await asyncio.sleep(self.backoff_time)
+            except Exception:
+                self._log.exception('Unexpected exception in work loop. '
+                                    'Backing off and retrying later.')
+                await asyncio.sleep(self.backoff_time)
+
+        self._log.warning('Stopping work loop')
+
+    def _queue(self) -> (int, str, object):
+        """Yields (rowid, url, unpickled payload) tuples from the database."""
+
+        if self._db is None:
+            self._connect_db()
+
+        result = self._db.execute('''
+            SELECT rowid, url, payload
+            FROM fworker_queue
+            ORDER BY rowid ASC
+            ''')
+        for row in result:
+            rowid = row[0]
+            url = row[1]
+            payload = pickle.loads(row[2])
+            yield rowid, url, payload
+
+    def _unqueue(self, rowid: int):
+        """Removes a queued payload from the database."""
+
+        # TODO Sybren: every once in a while, run 'vacuum' on the database.
+        self._db.execute('DELETE FROM fworker_queue WHERE rowid=?', (rowid, ))
+        self._db.commit()
diff --git a/packages/flamenco-worker-python/flamenco_worker/worker.py b/packages/flamenco-worker-python/flamenco_worker/worker.py
index c8b7a3480ae647575a021723ef96912c6e66a2fa..696b423d2c1d9dad1b4056042d0b2eacf505e004 100644
--- a/packages/flamenco-worker-python/flamenco_worker/worker.py
+++ b/packages/flamenco-worker-python/flamenco_worker/worker.py
@@ -6,6 +6,7 @@ import attr
 from . import attrs_extra
 from . import documents
 from . import upstream
+from . import upstream_update_queue
 
 # All durations/delays/etc are in seconds.
 FETCH_TASK_FAILED_RETRY_DELAY = 10  # when we failed obtaining a task
@@ -28,6 +29,7 @@ class UnableToRegisterError(Exception):
 class FlamencoWorker:
     manager = attr.ib(validator=attr.validators.instance_of(upstream.FlamencoManager))
     trunner = attr.ib()  # Instance of flamenco_worker.runner.TaskRunner
+    tuqueue = attr.ib(validator=attr.validators.instance_of(upstream_update_queue.TaskUpdateQueue))
     job_types = attr.ib(validator=attr.validators.instance_of(list))
     worker_id = attr.ib(validator=attr.validators.instance_of(str))
     worker_secret = attr.ib(validator=attr.validators.instance_of(str))
@@ -221,11 +223,9 @@ class FlamencoWorker:
     async def push_to_manager(self):
         """Updates a task's status and activity.
 
-        """
-
-        # TODO Sybren: do this in a separate thread, as to not block the task runner.
-        import requests
+        Uses the TaskUpdateQueue to handle persistent queueing.
+        """
 
         self._log.info('Updating task %s with status %r and activity %r',
                        self.task_id, self.current_task_status, self.last_task_activity)
@@ -242,12 +242,7 @@ class FlamencoWorker:
             self._queued_log_entries.clear()
             self.last_log_push = now
 
-        resp = self.manager.post('/tasks/%s/update' % self.task_id, json=payload)
-        self._log.debug('Sent task %s update to manager', self.task_id)
-        try:
-            resp.raise_for_status()
-        except requests.HTTPError as ex:
-            self._log.error('Unable to send status update to manager, update is lost: %s', ex)
+        await self.tuqueue.queue('/tasks/%s/update' % self.task_id, payload)
 
     async def register_task_update(self, *,
                                    task_status: str = None,
diff --git a/packages/flamenco-worker-python/tests/abstract_worker_test.py b/packages/flamenco-worker-python/tests/abstract_worker_test.py
new file mode 100644
index 0000000000000000000000000000000000000000..a2def8c6764fcbf4958b51ef340ea5d1d4c171f6
--- /dev/null
+++ b/packages/flamenco-worker-python/tests/abstract_worker_test.py
@@ -0,0 +1,12 @@
+import unittest
+
+
+class AbstractWorkerTest(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        import logging
+
+        logging.basicConfig(
+            level=logging.DEBUG,
+            format='%(asctime)-15s %(levelname)8s %(name)s %(message)s',
+        )
diff --git a/packages/flamenco-worker-python/tests/test_upstream_update_queue.py b/packages/flamenco-worker-python/tests/test_upstream_update_queue.py
new file mode 100644
index 0000000000000000000000000000000000000000..22437153db996744fce8c513b5104e038fb9a728
--- /dev/null
+++ b/packages/flamenco-worker-python/tests/test_upstream_update_queue.py
@@ -0,0 +1,137 @@
+import asyncio
+import copy
+import datetime
+import tempfile
+from unittest.mock import Mock
+
+import requests
+
+from abstract_worker_test import AbstractWorkerTest
+
+
+class TaskUpdateQueueTest(AbstractWorkerTest):
+    def setUp(self):
+        from flamenco_worker.upstream import FlamencoManager
+        from flamenco_worker.upstream_update_queue import TaskUpdateQueue
+
+        self.asyncio_loop = asyncio.get_event_loop()
+        self.shutdown_future = self.asyncio_loop.create_future()
+
+        self.manager = Mock(spec=FlamencoManager)
+        self.manager.post = Mock()
+
+        self.tmpdir = tempfile.TemporaryDirectory()
+        self.tuqueue = TaskUpdateQueue(
+            db_fname='%s/unittest.db' % self.tmpdir.name,
+            manager=self.manager,
+            shutdown_future=self.shutdown_future,
+            backoff_time=0.3,  # faster retry to keep the unittest speedy.
+        )
+
+    def tearDown(self):
+        self.tmpdir.cleanup()
+
+    def test_queue_push(self):
+        """Test that a queue() is followed by an actual push to Flamenco Manager.
+
+        Also tests connection errors and other HTTP error statuses.
+ """ + + from mock_responses import JsonResponse, EmptyResponse + + # Try different value types + payload = {'key': 'value', + 'sub': {'some': 13, + 'values': datetime.datetime.now()}} + + tries = 0 + received_payload = None + received_url = None + + def push_callback(url, *, json): + nonlocal tries + nonlocal received_url + nonlocal received_payload + + tries += 1 + if tries < 3: + raise requests.ConnectionError() + if tries == 3: + return JsonResponse({}, status_code=500) + + # Shut down after handling this push. + self.shutdown_future.cancel() + + # Remember what we received. Calling self.assertEqual() here doesn't stop the unittest, + # since the work loop is designed to keep running, even when exceptions are thrown. + received_url = url + received_payload = copy.deepcopy(json) + return EmptyResponse() + + self.manager.post.side_effect = push_callback + + asyncio.ensure_future(self.tuqueue.queue('/push/here', payload), loop=self.asyncio_loop) + + # Run the loop for 2 seconds. This should be enough for 3 retries of 0.3 seconds + handling + # the actual payload. + self.asyncio_loop.run_until_complete( + asyncio.wait_for( + self.tuqueue.work(loop=self.asyncio_loop), + timeout=2 + ) + ) + + # Check the payload. + self.assertEqual(received_url, '/push/here') + self.assertEqual(received_payload, payload) + + def test_queue_persistence(self): + """Check that updates are pushed, even when the process is stopped & restarted.""" + + from mock_responses import EmptyResponse + from flamenco_worker.upstream_update_queue import TaskUpdateQueue + + # Try different value types + payload = {'key': 'value', + 'sub': {'some': 13, + 'values': datetime.datetime.now()}} + self.asyncio_loop.run_until_complete(self.tuqueue.queue('/push/there', payload)) + self.manager.post.assert_not_called() + self.tuqueue._disconnect_db() + + # Create a new tuqueue to handle the push, using the same database. + # Note that we don't have to stop self.tuqueue because we never ran self.tuqueue.work(). + new_tuqueue = TaskUpdateQueue( + db_fname=self.tuqueue.db_fname, + manager=self.manager, + shutdown_future=self.shutdown_future, + backoff_time=5, # no retry in this test, so any retry should cause a timeout. + ) + + received_payload = None + received_url = None + + def push_callback(url, *, json): + nonlocal received_url + nonlocal received_payload + + # Shut down after handling this push. + self.shutdown_future.cancel() + + received_url = url + received_payload = copy.deepcopy(json) + return EmptyResponse() + + self.manager.post.side_effect = push_callback + + # This should pick up on the pushed data. 
+        self.asyncio_loop.run_until_complete(
+            asyncio.wait_for(
+                new_tuqueue.work(loop=self.asyncio_loop),
+                timeout=2
+            )
+        )
+
+        # Check the payload
+        self.assertEqual(received_url, '/push/there')
+        self.assertEqual(received_payload, payload)
diff --git a/packages/flamenco-worker-python/tests/test_worker.py b/packages/flamenco-worker-python/tests/test_worker.py
index d6445b1ea98cd8846072cc206dc56204b9d56514..18bb3a64cc7047e76f08b77d8ec21b83a9c76a3e 100644
--- a/packages/flamenco-worker-python/tests/test_worker.py
+++ b/packages/flamenco-worker-python/tests/test_worker.py
@@ -11,18 +11,21 @@ class AbstractWorkerTest(unittest.TestCase):
         from flamenco_worker.upstream import FlamencoManager
         from flamenco_worker.worker import FlamencoWorker
         from flamenco_worker.runner import TaskRunner
+        from flamenco_worker.upstream_update_queue import TaskUpdateQueue
 
         self.asyncio_loop = asyncio.get_event_loop()
         self.shutdown_future = self.asyncio_loop.create_future()
 
         self.manager = Mock(spec=FlamencoManager)
         self.trunner = Mock(spec=TaskRunner)
+        self.tuqueue = Mock(spec=TaskUpdateQueue)
 
         self.trunner.execute = self.mock_task_execute
 
         self.worker = FlamencoWorker(
             manager=self.manager,
             trunner=self.trunner,
+            tuqueue=self.tuqueue,
             job_types=['sleep', 'unittest'],
             worker_id='1234',
             worker_secret='jemoeder',
@@ -44,6 +47,7 @@ class WorkerStartupTest(AbstractWorkerTest):
     def test_startup_already_registered(self):
         self.worker.startup()
         self.manager.post.assert_not_called()
+        self.tuqueue.queue.assert_not_called()
 
     def test_startup_registration(self):
         from flamenco_worker.worker import detect_platform
@@ -118,26 +122,27 @@ class TestWorkerTaskFetch(AbstractWorkerTest):
         from mock_responses import JsonResponse, EmptyResponse
 
         self.manager.post = Mock()
-        self.manager.post.side_effect = [
-            # response when fetching a task
-            JsonResponse({
-                '_id': '58514d1e9837734f2e71b479',
-                'job': '58514d1e9837734f2e71b477',
-                'manager': '585a795698377345814d2f68',
-                'project': '',
-                'user': '580f8c66983773759afdb20e',
-                'name': 'sleep-14-26',
-                'status': 'processing',
-                'priority': 50,
-                'job_type': 'sleep',
-                'commands': [
-                    {'name': 'echo', 'settings': {'message': 'Preparing to sleep'}},
-                    {'name': 'sleep', 'settings': {'time_in_seconds': 3}}
-                ]
-            }),
+        # response when fetching a task
+        self.manager.post.return_value = JsonResponse({
+            '_id': '58514d1e9837734f2e71b479',
+            'job': '58514d1e9837734f2e71b477',
+            'manager': '585a795698377345814d2f68',
+            'project': '',
+            'user': '580f8c66983773759afdb20e',
+            'name': 'sleep-14-26',
+            'status': 'processing',
+            'priority': 50,
+            'job_type': 'sleep',
+            'commands': [
+                {'name': 'echo', 'settings': {'message': 'Preparing to sleep'}},
+                {'name': 'sleep', 'settings': {'time_in_seconds': 3}}
+            ]
+        })
+
+        self.tuqueue.queue.side_effect = [
             # Responses after status updates
-            EmptyResponse(),  # task becoming active
-            EmptyResponse(),  # task becoming complete
+            None,  # task becoming active
+            None,  # task becoming complete
         ]
 
         self.worker.schedule_fetch_task()
@@ -149,17 +154,15 @@
         # Another fetch-task task should have been scheduled.
         self.assertNotEqual(self.worker.fetch_task_task, interesting_task)
 
-        self.manager.post.assert_has_calls([
-            call('/task'),
+        self.manager.post.assert_called_once_with('/task')
+        self.tuqueue.queue.assert_has_calls([
             call('/tasks/58514d1e9837734f2e71b479/update',
-                 json={'task_progress_percentage': 0, 'activity': '',
-                       'command_progress_percentage': 0, 'task_status': 'active',
-                       'current_command_idx': 0},
-                 ),
+                 {'task_progress_percentage': 0, 'activity': '',
+                  'command_progress_percentage': 0, 'task_status': 'active',
+                  'current_command_idx': 0}),
             call('/tasks/58514d1e9837734f2e71b479/update',
-                 json={'task_progress_percentage': 0, 'activity': '',
-                       'command_progress_percentage': 0, 'task_status': 'completed',
-                       'current_command_idx': 0},
-                 ),
+                 {'task_progress_percentage': 0, 'activity': '',
+                  'command_progress_percentage': 0, 'task_status': 'completed',
+                  'current_command_idx': 0}),
         ])
-        self.assertEqual(self.manager.post.call_count, 3)
+        self.assertEqual(self.tuqueue.queue.call_count, 2)
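
Note for reviewers: the persistence mechanism added in upstream_update_queue.py boils down to "pickle the payload, store it as a BLOB, keyed by SQLite's implicit rowid". The standalone sketch below is not part of the patch; it only borrows the table and column names from the diff, and the URL and payload values are made up. It shows the round trip that queue(), _queue() and _unqueue() perform:

import pickle
import sqlite3

# A throwaway in-memory database stands in for flamenco-worker.db.
db = sqlite3.connect(':memory:')
db.execute('CREATE TABLE IF NOT EXISTS fworker_queue(url TEXT, payload BLOB)')

# queue(): pickle the payload and store it together with the target URL.
payload = {'task_status': 'active', 'activity': 'example update'}
db.execute('INSERT INTO fworker_queue (url, payload) VALUES (?, ?)',
           ('/tasks/1234/update', pickle.dumps(payload)))
db.commit()

# _queue(): read the rows back in insertion order and unpickle each payload.
rows = db.execute(
    'SELECT rowid, url, payload FROM fworker_queue ORDER BY rowid ASC').fetchall()
for rowid, url, blob in rows:
    print(rowid, url, pickle.loads(blob))
    # _unqueue(): a row is only deleted after the Manager has accepted the update.
    db.execute('DELETE FROM fworker_queue WHERE rowid = ?', (rowid,))
db.commit()

Pickling (rather than serialising to JSON at queue time) is what lets the tests queue a payload containing a datetime value and get the identical object back after a simulated restart.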
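Also worth noting: work() combines an asyncio.Event with asyncio.wait_for() so it blocks until something is queued, yet still re-checks shutdown_future every SHUTDOWN_RECHECK_TIME seconds. A minimal sketch of that pattern, with made-up names and using the current asyncio API (asyncio.run, no loop= arguments) rather than the older API the patch targets:

import asyncio

SHUTDOWN_RECHECK_TIME = 0.5  # seconds, as in the patch

async def work(stuff_queued: asyncio.Event, shutdown: asyncio.Future):
    while not shutdown.done():
        try:
            # Wake up when something is queued, or after the recheck interval.
            await asyncio.wait_for(stuff_queued.wait(), SHUTDOWN_RECHECK_TIME)
        except asyncio.TimeoutError:
            continue  # nothing queued; loop around and re-check the shutdown future.
        print('flushing queued updates')
        stuff_queued.clear()

async def main():
    stuff_queued = asyncio.Event()
    shutdown = asyncio.get_running_loop().create_future()
    worker = asyncio.create_task(work(stuff_queued, shutdown))
    stuff_queued.set()   # pretend an update was queued
    await asyncio.sleep(1)
    shutdown.cancel()    # request shutdown, as the tests do via shutdown_future.cancel()
    await worker

asyncio.run(main())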