From c15ef58417b5f87b9d49784ac1813a5b17dce76c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= <sybren@stuvel.eu>
Date: Fri, 29 Sep 2017 12:18:13 +0200
Subject: [PATCH] Lock TaskUpdateQueue.flush()

This way it can be called from multiple coroutines without getting race
conditions.
---
 flamenco_worker/upstream_update_queue.py | 44 +++++++++++++-----------
 1 file changed, 23 insertions(+), 21 deletions(-)

diff --git a/flamenco_worker/upstream_update_queue.py b/flamenco_worker/upstream_update_queue.py
index eed8b3d5..cda9e61e 100644
--- a/flamenco_worker/upstream_update_queue.py
+++ b/flamenco_worker/upstream_update_queue.py
@@ -29,6 +29,7 @@ class TaskUpdateQueue:
 
     _stuff_queued = attr.ib(default=attr.Factory(asyncio.Event), init=False)
     _db = attr.ib(default=None, init=False)
+    _queue_lock = attr.ib(default=attr.Factory(asyncio.Lock), init=False)
     _log = attrs_extra.log('%s.TaskUpdateQueue' % __name__)
 
     def _connect_db(self):
@@ -127,27 +128,28 @@ class TaskUpdateQueue:
         Returns True iff the queue was empty, even before flushing.
         """
 
-        queue_is_empty = True
-        for rowid, url, payload in self._queue():
-            queue_is_empty = False
-
-            self._log.info('Pushing task update to Manager')
-            resp = await self.manager.post(url, json=payload, loop=loop)
-            if resp.status_code == 409:
-                # The task was assigned to another worker, so we're not allowed to
-                # push updates for it. We have to un-queue this update, as it will
-                # never be accepted.
-                self._log.warning('Task was assigned to another worker, discarding update.')
-            else:
-                resp.raise_for_status()
-                self._log.debug('Master accepted pushed update.')
-            self._unqueue(rowid)
-
-        if queue_is_empty:
-            # Only clear the flag once the queue has really been cleared.
-            self._stuff_queued.clear()
-
-        return queue_is_empty
+        with (await self._queue_lock):
+            queue_is_empty = True
+            for rowid, url, payload in self._queue():
+                queue_is_empty = False
+
+                self._log.info('Pushing task update to Manager')
+                resp = await self.manager.post(url, json=payload, loop=loop)
+                if resp.status_code == 409:
+                    # The task was assigned to another worker, so we're not allowed to
+                    # push updates for it. We have to un-queue this update, as it will
+                    # never be accepted.
+                    self._log.warning('Task was assigned to another worker, discarding update.')
+                else:
+                    resp.raise_for_status()
+                    self._log.debug('Master accepted pushed update.')
+                self._unqueue(rowid)
+
+            if queue_is_empty:
+                # Only clear the flag once the queue has really been cleared.
+                self._stuff_queued.clear()
+
+            return queue_is_empty
 
     async def flush_and_report(self, *, loop: asyncio.AbstractEventLoop):
         """Flushes the queue, and just reports errors, doesn't wait nor retry."""
-- 
GitLab