Skip to content
Snippets Groups Projects
Commit c9dd0952 authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Worker: queued log entries are now protected with a lock.

It may be a bit overkill, as only one coroutine should be handling those
entries anyway.
parent 52fe0bda
No related branches found
No related tags found
No related merge requests found
......@@ -41,7 +41,8 @@ class FlamencoWorker:
default=None, init=False,
validator=attr.validators.optional(attr.validators.instance_of(str))
)
queued_log_entries = attr.ib(default=attr.Factory(list), init=False)
_queued_log_entries = attr.ib(default=attr.Factory(list), init=False)
_queue_lock = attr.ib(default=attr.Factory(asyncio.Lock), init=False)
last_log_push = attr.ib(
default=datetime.datetime.now(),
validator=attr.validators.optional(attr.validators.instance_of(datetime.datetime)))
......@@ -171,7 +172,8 @@ class FlamencoWorker:
except Exception as ex:
self._log.exception('Uncaught exception executing task %s' % self.task_id)
try:
self.queued_log_entries.append(traceback.format_exc())
with (await self._queue_lock):
self._queued_log_entries.append(traceback.format_exc())
await self.register_task_update(
task_status='failed',
activity='Uncaught exception: %s %s' % (type(ex).__name__, ex),
......@@ -200,10 +202,11 @@ class FlamencoWorker:
now = datetime.datetime.now()
self.last_activity_push = now
if self.queued_log_entries:
payload['log'] = '\n'.join(self.queued_log_entries)
self.queued_log_entries.clear()
self.last_log_push = now
with (await self._queue_lock):
if self._queued_log_entries:
payload['log'] = '\n'.join(self._queued_log_entries)
self._queued_log_entries.clear()
self.last_log_push = now
resp = self.manager.post('/tasks/%s/update' % self.task_id,
json=payload,
......@@ -255,9 +258,11 @@ class FlamencoWorker:
log_entry %= fmt_args
now = datetime.datetime.now(tz.tzutc()).isoformat()
self.queued_log_entries.append('%s: %s' % (now, log_entry))
with (await self._queue_lock):
self._queued_log_entries.append('%s: %s' % (now, log_entry))
queue_size = len(self._queued_log_entries)
if len(self.queued_log_entries) > PUSH_LOG_MAX_ENTRIES:
if queue_size > PUSH_LOG_MAX_ENTRIES:
self._log.info('Queued up more than %i log entries, pushing to manager',
PUSH_LOG_MAX_ENTRIES)
await self.push_to_manager()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment