diff --git a/flamenco_worker/cli.py b/flamenco_worker/cli.py index 04f210fbcd55da13a7a5aa380e72e3a304ecfff4..3d3cceaff16313a3a52666fa53d396eb60c8f2f8 100644 --- a/flamenco_worker/cli.py +++ b/flamenco_worker/cli.py @@ -6,6 +6,7 @@ import logging import logging.config import os import pathlib +import platform import typing import requests @@ -282,17 +283,16 @@ def asyncio_report_tasks(signum=0, stackframe=None): def construct_asyncio_loop() -> asyncio.AbstractEventLoop: + loop = asyncio.get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + # On Windows, the default event loop is SelectorEventLoop which does # not support subprocesses. ProactorEventLoop should be used instead. # Source: https://docs.python.org/3.5/library/asyncio-subprocess.html - import sys - - if sys.platform == 'win32': - loop = asyncio.ProactorEventLoop() - else: - loop = asyncio.get_event_loop() - if loop.is_closed(): - loop = asyncio.new_event_loop() + if platform.system() == 'Windows': + if not isinstance(loop, asyncio.ProactorEventLoop): + loop = asyncio.ProactorEventLoop() asyncio.set_event_loop(loop) return loop diff --git a/flamenco_worker/worker.py b/flamenco_worker/worker.py index bc4633e29276bdcb946376408e8611b782bf7c5e..6482df258aa5c064108621d6489a5612ca06a034 100644 --- a/flamenco_worker/worker.py +++ b/flamenco_worker/worker.py @@ -629,7 +629,8 @@ class FlamencoWorker: elif self._push_act_to_manager is None or self._push_act_to_manager.done(): # Schedule a future push to manager. self._push_act_to_manager = asyncio.ensure_future( - self.push_to_manager(delay=self.push_act_max_interval)) + self.push_to_manager(delay=self.push_act_max_interval), + loop=self.loop) async def register_log(self, log_entry: str, *fmt_args): """Registers a log entry, and possibly sends all queued log entries to upstream Manager. @@ -659,7 +660,8 @@ class FlamencoWorker: elif self._push_log_to_manager is None or self._push_log_to_manager.done(): # Schedule a future push to manager. self._push_log_to_manager = asyncio.ensure_future( - self.push_to_manager(delay=self.push_log_max_interval)) + self.push_to_manager(delay=self.push_log_max_interval), + loop=self.loop) def output_produced(self, *paths: typing.Union[str, pathlib.PurePath]): """Registers a produced output (e.g. rendered frame) with the manager. diff --git a/tests/test_pretask_check.py b/tests/test_pretask_check.py index f35db6c90ed18936e29451f3499600244bc1494c..01dbc3878a117bbeea7bd2646289f9ffbf79dba4 100644 --- a/tests/test_pretask_check.py +++ b/tests/test_pretask_check.py @@ -58,7 +58,7 @@ class PretaskWriteCheckTest(AbstractFWorkerTest): self.assertFalse(testfile.exists(), '%s should have been deleted' % testfile) - self.manager.post.assert_called_once_with('/task', loop=mock.ANY) + self.manager.post.assert_called_once_with('/task', loop=self.asyncio_loop) self.assertIsNone(self.worker.sleeping_fut) def test_happy_not_remove_file(self): @@ -79,7 +79,7 @@ class PretaskWriteCheckTest(AbstractFWorkerTest): self.assertTrue(testfile.exists(), '%s should not have been deleted' % testfile) - self.manager.post.assert_called_once_with('/task', loop=mock.ANY) + self.manager.post.assert_called_once_with('/task', loop=self.asyncio_loop) self.assertIsNone(self.worker.sleeping_fut) @contextlib.contextmanager @@ -99,7 +99,7 @@ class PretaskWriteCheckTest(AbstractFWorkerTest): if post_run is not None: post_run() - self.manager.post.assert_called_once_with('/ack-status-change/error', loop=mock.ANY) + self.manager.post.assert_called_once_with('/ack-status-change/error', loop=self.asyncio_loop) self.assertFalse(self.worker.sleeping_fut.done()) @@ -156,5 +156,5 @@ class PretaskReadCheckTest(AbstractFWorkerTest): if post_run is not None: post_run() - self.manager.post.assert_called_once_with('/ack-status-change/error', loop=mock.ANY) + self.manager.post.assert_called_once_with('/ack-status-change/error', loop=self.asyncio_loop) self.assertFalse(self.worker.sleeping_fut.done()) diff --git a/tests/test_worker.py b/tests/test_worker.py index ac00bc8f346ea2edb09f1a1c08abdd2a2b787535..21694032362e252bc0b633ac0dc87e6a892aaedb 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -323,7 +323,7 @@ class TestWorkerTaskExecution(AbstractFWorkerTest): self.assertTrue(stop_called) - self.manager.post.assert_called_once_with('/task', loop=unittest.mock.ANY) + self.manager.post.assert_called_once_with('/task', loop=self.asyncio_loop) self.tuqueue.queue.assert_any_call( '/tasks/58514d1e9837734f2e71b479/update', {'task_progress_percentage': 0, 'activity': '',