Commit eb8c12b1 authored by Sybren A. Stüvel

Worker: queue task updates in a persistent sqlite database

This allows the worker to queue up task updates when Flamenco Manager
is unreachable.

Note that asyncio is used for task scheduling & execution, but HTTP connections
and subprocess communication are not (yet). As a result, a blocking POST
call will block proc.stdout.read() in AbstractSubprocessCommand, and
vice versa. This can be solved by using either multithreading or asyncio
to perform those operations.
parent 74695f13
Showing 334 additions and 41 deletions
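As an illustration of the fix suggested in the commit message (not part of this
commit), the blocking requests call could be handed off to the event loop's
default thread pool so it no longer stalls subprocess I/O; the function name
post_update and its parameters are hypothetical:

import asyncio
import functools

import requests


async def post_update(loop: asyncio.AbstractEventLoop, url: str, payload: dict):
    """Runs a blocking requests.post() on the loop's default ThreadPoolExecutor.

    While the POST is in flight, the event loop stays free to service
    proc.stdout.read() and other coroutines.
    """
    blocking_post = functools.partial(requests.post, url, json=payload, timeout=3)
    return await loop.run_in_executor(None, blocking_post)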
@@ -19,3 +19,4 @@
/packages/flamenco-manager-go/src/flamenco-manager/docker/flamenco-manager-*.docker.tgz
/packages/flamenco-worker-python/flamenco_worker.egg-info/
/packages/flamenco-worker-python/flamenco-worker.db
[flamenco-worker]
manager_url = http://localhost:8083/
job_types = sleep blender_render_simple
task_update_queue_db = flamenco-worker.db
@@ -61,18 +61,24 @@ def main():
shutdown_future = loop.create_future()
# Piece all the components together.
- from . import runner, worker, upstream
+ from . import runner, worker, upstream, upstream_update_queue
fmanager = upstream.FlamencoManager(
manager_url=confparser.get(config.CONFIG_SECTION, 'manager_url'),
)
tuqueue = upstream_update_queue.TaskUpdateQueue(
db_fname=confparser.get(config.CONFIG_SECTION, 'task_update_queue_db'),
manager=fmanager,
shutdown_future=shutdown_future,
)
trunner = runner.TaskRunner(
shutdown_future=shutdown_future)
fworker = worker.FlamencoWorker(
manager=fmanager,
trunner=trunner,
tuqueue=tuqueue,
job_types=confparser.get(config.CONFIG_SECTION, 'job_types').split(),
worker_id=confparser.get(config.CONFIG_SECTION, 'worker_id'),
worker_secret=confparser.get(config.CONFIG_SECTION, 'worker_secret'),
@@ -80,6 +86,9 @@ def main():
shutdown_future=shutdown_future,
)
# Start the task update queue worker loop.
asyncio.ensure_future(tuqueue.work(loop=loop))
try:
fworker.startup()
fworker.mainloop()
@@ -95,6 +104,7 @@ def main():
log.info('Waiting to give tasks the time to stop gracefully')
await asyncio.sleep(2)
loop.stop()
loop.run_until_complete(stop_loop())
except:
log.exception('Uncaught exception!')
@@ -13,6 +13,7 @@ DEFAULT_CONFIG = {
'flamenco-worker': collections.OrderedDict([
('manager_url', 'http://flamenco-manager/'),
('job_types', 'sleep blender_render_simple'),
('task_update_queue_db', 'flamenco-worker.db'),
('worker_id', ''),
('worker_secret', ''),
])
@@ -9,6 +9,7 @@ HTTP_TIMEOUT = 3 # in seconds
@attr.s
class FlamencoManager:
# TODO Sybren: make all functions async
manager_url = attr.ib(validator=attr.validators.instance_of(str))
session = attr.ib(default=None, init=False)
auth = attr.ib(default=None, init=False) # tuple (worker_id, worker_secret)
"""Queues task updates to Flamenco Manager.
Task updates are pickled and stored in a SQLite database. Pickling allows
for efficient conversion of Python objects into a binary data blob.
"""
import asyncio
import pickle
import sqlite3
import typing
import attr
from . import attrs_extra, upstream
BACKOFF_TIME = 5 # seconds
SHUTDOWN_RECHECK_TIME = 0.5 # seconds
@attr.s
class TaskUpdateQueue:
manager = attr.ib(validator=attr.validators.instance_of(upstream.FlamencoManager))
shutdown_future = attr.ib(
validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))
db_fname = attr.ib(validator=attr.validators.instance_of(str))
backoff_time = attr.ib(default=BACKOFF_TIME)
shutdown_recheck_time = attr.ib(default=SHUTDOWN_RECHECK_TIME)
_stuff_queued = attr.ib(default=attr.Factory(asyncio.Event), init=False)
_db = attr.ib(default=None, init=False)
_log = attrs_extra.log('%s.TaskUpdateQueue' % __name__)
def _connect_db(self):
self._log.info('Connecting to database %s', self.db_fname)
self._db = sqlite3.connect(self.db_fname)
# We don't need to create a primary key; we use the implicit 'rowid' column.
self._db.execute('CREATE TABLE IF NOT EXISTS fworker_queue(url TEXT, payload BLOB)')
def _disconnect_db(self):
self._log.info('Disconnecting from database %s', self.db_fname)
self._db.close()
self._db = None
async def queue(self, url, payload):
"""Push some payload onto the queue."""
if self._db is None:
self._connect_db()
# Store the pickled payload in the SQLite database.
pickled = pickle.dumps(payload)
self._db.execute('INSERT INTO fworker_queue (url, payload) values (?, ?)', (url, pickled))
self._db.commit()
# Notify the work loop that stuff has been queued.
self._stuff_queued.set()
async def work(self, *, loop=None):
"""Loop that pushes queued payloads to the Flamenco Manager.
Keeps running until shutdown_future.done() returns True.
"""
import requests
# Always start by inspecting the persisted queue, so act as if something
# was just queued.
self._stuff_queued.set()
while not self.shutdown_future.done():
try:
await asyncio.wait_for(self._stuff_queued.wait(),
self.shutdown_recheck_time,
loop=loop)
except asyncio.TimeoutError:
# This is normal, it just means that there wasn't anything queued within
# SHUTDOWN_RECHECK_TIME seconds.
continue
self._log.debug('Inspecting queued task updates.')
try:
queue_is_empty = True
for rowid, url, payload in self._queue():
queue_is_empty = False
self._log.info('Pushing task update to Manager')
resp = self.manager.post(url, json=payload)
resp.raise_for_status()
self._log.debug('Manager accepted pushed update.')
self._unqueue(rowid)
if queue_is_empty:
# Only clear the flag once the queue has really been cleared.
self._stuff_queued.clear()
except requests.ConnectionError:
self._log.warning('Unable to connect to Manager, will retry later.')
await asyncio.sleep(self.backoff_time)
except requests.HTTPError as ex:
self._log.warning('Manager did not accept our updates (%s), will retry later.',
ex)
await asyncio.sleep(self.backoff_time)
except Exception:
self._log.exception('Unexpected exception in work loop. '
'Backing off and retrying later.')
await asyncio.sleep(self.backoff_time)
self._log.warning('Stopping work loop')
def _queue(self) -> typing.Iterator[typing.Tuple[int, str, object]]:
"""Yields (rowid, url, unpickled payload) tuples from the database."""
if self._db is None:
self._connect_db()
result = self._db.execute('''
SELECT rowid, url, payload
FROM fworker_queue
ORDER BY rowid ASC
''')
for row in result:
rowid = row[0]
url = row[1]
payload = pickle.loads(row[2])
yield rowid, url, payload
def _unqueue(self, rowid: int):
"""Removes a queued payload from the database."""
# TODO Sybren: every once in a while, run 'vacuum' on the database.
self._db.execute('DELETE FROM fworker_queue WHERE rowid=?', (rowid, ))
self._db.commit()
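A minimal usage sketch of the queue defined above, mirroring how __main__.py
wires it up; the Manager URL, database filename, and task ID are made up for
illustration:

import asyncio

from flamenco_worker.upstream import FlamencoManager
from flamenco_worker.upstream_update_queue import TaskUpdateQueue

loop = asyncio.get_event_loop()
shutdown_future = loop.create_future()
fmanager = FlamencoManager(manager_url='http://localhost:8083/')

tuqueue = TaskUpdateQueue(
    db_fname='flamenco-worker.db',
    manager=fmanager,
    shutdown_future=shutdown_future,
)

# queue() only touches SQLite, so it succeeds even while the Manager is down.
loop.run_until_complete(tuqueue.queue('/tasks/1234/update', {'task_status': 'active'}))

# work() delivers queued updates, retrying with backoff until shutdown_future is done.
asyncio.ensure_future(tuqueue.work(loop=loop))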
@@ -6,6 +6,7 @@ import attr
from . import attrs_extra
from . import documents
from . import upstream
from . import upstream_update_queue
# All durations/delays/etc are in seconds.
FETCH_TASK_FAILED_RETRY_DELAY = 10 # when we failed obtaining a task
@@ -28,6 +29,7 @@ class UnableToRegisterError(Exception):
class FlamencoWorker:
manager = attr.ib(validator=attr.validators.instance_of(upstream.FlamencoManager))
trunner = attr.ib() # Instance of flamenco_worker.runner.TaskRunner
tuqueue = attr.ib(validator=attr.validators.instance_of(upstream_update_queue.TaskUpdateQueue))
job_types = attr.ib(validator=attr.validators.instance_of(list))
worker_id = attr.ib(validator=attr.validators.instance_of(str))
worker_secret = attr.ib(validator=attr.validators.instance_of(str))
@@ -221,11 +223,9 @@
async def push_to_manager(self):
"""Updates a task's status and activity.
- """
- # TODO Sybren: do this in a separate thread, as to not block the task runner.
- import requests
+
+ Uses the TaskUpdateQueue to handle persistent queueing.
+ """
self._log.info('Updating task %s with status %r and activity %r',
self.task_id, self.current_task_status, self.last_task_activity)
@@ -242,12 +242,7 @@
self._queued_log_entries.clear()
self.last_log_push = now
- resp = self.manager.post('/tasks/%s/update' % self.task_id, json=payload)
- self._log.debug('Sent task %s update to manager', self.task_id)
- try:
- resp.raise_for_status()
- except requests.HTTPError as ex:
- self._log.error('Unable to send status update to manager, update is lost: %s', ex)
+ await self.tuqueue.queue('/tasks/%s/update' % self.task_id, payload)
async def register_task_update(self, *,
task_status: str = None,
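The ordering in the queue code gives at-least-once delivery: an update is
committed to SQLite before any network I/O happens, and its row is deleted only
after the Manager returns a success status. Condensed into a sketch (simplified;
deliver_one is not a function in this commit):

def deliver_one(db, manager, rowid, url, payload):
    # Any exception leaves the row queued, so the work loop retries it later.
    resp = manager.post(url, json=payload)
    resp.raise_for_status()
    # Only a 2xx response reaches this point. If the process dies before the
    # DELETE commits, the update is sent again on restart rather than lost.
    db.execute('DELETE FROM fworker_queue WHERE rowid=?', (rowid,))
    db.commit()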
import unittest
class AbstractWorkerTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
import logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)-15s %(levelname)8s %(name)s %(message)s',
)
import asyncio
import copy
import datetime
import tempfile
from unittest.mock import Mock
import requests
from abstract_worker_test import AbstractWorkerTest
class TaskUpdateQueueTest(AbstractWorkerTest):
def setUp(self):
from flamenco_worker.upstream import FlamencoManager
from flamenco_worker.upstream_update_queue import TaskUpdateQueue
self.asyncio_loop = asyncio.get_event_loop()
self.shutdown_future = self.asyncio_loop.create_future()
self.manager = Mock(spec=FlamencoManager)
self.manager.post = Mock()
self.tmpdir = tempfile.TemporaryDirectory()
self.tuqueue = TaskUpdateQueue(
db_fname='%s/unittest.db' % self.tmpdir.name,
manager=self.manager,
shutdown_future=self.shutdown_future,
backoff_time=0.3, # faster retry to keep the unittest speedy.
)
def tearDown(self):
self.tmpdir.cleanup()
def test_queue_push(self):
"""Test that a queue() is followed by an actual push to Flamenco Manager.
Also tests connection errors and other HTTP error statuses.
"""
from mock_responses import JsonResponse, EmptyResponse
# Try different value types
payload = {'key': 'value',
'sub': {'some': 13,
'values': datetime.datetime.now()}}
tries = 0
received_payload = None
received_url = None
def push_callback(url, *, json):
nonlocal tries
nonlocal received_url
nonlocal received_payload
tries += 1
if tries < 3:
raise requests.ConnectionError()
if tries == 3:
return JsonResponse({}, status_code=500)
# Shut down after handling this push.
self.shutdown_future.cancel()
# Remember what we received. Calling self.assertEqual() here doesn't stop the unittest,
# since the work loop is designed to keep running, even when exceptions are thrown.
received_url = url
received_payload = copy.deepcopy(json)
return EmptyResponse()
self.manager.post.side_effect = push_callback
asyncio.ensure_future(self.tuqueue.queue('/push/here', payload), loop=self.asyncio_loop)
# Run the loop for 2 seconds. This should be enough for 3 retries of 0.3 seconds + handling
# the actual payload.
self.asyncio_loop.run_until_complete(
asyncio.wait_for(
self.tuqueue.work(loop=self.asyncio_loop),
timeout=2
)
)
# Check the payload.
self.assertEqual(received_url, '/push/here')
self.assertEqual(received_payload, payload)
def test_queue_persistence(self):
"""Check that updates are pushed, even when the process is stopped & restarted."""
from mock_responses import EmptyResponse
from flamenco_worker.upstream_update_queue import TaskUpdateQueue
# Try different value types
payload = {'key': 'value',
'sub': {'some': 13,
'values': datetime.datetime.now()}}
self.asyncio_loop.run_until_complete(self.tuqueue.queue('/push/there', payload))
self.manager.post.assert_not_called()
self.tuqueue._disconnect_db()
# Create a new tuqueue to handle the push, using the same database.
# Note that we don't have to stop self.tuqueue because we never ran self.tuqueue.work().
new_tuqueue = TaskUpdateQueue(
db_fname=self.tuqueue.db_fname,
manager=self.manager,
shutdown_future=self.shutdown_future,
backoff_time=5, # no retry in this test, so any retry should cause a timeout.
)
received_payload = None
received_url = None
def push_callback(url, *, json):
nonlocal received_url
nonlocal received_payload
# Shut down after handling this push.
self.shutdown_future.cancel()
received_url = url
received_payload = copy.deepcopy(json)
return EmptyResponse()
self.manager.post.side_effect = push_callback
# This should pick up on the pushed data.
self.asyncio_loop.run_until_complete(
asyncio.wait_for(
new_tuqueue.work(loop=self.asyncio_loop),
timeout=2
)
)
# Check the payload
self.assertEqual(received_url, '/push/there')
self.assertEqual(received_payload, payload)
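The datetime in the test payloads above is a good example of why the queue
pickles payloads rather than serialising them to JSON: json.dumps() raises
TypeError on datetime objects, while pickle round-trips them unchanged. A quick
standalone check (not part of the test suite):

import datetime
import json
import pickle

payload = {'sub': {'values': datetime.datetime.now()}}

try:
    json.dumps(payload)
except TypeError as ex:
    print('JSON cannot encode this payload:', ex)

blob = pickle.dumps(payload)  # bytes, suitable for a SQLite BLOB column
assert pickle.loads(blob) == payload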
@@ -11,18 +11,21 @@ class AbstractWorkerTest(unittest.TestCase):
from flamenco_worker.upstream import FlamencoManager
from flamenco_worker.worker import FlamencoWorker
from flamenco_worker.runner import TaskRunner
from flamenco_worker.upstream_update_queue import TaskUpdateQueue
self.asyncio_loop = asyncio.get_event_loop()
self.shutdown_future = self.asyncio_loop.create_future()
self.manager = Mock(spec=FlamencoManager)
self.trunner = Mock(spec=TaskRunner)
self.tuqueue = Mock(spec=TaskUpdateQueue)
self.trunner.execute = self.mock_task_execute
self.worker = FlamencoWorker(
manager=self.manager,
trunner=self.trunner,
tuqueue=self.tuqueue,
job_types=['sleep', 'unittest'],
worker_id='1234',
worker_secret='jemoeder',
@@ -44,6 +47,7 @@ class WorkerStartupTest(AbstractWorkerTest):
def test_startup_already_registered(self):
self.worker.startup()
self.manager.post.assert_not_called()
self.tuqueue.queue.assert_not_called()
def test_startup_registration(self):
from flamenco_worker.worker import detect_platform
@@ -118,26 +122,27 @@ class TestWorkerTaskFetch(AbstractWorkerTest):
from mock_responses import JsonResponse, EmptyResponse
self.manager.post = Mock()
- self.manager.post.side_effect = [
- # response when fetching a task
- JsonResponse({
- '_id': '58514d1e9837734f2e71b479',
- 'job': '58514d1e9837734f2e71b477',
- 'manager': '585a795698377345814d2f68',
- 'project': '',
- 'user': '580f8c66983773759afdb20e',
- 'name': 'sleep-14-26',
- 'status': 'processing',
- 'priority': 50,
- 'job_type': 'sleep',
- 'commands': [
- {'name': 'echo', 'settings': {'message': 'Preparing to sleep'}},
- {'name': 'sleep', 'settings': {'time_in_seconds': 3}}
- ]
- }),
+ # response when fetching a task
+ self.manager.post.return_value = JsonResponse({
+ '_id': '58514d1e9837734f2e71b479',
+ 'job': '58514d1e9837734f2e71b477',
+ 'manager': '585a795698377345814d2f68',
+ 'project': '',
+ 'user': '580f8c66983773759afdb20e',
+ 'name': 'sleep-14-26',
+ 'status': 'processing',
+ 'priority': 50,
+ 'job_type': 'sleep',
+ 'commands': [
+ {'name': 'echo', 'settings': {'message': 'Preparing to sleep'}},
+ {'name': 'sleep', 'settings': {'time_in_seconds': 3}}
+ ]
+ })
+ self.tuqueue.queue.side_effect = [
# Responses after status updates
- EmptyResponse(), # task becoming active
- EmptyResponse(), # task becoming complete
+ None, # task becoming active
+ None, # task becoming complete
]
self.worker.schedule_fetch_task()
@@ -149,17 +154,15 @@ class TestWorkerTaskFetch(AbstractWorkerTest):
# Another fetch-task task should have been scheduled.
self.assertNotEqual(self.worker.fetch_task_task, interesting_task)
- self.manager.post.assert_has_calls([
- call('/task'),
+ self.manager.post.assert_called_once_with('/task')
+ self.tuqueue.queue.assert_has_calls([
call('/tasks/58514d1e9837734f2e71b479/update',
- json={'task_progress_percentage': 0, 'activity': '',
- 'command_progress_percentage': 0, 'task_status': 'active',
- 'current_command_idx': 0},
- ),
+ {'task_progress_percentage': 0, 'activity': '',
+ 'command_progress_percentage': 0, 'task_status': 'active',
+ 'current_command_idx': 0}),
call('/tasks/58514d1e9837734f2e71b479/update',
- json={'task_progress_percentage': 0, 'activity': '',
- 'command_progress_percentage': 0, 'task_status': 'completed',
- 'current_command_idx': 0},
- ),
+ {'task_progress_percentage': 0, 'activity': '',
+ 'command_progress_percentage': 0, 'task_status': 'completed',
+ 'current_command_idx': 0}),
])
- self.assertEqual(self.manager.post.call_count, 3)
+ self.assertEqual(self.tuqueue.queue.call_count, 2)