Skip to content
Snippets Groups Projects
Commit ca618985 authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Worker: use asyncio task scheduling to ensure activities & logs are sent.

Before this comment, logs & activities would be queued, and this queue was
pushed to master when a log or activity was registered and certain criteria
were met. As a result, when no more logs/activities were registered, the
queue would not be flushed. This is now solved by scheduling a push to
master, and cancelling that task if that push happens for any other reason.
parent f86884ce
No related branches found
No related tags found
No related merge requests found
......@@ -63,6 +63,25 @@ class FlamencoWorker:
# a complete Activity each time.
last_task_activity = attr.ib(default=attr.Factory(documents.Activity))
# Configuration
push_log_max_interval = attr.ib(default=PUSH_LOG_MAX_INTERVAL,
validator=attr.validators.instance_of(datetime.timedelta))
push_log_max_entries = attr.ib(default=PUSH_LOG_MAX_ENTRIES,
validator=attr.validators.instance_of(int))
push_act_max_interval = attr.ib(default=PUSH_ACT_MAX_INTERVAL,
validator=attr.validators.instance_of(datetime.timedelta))
# Futures that represent delayed calls to push_to_master().
# They are scheduled when logs & activities are registered but not yet pushed. They are
# cancelled when a push_to_master() actually happens for another reason. There are different
# futures for activity and log pushing, as these can have different max intervals.
_push_log_to_manager = attr.ib(
default=None, init=False,
validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))
_push_act_to_manager = attr.ib(
default=None, init=False,
validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))
_log = attrs_extra.log('%s.FlamencoWorker' % __name__)
async def startup(self):
......@@ -228,12 +247,17 @@ class FlamencoWorker:
# when we're in some infinite failure loop.
self.schedule_fetch_task(FETCH_TASK_DONE_SCHEDULE_NEW_DELAY)
async def push_to_manager(self):
async def push_to_manager(self, *, delay: datetime.timedelta=None):
"""Updates a task's status and activity.
Uses the TaskUpdateQueue to handle persistent queueing.
"""
if delay is not None:
delay_sec = delay.total_seconds()
self._log.debug('Scheduled delayed push to master in %r seconds', delay_sec)
await asyncio.sleep(delay_sec)
self._log.info('Updating task %s with status %r and activity %r',
self.task_id, self.current_task_status, self.last_task_activity)
......@@ -243,12 +267,20 @@ class FlamencoWorker:
now = datetime.datetime.now()
self.last_activity_push = now
# Cancel any pending push task, as we're pushing activities now.
if self._push_act_to_manager is not None:
self._push_act_to_manager.cancel()
with (await self._queue_lock):
if self._queued_log_entries:
payload['log'] = '\n'.join(self._queued_log_entries)
self._queued_log_entries.clear()
self.last_log_push = now
# Cancel any pending push task, as we're pushing logs now.
if self._push_log_to_manager is not None:
self._push_log_to_manager.cancel()
self.tuqueue.queue('/tasks/%s/update' % self.task_id, payload, loop=self.loop)
async def register_task_update(self, *,
......@@ -276,10 +308,14 @@ class FlamencoWorker:
if task_status_changed:
self._log.info('Task changed status to %s, pushing to manager', task_status)
await self.push_to_manager()
elif datetime.datetime.now() - self.last_activity_push > PUSH_ACT_MAX_INTERVAL:
elif datetime.datetime.now() - self.last_activity_push > self.push_act_max_interval:
self._log.info('More than %s since last activity update, pushing to manager',
PUSH_ACT_MAX_INTERVAL)
self.push_act_max_interval)
await self.push_to_manager()
elif self._push_act_to_manager is None or self._push_act_to_manager.done():
# Schedule a future push to manager.
self._push_act_to_manager = asyncio.ensure_future(
self.push_to_manager(delay=self.push_act_max_interval))
async def register_log(self, log_entry, *fmt_args):
"""Registers a log entry, and possibly sends all queued log entries to upstream Manager.
......@@ -298,14 +334,18 @@ class FlamencoWorker:
self._queued_log_entries.append('%s: %s' % (now, log_entry))
queue_size = len(self._queued_log_entries)
if queue_size > PUSH_LOG_MAX_ENTRIES:
if queue_size > self.push_log_max_entries:
self._log.info('Queued up more than %i log entries, pushing to manager',
PUSH_LOG_MAX_ENTRIES)
self.push_log_max_entries)
await self.push_to_manager()
elif datetime.datetime.now() - self.last_log_push > PUSH_LOG_MAX_INTERVAL:
elif datetime.datetime.now() - self.last_log_push > self.push_log_max_interval:
self._log.info('More than %s since last log update, pushing to manager',
PUSH_LOG_MAX_INTERVAL)
self.push_log_max_interval)
await self.push_to_manager()
elif self._push_log_to_manager is None or self._push_log_to_manager.done():
# Schedule a future push to manager.
self._push_log_to_manager = asyncio.ensure_future(
self.push_to_manager(delay=self.push_log_max_interval))
def generate_secret() -> str:
......
......@@ -180,3 +180,113 @@ class TestWorkerTaskFetch(AbstractFWorkerTest):
)
])
self.assertEqual(self.tuqueue.queue.call_count, 2)
class WorkerPushToMasterTest(AbstractFWorkerTest):
def test_one_activity(self):
"""A single activity should be sent to master within reasonable time."""
from datetime import timedelta
queue_pushed_future = asyncio.Future()
def queue_pushed(*args, **kwargs):
queue_pushed_future.set_result(True)
self.tuqueue.queue.side_effect = queue_pushed
self.worker.push_act_max_interval = timedelta(milliseconds=500)
asyncio.ensure_future(
self.worker.register_task_update(activity='test'),
loop=self.asyncio_loop)
self.asyncio_loop.run_until_complete(
asyncio.wait_for(queue_pushed_future, 1))
# Queue push should only be done once
self.assertEqual(self.tuqueue.queue.call_count, 1)
def test_two_activities(self):
"""A single non-status-changing and then a status-changing act should push once."""
from datetime import timedelta
queue_pushed_future = asyncio.Future()
def queue_pushed(*args, **kwargs):
queue_pushed_future.set_result(True)
self.tuqueue.queue.side_effect = queue_pushed
self.worker.push_act_max_interval = timedelta(milliseconds=500)
# Non-status-changing
asyncio.ensure_future(
self.worker.register_task_update(activity='test'),
loop=self.asyncio_loop)
# Status-changing
asyncio.ensure_future(
self.worker.register_task_update(task_status='changed'),
loop=self.asyncio_loop)
self.asyncio_loop.run_until_complete(
asyncio.wait_for(queue_pushed_future, 1))
# Queue push should only be done once
self.assertEqual(self.tuqueue.queue.call_count, 1)
# The scheduled task should be cancelled.
self.assertTrue(self.worker._push_act_to_manager.cancelled())
def test_one_log(self):
"""A single log should be sent to master within reasonable time."""
from datetime import timedelta
queue_pushed_future = asyncio.Future()
def queue_pushed(*args, **kwargs):
queue_pushed_future.set_result(True)
self.tuqueue.queue.side_effect = queue_pushed
self.worker.push_log_max_interval = timedelta(milliseconds=500)
asyncio.ensure_future(
self.worker.register_log('unit tests are ünits'),
loop=self.asyncio_loop)
self.asyncio_loop.run_until_complete(
asyncio.wait_for(queue_pushed_future, 1))
# Queue push should only be done once
self.assertEqual(self.tuqueue.queue.call_count, 1)
def test_two_logs(self):
"""Logging once and then again should push once."""
queue_pushed_future = asyncio.Future()
def queue_pushed(*args, **kwargs):
queue_pushed_future.set_result(True)
self.tuqueue.queue.side_effect = queue_pushed
self.worker.push_log_max_entries = 1 # max 1 queued, will push at 2
# Queued, will schedule push
asyncio.ensure_future(
self.worker.register_log('first line'),
loop=self.asyncio_loop)
# Max queued reached, will cause immediate push
asyncio.ensure_future(
self.worker.register_log('second line'),
loop=self.asyncio_loop)
self.asyncio_loop.run_until_complete(
asyncio.wait_for(queue_pushed_future, 1))
# Queue push should only be done once
self.assertEqual(self.tuqueue.queue.call_count, 1)
# The scheduled task should be cancelled.
self.assertTrue(self.worker._push_log_to_manager.cancelled())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment