From fba56c40ec09a9f6da16f5011018b348be5fc58c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= <sybren@stuvel.eu>
Date: Thu, 10 Jan 2019 16:40:11 +0100
Subject: [PATCH] Properly create the asyncio loop on Windows

On Windows, the loop was recreated on every call to `construct_asyncio_loop()`,
whereas on Linux the existing loop would be reused if possible. Now both
platforms work the same.
---
 flamenco_worker/cli.py      | 16 ++++++++--------
 flamenco_worker/worker.py   |  6 ++++--
 tests/test_pretask_check.py |  8 ++++----
 tests/test_worker.py        |  2 +-
 4 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/flamenco_worker/cli.py b/flamenco_worker/cli.py
index 04f210fb..3d3cceaf 100644
--- a/flamenco_worker/cli.py
+++ b/flamenco_worker/cli.py
@@ -6,6 +6,7 @@ import logging
 import logging.config
 import os
 import pathlib
+import platform
 import typing
 
 import requests
@@ -282,17 +283,16 @@ def asyncio_report_tasks(signum=0, stackframe=None):
 
 
 def construct_asyncio_loop() -> asyncio.AbstractEventLoop:
+    loop = asyncio.get_event_loop()
+    if loop.is_closed():
+        loop = asyncio.new_event_loop()
+
     # On Windows, the default event loop is SelectorEventLoop which does
     # not support subprocesses. ProactorEventLoop should be used instead.
     # Source: https://docs.python.org/3.5/library/asyncio-subprocess.html
-    import sys
-
-    if sys.platform == 'win32':
-        loop = asyncio.ProactorEventLoop()
-    else:
-        loop = asyncio.get_event_loop()
-        if loop.is_closed():
-            loop = asyncio.new_event_loop()
+    if platform.system() == 'Windows':
+        if not isinstance(loop, asyncio.ProactorEventLoop):
+            loop = asyncio.ProactorEventLoop()
 
     asyncio.set_event_loop(loop)
     return loop
diff --git a/flamenco_worker/worker.py b/flamenco_worker/worker.py
index bc4633e2..6482df25 100644
--- a/flamenco_worker/worker.py
+++ b/flamenco_worker/worker.py
@@ -629,7 +629,8 @@ class FlamencoWorker:
         elif self._push_act_to_manager is None or self._push_act_to_manager.done():
             # Schedule a future push to manager.
             self._push_act_to_manager = asyncio.ensure_future(
-                self.push_to_manager(delay=self.push_act_max_interval))
+                self.push_to_manager(delay=self.push_act_max_interval),
+                loop=self.loop)
 
     async def register_log(self, log_entry: str, *fmt_args):
         """Registers a log entry, and possibly sends all queued log entries to upstream Manager.
@@ -659,7 +660,8 @@ class FlamencoWorker:
         elif self._push_log_to_manager is None or self._push_log_to_manager.done():
             # Schedule a future push to manager.
             self._push_log_to_manager = asyncio.ensure_future(
-                self.push_to_manager(delay=self.push_log_max_interval))
+                self.push_to_manager(delay=self.push_log_max_interval),
+                loop=self.loop)
 
     def output_produced(self, *paths: typing.Union[str, pathlib.PurePath]):
         """Registers a produced output (e.g. rendered frame) with the manager.
diff --git a/tests/test_pretask_check.py b/tests/test_pretask_check.py
index f35db6c9..01dbc387 100644
--- a/tests/test_pretask_check.py
+++ b/tests/test_pretask_check.py
@@ -58,7 +58,7 @@ class PretaskWriteCheckTest(AbstractFWorkerTest):
 
             self.assertFalse(testfile.exists(), '%s should have been deleted' % testfile)
 
-        self.manager.post.assert_called_once_with('/task', loop=mock.ANY)
+        self.manager.post.assert_called_once_with('/task', loop=self.asyncio_loop)
         self.assertIsNone(self.worker.sleeping_fut)
 
     def test_happy_not_remove_file(self):
@@ -79,7 +79,7 @@ class PretaskWriteCheckTest(AbstractFWorkerTest):
 
             self.assertTrue(testfile.exists(), '%s should not have been deleted' % testfile)
 
-        self.manager.post.assert_called_once_with('/task', loop=mock.ANY)
+        self.manager.post.assert_called_once_with('/task', loop=self.asyncio_loop)
         self.assertIsNone(self.worker.sleeping_fut)
 
     @contextlib.contextmanager
@@ -99,7 +99,7 @@ class PretaskWriteCheckTest(AbstractFWorkerTest):
             if post_run is not None:
                 post_run()
 
-        self.manager.post.assert_called_once_with('/ack-status-change/error', loop=mock.ANY)
+        self.manager.post.assert_called_once_with('/ack-status-change/error', loop=self.asyncio_loop)
         self.assertFalse(self.worker.sleeping_fut.done())
 
 
@@ -156,5 +156,5 @@ class PretaskReadCheckTest(AbstractFWorkerTest):
             if post_run is not None:
                 post_run()
 
-        self.manager.post.assert_called_once_with('/ack-status-change/error', loop=mock.ANY)
+        self.manager.post.assert_called_once_with('/ack-status-change/error', loop=self.asyncio_loop)
         self.assertFalse(self.worker.sleeping_fut.done())
diff --git a/tests/test_worker.py b/tests/test_worker.py
index ac00bc8f..21694032 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -323,7 +323,7 @@ class TestWorkerTaskExecution(AbstractFWorkerTest):
 
         self.assertTrue(stop_called)
 
-        self.manager.post.assert_called_once_with('/task', loop=unittest.mock.ANY)
+        self.manager.post.assert_called_once_with('/task', loop=self.asyncio_loop)
         self.tuqueue.queue.assert_any_call(
             '/tasks/58514d1e9837734f2e71b479/update',
             {'task_progress_percentage': 0, 'activity': '',
-- 
GitLab