diff --git a/CHANGELOG.md b/CHANGELOG.md index d79d74c103a6b023626cbc644fb4fa6ada3fb196..66aa6e2315d175b9175da95e53bc943a6d0dd0a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,8 @@ changed functionality, fixed bugs). checked for writability or readability. Note that write checks are lossy, and bytes are appended to any existing file used to check writability. When such a check fails, the Worker will go to status `error` and sleep for 10 minutes before trying again. +- Subprocess commands now write the spawned process PID in a text file, and refuse to run if there + already is such a file with an alive PID. ## Version 2.1.0 (2018-01-04) diff --git a/flamenco_worker/cli.py b/flamenco_worker/cli.py index f40be64f0336f911a59ff400ebfb7ffd11b733a4..b1a13e1dd6747957b63081ac430fb0a539376bac 100644 --- a/flamenco_worker/cli.py +++ b/flamenco_worker/cli.py @@ -101,7 +101,9 @@ def main(): shutdown_future=shutdown_future, ) trunner = runner.TaskRunner( - shutdown_future=shutdown_future) + shutdown_future=shutdown_future, + subprocess_pid_file=confparser.value('subprocess_pid_file'), + ) pretask_check_params = parse_pretask_check_config(confparser, log) diff --git a/flamenco_worker/commands.py b/flamenco_worker/commands.py index d144a8aa9a4529abec2a37e45e69b7ed654dab38..78f5e0c5de4c21bdf788f67781dcbb15ef40e03e 100644 --- a/flamenco_worker/commands.py +++ b/flamenco_worker/commands.py @@ -4,6 +4,7 @@ import abc import asyncio import asyncio.subprocess import logging +import pathlib import re import time import typing @@ -414,6 +415,45 @@ class AbstractSubprocessCommand(AbstractCommand): proc = attr.ib(validator=attr.validators.instance_of(asyncio.subprocess.Process), init=False) + @property + def subprocess_pid_file(self) -> typing.Optional[pathlib.Path]: + subprocess_pid_file = self.worker.trunner.subprocess_pid_file + if not subprocess_pid_file: + return None + return pathlib.Path(subprocess_pid_file) + + def validate(self, settings: dict) -> typing.Optional[str]: + supererr = super().validate(settings) + if supererr: + return supererr + + pidfile = self.subprocess_pid_file + if pidfile is None: + self._log.warning('No subprocess PID file configured; this is not recommended.') + return None + + try: + pid_str = pidfile.read_text() + except FileNotFoundError: + # This is expected, as it means no subprocess is running. + return None + if not pid_str: + pidfile.unlink() + return None + + pid = int(pid_str) + self._log.warning('Found PID file %s with PID %r', pidfile, pid) + + import psutil + + try: + proc = psutil.Process(pid) + except psutil.NoSuchProcess: + self._log.warning('Deleting pidfile %s for stale PID %r', pidfile, pid) + pidfile.unlink() + return None + return 'Subprocess from %s is still running: %s' % (pidfile, proc) + async def subprocess(self, args: list): import subprocess import shlex @@ -429,6 +469,12 @@ class AbstractSubprocessCommand(AbstractCommand): stderr=subprocess.STDOUT, ) + pid_path = self.subprocess_pid_file + if pid_path: + # Require exclusive creation to prevent race conditions. + with pid_path.open('x') as pidfile: + pidfile.write(str(self.proc.pid)) + try: while not self.proc.stdout.at_eof(): try: @@ -464,6 +510,9 @@ class AbstractSubprocessCommand(AbstractCommand): self._log.info('asyncio task got canceled, killing subprocess.') await self.abort() raise + finally: + if pid_path: + pid_path.unlink() async def process_line(self, line: str) -> typing.Optional[str]: """Processes the line, returning None to ignore it.""" @@ -526,6 +575,7 @@ class ExecCommand(AbstractSubprocessCommand): return '"cmd" must be a string' if not cmd: return '"cmd" may not be empty' + return super().validate(settings) async def execute(self, settings: dict): import shlex @@ -602,7 +652,7 @@ class BlenderRenderCommand(AbstractSubprocessCommand): # Ok, now it's fatal. return 'filepath %r does not exist' % filepath - return None + return super().validate(settings) async def execute(self, settings: dict): cmd = self._build_blender_cmd(settings) @@ -764,6 +814,8 @@ class MergeProgressiveRendersCommand(AbstractSubprocessCommand): _, err = self._setting(settings, 'weight2', True, int) if err: return err + return super().validate(settings) + async def execute(self, settings: dict): import tempfile diff --git a/flamenco_worker/config.py b/flamenco_worker/config.py index 8d19cd7fa099b2e26327e4fff2b0a3669bb4132e..a8523c3061210b720b047ec01c0fe7e5bf2c7b19 100644 --- a/flamenco_worker/config.py +++ b/flamenco_worker/config.py @@ -17,6 +17,7 @@ DEFAULT_CONFIG = { ('manager_url', ''), ('task_types', 'unknown sleep blender-render'), ('task_update_queue_db', 'flamenco-worker.db'), + ('subprocess_pid_file', 'flamenco-worker-subprocess.pid'), ('may_i_run_interval_seconds', '5'), ('worker_id', ''), ('worker_secret', ''), diff --git a/flamenco_worker/runner.py b/flamenco_worker/runner.py index 7d27389ad2449182d5de37f8f0d23e408de1edeb..23182dc9669c4e7b3e32325c6a7f709e960719a0 100644 --- a/flamenco_worker/runner.py +++ b/flamenco_worker/runner.py @@ -13,6 +13,7 @@ class TaskRunner: """Runs tasks, sending updates back to the worker.""" shutdown_future = attr.ib(validator=attr.validators.instance_of(asyncio.Future)) + subprocess_pid_file = attr.ib(validator=attr.validators.instance_of(str)) last_command_idx = attr.ib(default=0, init=False) _log = attrs_extra.log('%s.TaskRunner' % __name__) diff --git a/requirements.txt b/requirements.txt index edf9b3d2be4d73bf93b2c7e0f7dc894b5052e370..4d7703b24dcf69ce72c56e373ac69e3cb76ea7a3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ attrs==18.2.0 requests==2.20.1 +psutil==5.4.8 diff --git a/tests/test_commands_subprocess.py b/tests/test_commands_subprocess.py new file mode 100644 index 0000000000000000000000000000000000000000..cd60f395137fa15cae2bf67f5b70fb069d2220c0 --- /dev/null +++ b/tests/test_commands_subprocess.py @@ -0,0 +1,94 @@ +import os +from pathlib import Path +import random +import tempfile + +import psutil + +from test_runner import AbstractCommandTest + + +class PIDFileTest(AbstractCommandTest): + def setUp(self): + super().setUp() + + from flamenco_worker.commands import ExecCommand + + self.cmd = ExecCommand( + worker=self.fworker, + task_id='12345', + command_idx=0, + ) + + def test_alive(self): + with tempfile.NamedTemporaryFile(suffix='.pid') as tmpfile: + pidfile = Path(tmpfile.name) + my_pid = os.getpid() + pidfile.write_text(str(my_pid)) + + self.cmd.worker.trunner.subprocess_pid_file = pidfile + + msg = self.cmd.validate({'cmd': 'echo'}) + self.assertIn(str(pidfile), msg) + self.assertIn(str(psutil.Process(my_pid)), msg) + + def test_alive_newlines(self): + with tempfile.NamedTemporaryFile(suffix='.pid') as tmpfile: + pidfile = Path(tmpfile.name) + my_pid = os.getpid() + pidfile.write_text('\n%s\n' % my_pid) + + self.cmd.worker.trunner.subprocess_pid_file = pidfile + + msg = self.cmd.validate({'cmd': 'echo'}) + self.assertIn(str(pidfile), msg) + self.assertIn(str(psutil.Process(my_pid)), msg) + + def test_dead(self): + # Find a PID that doesn't exist. + for _ in range(1000): + pid = random.randint(1, 2**16) + try: + psutil.Process(pid) + except psutil.NoSuchProcess: + break + else: + self.fail('Unable to find unused PID') + + with tempfile.TemporaryDirectory(suffix='.pid') as tmpname: + tmpdir = Path(tmpname) + pidfile = tmpdir / 'stale.pid' + pidfile.write_text(str(pid)) + + self.cmd.worker.trunner.subprocess_pid_file = pidfile + + msg = self.cmd.validate({'cmd': 'echo'}) + self.assertFalse(msg) + self.assertFalse(pidfile.exists(), 'Stale PID file should have been deleted') + + def test_nonexistant(self): + with tempfile.TemporaryDirectory(suffix='.pid') as tmpname: + tmpdir = Path(tmpname) + pidfile = tmpdir / 'stale.pid' + + self.cmd.worker.trunner.subprocess_pid_file = pidfile + + msg = self.cmd.validate({'cmd': 'echo'}) + self.assertFalse(msg) + + def test_empty(self): + with tempfile.TemporaryDirectory(suffix='.pid') as tmpname: + tmpdir = Path(tmpname) + pidfile = tmpdir / 'empty.pid' + pidfile.write_bytes(b'') + + self.cmd.worker.trunner.subprocess_pid_file = pidfile + + msg = self.cmd.validate({'cmd': 'echo'}) + self.assertFalse(msg) + + def test_not_configured(self): + self.cmd.worker.trunner.subprocess_pid_file = None + + msg = self.cmd.validate({'cmd': 'echo'}) + self.assertFalse(msg) diff --git a/tests/test_runner.py b/tests/test_runner.py index a4e989aa4b187617ab39a80787b4e050af73fb85..0ec1a4af5880e9290888fc6f84edae95328764c5 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -8,10 +8,13 @@ class AbstractCommandTest(AbstractWorkerTest): def setUp(self): from mock_responses import CoroMock from flamenco_worker.worker import FlamencoWorker + from flamenco_worker.runner import TaskRunner from flamenco_worker.cli import construct_asyncio_loop self.loop = construct_asyncio_loop() self.fworker = Mock(spec=FlamencoWorker) + self.fworker.trunner = Mock(spec=TaskRunner) + self.fworker.trunner.subprocess_pid_file = None self.fworker.register_log = CoroMock() self.fworker.register_task_update = CoroMock()