diff --git a/flamenco_worker/upstream_update_queue.py b/flamenco_worker/upstream_update_queue.py index eed8b3d546e2c66f0c8396203569aef9fd771096..cda9e61effaa0e7352781700eb953b930432192d 100644 --- a/flamenco_worker/upstream_update_queue.py +++ b/flamenco_worker/upstream_update_queue.py @@ -29,6 +29,7 @@ class TaskUpdateQueue: _stuff_queued = attr.ib(default=attr.Factory(asyncio.Event), init=False) _db = attr.ib(default=None, init=False) + _queue_lock = attr.ib(default=attr.Factory(asyncio.Lock), init=False) _log = attrs_extra.log('%s.TaskUpdateQueue' % __name__) def _connect_db(self): @@ -127,27 +128,28 @@ class TaskUpdateQueue: Returns True iff the queue was empty, even before flushing. """ - queue_is_empty = True - for rowid, url, payload in self._queue(): - queue_is_empty = False - - self._log.info('Pushing task update to Manager') - resp = await self.manager.post(url, json=payload, loop=loop) - if resp.status_code == 409: - # The task was assigned to another worker, so we're not allowed to - # push updates for it. We have to un-queue this update, as it will - # never be accepted. - self._log.warning('Task was assigned to another worker, discarding update.') - else: - resp.raise_for_status() - self._log.debug('Master accepted pushed update.') - self._unqueue(rowid) - - if queue_is_empty: - # Only clear the flag once the queue has really been cleared. - self._stuff_queued.clear() - - return queue_is_empty + with (await self._queue_lock): + queue_is_empty = True + for rowid, url, payload in self._queue(): + queue_is_empty = False + + self._log.info('Pushing task update to Manager') + resp = await self.manager.post(url, json=payload, loop=loop) + if resp.status_code == 409: + # The task was assigned to another worker, so we're not allowed to + # push updates for it. We have to un-queue this update, as it will + # never be accepted. + self._log.warning('Task was assigned to another worker, discarding update.') + else: + resp.raise_for_status() + self._log.debug('Master accepted pushed update.') + self._unqueue(rowid) + + if queue_is_empty: + # Only clear the flag once the queue has really been cleared. + self._stuff_queued.clear() + + return queue_is_empty async def flush_and_report(self, *, loop: asyncio.AbstractEventLoop): """Flushes the queue, and just reports errors, doesn't wait nor retry."""