diff --git a/packages/flamenco-worker-python/flamenco_worker/upstream_update_queue.py b/packages/flamenco-worker-python/flamenco_worker/upstream_update_queue.py index 0f0ab50e9a712753b7a4646d54c24bd6e6ab6550..bffe3d6f1ea4f2adbd3132f3f1adb583c5275c1c 100644 --- a/packages/flamenco-worker-python/flamenco_worker/upstream_update_queue.py +++ b/packages/flamenco-worker-python/flamenco_worker/upstream_update_queue.py @@ -126,8 +126,14 @@ class TaskUpdateQueue: self._log.info('Pushing task update to Manager') resp = await self.manager.post(url, json=payload, loop=loop) - resp.raise_for_status() - self._log.debug('Master accepted pushed update.') + if resp.status_code == 409: + # The task was assigned to another worker, so we're not allowed to + # push updates for it. We have to un-queue this update, as it will + # never be accepted. + self._log.warning('Task was assigned to another worker, discarding update.') + else: + resp.raise_for_status() + self._log.debug('Master accepted pushed update.') self._unqueue(rowid) if queue_is_empty: diff --git a/packages/flamenco-worker-python/tests/test_upstream_update_queue.py b/packages/flamenco-worker-python/tests/test_upstream_update_queue.py index b958776cddf9b48e86656d5009051588a32bd7f5..4f8083a68bd16f7a1212ef89a002da246df4767a 100644 --- a/packages/flamenco-worker-python/tests/test_upstream_update_queue.py +++ b/packages/flamenco-worker-python/tests/test_upstream_update_queue.py @@ -147,3 +147,39 @@ class TaskUpdateQueueTest(AbstractWorkerTest): self.assertEqual(received_url, '/push/there') self.assertEqual(received_payload, payload) self.assertEqual(received_loop, self.asyncio_loop) + + def test_conflict(self): + """A 409 Conflict response should discard a queued task update. + """ + + from mock_responses import JsonResponse, EmptyResponse + + # Try different value types + payload = {'key': 'value', + 'sub': {'some': 13, + 'values': datetime.datetime.now()}} + + tries = 0 + + async def push_callback(url, *, json, loop): + nonlocal tries + tries += 1 + self.shutdown_future.cancel() + return JsonResponse({}, status_code=409) + + self.manager.post.side_effect = push_callback + + self.tuqueue.queue('/push/here', payload, loop=self.asyncio_loop) + + # Run the loop for 2 seconds. This should be enough for 3 retries of 0.3 seconds + handling + # the actual payload. + self.asyncio_loop.run_until_complete( + asyncio.wait_for( + self.tuqueue.work(loop=self.asyncio_loop), + timeout=2 + ) + ) + + # There should only be one attempt at delivering this payload. + self.assertEqual(1, tries) + self.assertEqual([], list(self.tuqueue._queue()))