Subprocess PID-file handling: refuse empty PID files, kill on exclusive-create race.

* flamenco_worker/commands.py: the "Error in settings" warning passed to
  self._log no longer includes self.identifier (the register_log call is
  unchanged).
* flamenco_worker/commands.py: when the PID file exists but is empty,
  validation now returns an error message instead of unlinking the file and
  returning None.
* flamenco_worker/commands.py: when the exclusive open('x') of the PID file
  raises FileExistsError, the error is logged, self.abort() is awaited and
  the exception is re-raised.
* tests/test_commands_subprocess.py: the empty-PID-file test now expects a
  message and an undeleted file; test_race_open_exclusive is added.

NOTE(review): this patch was recovered from a copy whose line breaks were
collapsed. Indentation and the position of blank context lines were
reconstructed so that every hunk matches its @@ header; verify against the
target files before applying.

diff --git a/flamenco_worker/commands.py b/flamenco_worker/commands.py
index 52309aa5eea8df7bb6969962618bbf318922c0e9..2e1afa3634e70e05e3bbc1f155e11b89c5a4dd6b 100644
--- a/flamenco_worker/commands.py
+++ b/flamenco_worker/commands.py
@@ -100,7 +100,7 @@ class AbstractCommand(metaclass=abc.ABCMeta):
 
         verr = self.validate(settings)
         if verr is not None:
-            self._log.warning('%s: Error in settings: %s', self.identifier, verr)
+            self._log.warning('Error in settings: %s', verr)
             await self.worker.register_log('%s: Error in settings: %s', self.identifier, verr)
             await self.worker.register_task_update(
                 task_status='failed',
@@ -438,8 +438,9 @@ class AbstractSubprocessCommand(AbstractCommand):
             # This is expected, as it means no subprocess is running.
             return None
         if not pid_str:
-            pidfile.unlink()
-            return None
+            # This could be an indication that a PID file is being written right now
+            # (already opened, but the content hasn't been written yet).
+            return 'Empty PID file %s, refusing to create new subprocess just to be sure' % pidfile
 
         pid = int(pid_str)
         self._log.warning('Found PID file %s with PID %r', pidfile, pid)
@@ -468,9 +469,14 @@ class AbstractSubprocessCommand(AbstractCommand):
         pid = self.proc.pid
         if pid_path:
             # Require exclusive creation to prevent race conditions.
-            with pid_path.open('x') as pidfile:
-                pidfile.write(str(pid))
-
+            try:
+                with pid_path.open('x') as pidfile:
+                    pidfile.write(str(pid))
+            except FileExistsError:
+                self._log.error('PID file %r already exists, killing just-spawned process pid=%d',
+                                pid_path, pid)
+                await self.abort()
+                raise
 
         try:
             assert self.proc.stdout is not None
diff --git a/tests/test_commands_subprocess.py b/tests/test_commands_subprocess.py
index 2240a43e22a444dffea95e4c39d12205518c60fc..bdf54ff871a53225da62c51544739f38310dfaf2 100644
--- a/tests/test_commands_subprocess.py
+++ b/tests/test_commands_subprocess.py
@@ -1,7 +1,11 @@
+import asyncio
 import os
 from pathlib import Path
 import random
+import shlex
+import sys
 import tempfile
+import time
 
 import psutil
 
@@ -69,7 +73,7 @@ class PIDFileTest(AbstractCommandTest):
     def test_nonexistant(self):
         with tempfile.TemporaryDirectory(suffix='.pid') as tmpname:
             tmpdir = Path(tmpname)
-            pidfile = tmpdir / 'stale.pid'
+            pidfile = tmpdir / 'nonexistant.pid'
 
             self.cmd.worker.trunner.subprocess_pid_file = pidfile
 
@@ -85,10 +89,50 @@ class PIDFileTest(AbstractCommandTest):
             self.cmd.worker.trunner.subprocess_pid_file = pidfile
 
             msg = self.cmd.validate({'cmd': 'echo'})
-            self.assertFalse(msg)
+            self.assertTrue(msg, "Empty PID file should be treated as 'alive'")
+            self.assertTrue(pidfile.exists(), 'Empty PID file should not have been deleted')
 
     def test_not_configured(self):
         self.cmd.worker.trunner.subprocess_pid_file = None
 
         msg = self.cmd.validate({'cmd': 'echo'})
         self.assertFalse(msg)
+
+    def test_race_open_exclusive(self):
+        # When there is a race condition such that the exclusive open() of the
+        # subprocess PID file fails, the new subprocess should be killed.
+
+        # Use shlex to quote strings like this, so we're sure it's done well.
+        args = [sys.executable, '-c', 'import time; time.sleep(1)']
+        cmd = ' '.join(shlex.quote(s) for s in args)
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            pidfile = Path(tmpdir) / 'race.pid'
+            my_pid = os.getpid()
+
+            # Set up the race condition: at validation time the PID file doesn't exist yet,
+            # but at execute time it does.
+            self.cmd.worker.trunner.subprocess_pid_file = pidfile
+            msg = self.cmd.validate({'cmd': cmd})
+            self.assertIsNone(msg)
+
+            # Mock an already-running process by writing our own PID.
+            pidfile.write_text(str(my_pid))
+
+            start_time = time.time()
+            with self.assertRaises(FileExistsError):
+                self.loop.run_until_complete(asyncio.wait_for(
+                    self.cmd.execute({'cmd': cmd}),
+                    1.3  # no more than 300 ms longer than the actual sleep
+                ))
+            duration = time.time() - start_time
+
+            # This shouldn't take anywhere near the entire sleep time, as that would
+            # mean the command was executed while there was already another one running.
+            self.assertLess(duration, 0.8,
+                            "Checking the PID file and killing the process should be fast")
+
+            pid = self.cmd.proc.pid
+            with self.assertRaises(psutil.NoSuchProcess):
+                process = psutil.Process(pid)
+                self.fail(f'Process {process} is still running')