Skip to content
Snippets Groups Projects
Commit e8a40931 authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Fix memory leak in task update queue

I'm guessing the do_db_push() async function execution was starved by
other asyncio tasks, causing Python to remember all the to-be-queued
payloads in memory. By making the function synchronous this doesn't happen.
parent 7b9c46bb
Branches
Tags
No related merge requests found
......@@ -50,7 +50,7 @@ class TaskUpdateQueue:
self._db.close()
self._db = None
def queue(self, url, payload, *, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
def queue(self, url, payload):
"""Push some payload onto the queue."""
if self._db is None:
......@@ -59,7 +59,6 @@ class TaskUpdateQueue:
# Store the pickled payload in the SQLite database.
pickled = pickle.dumps(payload)
async def do_db_push():
self._db.execute('INSERT INTO fworker_queue (url, payload) values (?, ?)',
(url, pickled))
self._db.commit()
......@@ -67,8 +66,6 @@ class TaskUpdateQueue:
# Notify the work loop that stuff has been queued.
self._stuff_queued.set()
return asyncio.ensure_future(do_db_push(), loop=loop)
async def work(self, *, loop=None):
"""Loop that pushes queued payloads to the Flamenco Manager.
......
......@@ -513,7 +513,7 @@ class FlamencoWorker:
self._log.debug('push_to_manager: nothing to push')
return
self.tuqueue.queue('/tasks/%s/update' % self.task_id, payload, loop=self.loop)
self.tuqueue.queue('/tasks/%s/update' % self.task_id, payload)
async def register_task_update(self, *,
task_status: str = None,
......@@ -567,8 +567,8 @@ class FlamencoWorker:
queue_size = len(self._queued_log_entries)
if queue_size > self.push_log_max_entries:
self._log.info('Queued up more than %i log entries, pushing to manager',
self.push_log_max_entries)
self._log.info('Queued up %i > %i log entries, pushing to manager',
queue_size, self.push_log_max_entries)
await self.push_to_manager()
elif datetime.datetime.now() - self.last_log_push > self.push_log_max_interval:
self._log.info('More than %s since last log update, pushing to manager',
......
......@@ -76,7 +76,7 @@ class TaskUpdateQueueTest(AbstractWorkerTest):
self.manager.post.side_effect = push_callback
self.tuqueue.queue('/push/here', payload, loop=self.asyncio_loop)
self.tuqueue.queue('/push/here', payload)
# Run the loop for 2 seconds. This should be enough for 3 retries of 0.3 seconds + handling
# the actual payload.
......@@ -102,8 +102,7 @@ class TaskUpdateQueueTest(AbstractWorkerTest):
payload = {'key': 'value',
'sub': {'some': 13,
'values': datetime.datetime.now()}}
self.asyncio_loop.run_until_complete(
self.tuqueue.queue('/push/there', payload, loop=self.asyncio_loop))
self.tuqueue.queue('/push/there', payload)
self.manager.post.assert_not_called()
self.tuqueue._disconnect_db()
......@@ -169,7 +168,7 @@ class TaskUpdateQueueTest(AbstractWorkerTest):
self.manager.post.side_effect = push_callback
self.tuqueue.queue('/push/here', payload, loop=self.asyncio_loop)
self.tuqueue.queue('/push/here', payload)
# Run the loop for 2 seconds. This should be enough for 3 retries of 0.3 seconds + handling
# the actual payload.
......
......@@ -211,13 +211,11 @@ class TestWorkerTaskExecution(AbstractFWorkerTest):
{'task_progress_percentage': 0, 'activity': '',
'command_progress_percentage': 0, 'task_status': 'active',
'current_command_idx': 0},
loop=self.loop,
),
call('/tasks/58514d1e9837734f2e71b479/update',
{'task_progress_percentage': 0, 'activity': 'Task completed',
'command_progress_percentage': 0, 'task_status': 'completed',
'current_command_idx': 0},
loop=self.loop,
)
])
self.assertEqual(self.tuqueue.queue.call_count, 2)
......@@ -267,13 +265,12 @@ class TestWorkerTaskExecution(AbstractFWorkerTest):
{'task_progress_percentage': 0, 'activity': '',
'command_progress_percentage': 0, 'task_status': 'active',
'current_command_idx': 0},
loop=self.loop,
)
# A bit clunky because we don't know which timestamp is included in the log line.
last_args, last_kwargs = self.tuqueue.queue.call_args
self.assertEqual(last_args[0], '/tasks/58514d1e9837734f2e71b479/update')
self.assertEqual(last_kwargs, {'loop': self.loop})
self.assertEqual(last_kwargs, {})
self.assertIn('log', last_args[1])
self.assertTrue(last_args[1]['log'].endswith(
'Worker 1234 stopped running this task, no longer allowed to run by Manager'))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment