Skip to content
Snippets Groups Projects
Commit c15ef584 authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Lock TaskUpdateQueue.flush()

This way it can be called from multiple coroutines without getting race
conditions.
parent c07105e0
No related branches found
No related tags found
No related merge requests found
......@@ -29,6 +29,7 @@ class TaskUpdateQueue:
_stuff_queued = attr.ib(default=attr.Factory(asyncio.Event), init=False)
_db = attr.ib(default=None, init=False)
_queue_lock = attr.ib(default=attr.Factory(asyncio.Lock), init=False)
_log = attrs_extra.log('%s.TaskUpdateQueue' % __name__)
def _connect_db(self):
......@@ -127,27 +128,28 @@ class TaskUpdateQueue:
Returns True iff the queue was empty, even before flushing.
"""
queue_is_empty = True
for rowid, url, payload in self._queue():
queue_is_empty = False
self._log.info('Pushing task update to Manager')
resp = await self.manager.post(url, json=payload, loop=loop)
if resp.status_code == 409:
# The task was assigned to another worker, so we're not allowed to
# push updates for it. We have to un-queue this update, as it will
# never be accepted.
self._log.warning('Task was assigned to another worker, discarding update.')
else:
resp.raise_for_status()
self._log.debug('Master accepted pushed update.')
self._unqueue(rowid)
if queue_is_empty:
# Only clear the flag once the queue has really been cleared.
self._stuff_queued.clear()
return queue_is_empty
with (await self._queue_lock):
queue_is_empty = True
for rowid, url, payload in self._queue():
queue_is_empty = False
self._log.info('Pushing task update to Manager')
resp = await self.manager.post(url, json=payload, loop=loop)
if resp.status_code == 409:
# The task was assigned to another worker, so we're not allowed to
# push updates for it. We have to un-queue this update, as it will
# never be accepted.
self._log.warning('Task was assigned to another worker, discarding update.')
else:
resp.raise_for_status()
self._log.debug('Master accepted pushed update.')
self._unqueue(rowid)
if queue_is_empty:
# Only clear the flag once the queue has really been cleared.
self._stuff_queued.clear()
return queue_is_empty
async def flush_and_report(self, *, loop: asyncio.AbstractEventLoop):
"""Flushes the queue, and just reports errors, doesn't wait nor retry."""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment