Skip to content
Snippets Groups Projects
Commit fba56c40 authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Properly create the asyncio loop on Windows

On Windows, the loop was recreated on every call to `construct_asyncio_loop()`,
whereas on Linux the existing loop would be reused if possible. Now both
platforms work the same.
parent e0ef84e6
No related branches found
No related tags found
No related merge requests found
...@@ -6,6 +6,7 @@ import logging ...@@ -6,6 +6,7 @@ import logging
import logging.config import logging.config
import os import os
import pathlib import pathlib
import platform
import typing import typing
import requests import requests
...@@ -282,17 +283,16 @@ def asyncio_report_tasks(signum=0, stackframe=None): ...@@ -282,17 +283,16 @@ def asyncio_report_tasks(signum=0, stackframe=None):
def construct_asyncio_loop() -> asyncio.AbstractEventLoop: def construct_asyncio_loop() -> asyncio.AbstractEventLoop:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
# On Windows, the default event loop is SelectorEventLoop which does # On Windows, the default event loop is SelectorEventLoop which does
# not support subprocesses. ProactorEventLoop should be used instead. # not support subprocesses. ProactorEventLoop should be used instead.
# Source: https://docs.python.org/3.5/library/asyncio-subprocess.html # Source: https://docs.python.org/3.5/library/asyncio-subprocess.html
import sys if platform.system() == 'Windows':
if not isinstance(loop, asyncio.ProactorEventLoop):
if sys.platform == 'win32': loop = asyncio.ProactorEventLoop()
loop = asyncio.ProactorEventLoop()
else:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
return loop return loop
......
...@@ -629,7 +629,8 @@ class FlamencoWorker: ...@@ -629,7 +629,8 @@ class FlamencoWorker:
elif self._push_act_to_manager is None or self._push_act_to_manager.done(): elif self._push_act_to_manager is None or self._push_act_to_manager.done():
# Schedule a future push to manager. # Schedule a future push to manager.
self._push_act_to_manager = asyncio.ensure_future( self._push_act_to_manager = asyncio.ensure_future(
self.push_to_manager(delay=self.push_act_max_interval)) self.push_to_manager(delay=self.push_act_max_interval),
loop=self.loop)
async def register_log(self, log_entry: str, *fmt_args): async def register_log(self, log_entry: str, *fmt_args):
"""Registers a log entry, and possibly sends all queued log entries to upstream Manager. """Registers a log entry, and possibly sends all queued log entries to upstream Manager.
...@@ -659,7 +660,8 @@ class FlamencoWorker: ...@@ -659,7 +660,8 @@ class FlamencoWorker:
elif self._push_log_to_manager is None or self._push_log_to_manager.done(): elif self._push_log_to_manager is None or self._push_log_to_manager.done():
# Schedule a future push to manager. # Schedule a future push to manager.
self._push_log_to_manager = asyncio.ensure_future( self._push_log_to_manager = asyncio.ensure_future(
self.push_to_manager(delay=self.push_log_max_interval)) self.push_to_manager(delay=self.push_log_max_interval),
loop=self.loop)
def output_produced(self, *paths: typing.Union[str, pathlib.PurePath]): def output_produced(self, *paths: typing.Union[str, pathlib.PurePath]):
"""Registers a produced output (e.g. rendered frame) with the manager. """Registers a produced output (e.g. rendered frame) with the manager.
......
...@@ -58,7 +58,7 @@ class PretaskWriteCheckTest(AbstractFWorkerTest): ...@@ -58,7 +58,7 @@ class PretaskWriteCheckTest(AbstractFWorkerTest):
self.assertFalse(testfile.exists(), '%s should have been deleted' % testfile) self.assertFalse(testfile.exists(), '%s should have been deleted' % testfile)
self.manager.post.assert_called_once_with('/task', loop=mock.ANY) self.manager.post.assert_called_once_with('/task', loop=self.asyncio_loop)
self.assertIsNone(self.worker.sleeping_fut) self.assertIsNone(self.worker.sleeping_fut)
def test_happy_not_remove_file(self): def test_happy_not_remove_file(self):
...@@ -79,7 +79,7 @@ class PretaskWriteCheckTest(AbstractFWorkerTest): ...@@ -79,7 +79,7 @@ class PretaskWriteCheckTest(AbstractFWorkerTest):
self.assertTrue(testfile.exists(), '%s should not have been deleted' % testfile) self.assertTrue(testfile.exists(), '%s should not have been deleted' % testfile)
self.manager.post.assert_called_once_with('/task', loop=mock.ANY) self.manager.post.assert_called_once_with('/task', loop=self.asyncio_loop)
self.assertIsNone(self.worker.sleeping_fut) self.assertIsNone(self.worker.sleeping_fut)
@contextlib.contextmanager @contextlib.contextmanager
...@@ -99,7 +99,7 @@ class PretaskWriteCheckTest(AbstractFWorkerTest): ...@@ -99,7 +99,7 @@ class PretaskWriteCheckTest(AbstractFWorkerTest):
if post_run is not None: if post_run is not None:
post_run() post_run()
self.manager.post.assert_called_once_with('/ack-status-change/error', loop=mock.ANY) self.manager.post.assert_called_once_with('/ack-status-change/error', loop=self.asyncio_loop)
self.assertFalse(self.worker.sleeping_fut.done()) self.assertFalse(self.worker.sleeping_fut.done())
...@@ -156,5 +156,5 @@ class PretaskReadCheckTest(AbstractFWorkerTest): ...@@ -156,5 +156,5 @@ class PretaskReadCheckTest(AbstractFWorkerTest):
if post_run is not None: if post_run is not None:
post_run() post_run()
self.manager.post.assert_called_once_with('/ack-status-change/error', loop=mock.ANY) self.manager.post.assert_called_once_with('/ack-status-change/error', loop=self.asyncio_loop)
self.assertFalse(self.worker.sleeping_fut.done()) self.assertFalse(self.worker.sleeping_fut.done())
...@@ -323,7 +323,7 @@ class TestWorkerTaskExecution(AbstractFWorkerTest): ...@@ -323,7 +323,7 @@ class TestWorkerTaskExecution(AbstractFWorkerTest):
self.assertTrue(stop_called) self.assertTrue(stop_called)
self.manager.post.assert_called_once_with('/task', loop=unittest.mock.ANY) self.manager.post.assert_called_once_with('/task', loop=self.asyncio_loop)
self.tuqueue.queue.assert_any_call( self.tuqueue.queue.assert_any_call(
'/tasks/58514d1e9837734f2e71b479/update', '/tasks/58514d1e9837734f2e71b479/update',
{'task_progress_percentage': 0, 'activity': '', {'task_progress_percentage': 0, 'activity': '',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment