Skip to content
Snippets Groups Projects
Commit c1bc3fc1 authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Write PID file for subprocesses

This prevents multiple subprocess commands running at once. They shouldn't
be doing that in the first place, but we have a strange bug where multiple
Blenders seem to be running on the same machine.
parent cb0aa251
Branches
Tags
No related merge requests found
......@@ -22,6 +22,8 @@ changed functionality, fixed bugs).
checked for writability or readability. Note that write checks are lossy, and bytes are appended
to any existing file used to check writability. When such a check fails, the Worker will go to
status `error` and sleep for 10 minutes before trying again.
- Subprocess commands now write the spawned process PID in a text file, and refuse to run if there
already is such a file with an alive PID.
## Version 2.1.0 (2018-01-04)
......
......@@ -101,7 +101,9 @@ def main():
shutdown_future=shutdown_future,
)
trunner = runner.TaskRunner(
shutdown_future=shutdown_future)
shutdown_future=shutdown_future,
subprocess_pid_file=confparser.value('subprocess_pid_file'),
)
pretask_check_params = parse_pretask_check_config(confparser, log)
......
......@@ -4,6 +4,7 @@ import abc
import asyncio
import asyncio.subprocess
import logging
import pathlib
import re
import time
import typing
......@@ -414,6 +415,45 @@ class AbstractSubprocessCommand(AbstractCommand):
proc = attr.ib(validator=attr.validators.instance_of(asyncio.subprocess.Process),
init=False)
@property
def subprocess_pid_file(self) -> typing.Optional[pathlib.Path]:
subprocess_pid_file = self.worker.trunner.subprocess_pid_file
if not subprocess_pid_file:
return None
return pathlib.Path(subprocess_pid_file)
def validate(self, settings: dict) -> typing.Optional[str]:
supererr = super().validate(settings)
if supererr:
return supererr
pidfile = self.subprocess_pid_file
if pidfile is None:
self._log.warning('No subprocess PID file configured; this is not recommended.')
return None
try:
pid_str = pidfile.read_text()
except FileNotFoundError:
# This is expected, as it means no subprocess is running.
return None
if not pid_str:
pidfile.unlink()
return None
pid = int(pid_str)
self._log.warning('Found PID file %s with PID %r', pidfile, pid)
import psutil
try:
proc = psutil.Process(pid)
except psutil.NoSuchProcess:
self._log.warning('Deleting pidfile %s for stale PID %r', pidfile, pid)
pidfile.unlink()
return None
return 'Subprocess from %s is still running: %s' % (pidfile, proc)
async def subprocess(self, args: list):
import subprocess
import shlex
......@@ -429,6 +469,12 @@ class AbstractSubprocessCommand(AbstractCommand):
stderr=subprocess.STDOUT,
)
pid_path = self.subprocess_pid_file
if pid_path:
# Require exclusive creation to prevent race conditions.
with pid_path.open('x') as pidfile:
pidfile.write(str(self.proc.pid))
try:
while not self.proc.stdout.at_eof():
try:
......@@ -464,6 +510,9 @@ class AbstractSubprocessCommand(AbstractCommand):
self._log.info('asyncio task got canceled, killing subprocess.')
await self.abort()
raise
finally:
if pid_path:
pid_path.unlink()
async def process_line(self, line: str) -> typing.Optional[str]:
"""Processes the line, returning None to ignore it."""
......@@ -526,6 +575,7 @@ class ExecCommand(AbstractSubprocessCommand):
return '"cmd" must be a string'
if not cmd:
return '"cmd" may not be empty'
return super().validate(settings)
async def execute(self, settings: dict):
import shlex
......@@ -602,7 +652,7 @@ class BlenderRenderCommand(AbstractSubprocessCommand):
# Ok, now it's fatal.
return 'filepath %r does not exist' % filepath
return None
return super().validate(settings)
async def execute(self, settings: dict):
cmd = self._build_blender_cmd(settings)
......@@ -764,6 +814,8 @@ class MergeProgressiveRendersCommand(AbstractSubprocessCommand):
_, err = self._setting(settings, 'weight2', True, int)
if err: return err
return super().validate(settings)
async def execute(self, settings: dict):
import tempfile
......
......@@ -17,6 +17,7 @@ DEFAULT_CONFIG = {
('manager_url', ''),
('task_types', 'unknown sleep blender-render'),
('task_update_queue_db', 'flamenco-worker.db'),
('subprocess_pid_file', 'flamenco-worker-subprocess.pid'),
('may_i_run_interval_seconds', '5'),
('worker_id', ''),
('worker_secret', ''),
......
......@@ -13,6 +13,7 @@ class TaskRunner:
"""Runs tasks, sending updates back to the worker."""
shutdown_future = attr.ib(validator=attr.validators.instance_of(asyncio.Future))
subprocess_pid_file = attr.ib(validator=attr.validators.instance_of(str))
last_command_idx = attr.ib(default=0, init=False)
_log = attrs_extra.log('%s.TaskRunner' % __name__)
......
attrs==18.2.0
requests==2.20.1
psutil==5.4.8
import os
from pathlib import Path
import random
import tempfile
import psutil
from test_runner import AbstractCommandTest
class PIDFileTest(AbstractCommandTest):
def setUp(self):
super().setUp()
from flamenco_worker.commands import ExecCommand
self.cmd = ExecCommand(
worker=self.fworker,
task_id='12345',
command_idx=0,
)
def test_alive(self):
with tempfile.NamedTemporaryFile(suffix='.pid') as tmpfile:
pidfile = Path(tmpfile.name)
my_pid = os.getpid()
pidfile.write_text(str(my_pid))
self.cmd.worker.trunner.subprocess_pid_file = pidfile
msg = self.cmd.validate({'cmd': 'echo'})
self.assertIn(str(pidfile), msg)
self.assertIn(str(psutil.Process(my_pid)), msg)
def test_alive_newlines(self):
with tempfile.NamedTemporaryFile(suffix='.pid') as tmpfile:
pidfile = Path(tmpfile.name)
my_pid = os.getpid()
pidfile.write_text('\n%s\n' % my_pid)
self.cmd.worker.trunner.subprocess_pid_file = pidfile
msg = self.cmd.validate({'cmd': 'echo'})
self.assertIn(str(pidfile), msg)
self.assertIn(str(psutil.Process(my_pid)), msg)
def test_dead(self):
# Find a PID that doesn't exist.
for _ in range(1000):
pid = random.randint(1, 2**16)
try:
psutil.Process(pid)
except psutil.NoSuchProcess:
break
else:
self.fail('Unable to find unused PID')
with tempfile.TemporaryDirectory(suffix='.pid') as tmpname:
tmpdir = Path(tmpname)
pidfile = tmpdir / 'stale.pid'
pidfile.write_text(str(pid))
self.cmd.worker.trunner.subprocess_pid_file = pidfile
msg = self.cmd.validate({'cmd': 'echo'})
self.assertFalse(msg)
self.assertFalse(pidfile.exists(), 'Stale PID file should have been deleted')
def test_nonexistant(self):
with tempfile.TemporaryDirectory(suffix='.pid') as tmpname:
tmpdir = Path(tmpname)
pidfile = tmpdir / 'stale.pid'
self.cmd.worker.trunner.subprocess_pid_file = pidfile
msg = self.cmd.validate({'cmd': 'echo'})
self.assertFalse(msg)
def test_empty(self):
with tempfile.TemporaryDirectory(suffix='.pid') as tmpname:
tmpdir = Path(tmpname)
pidfile = tmpdir / 'empty.pid'
pidfile.write_bytes(b'')
self.cmd.worker.trunner.subprocess_pid_file = pidfile
msg = self.cmd.validate({'cmd': 'echo'})
self.assertFalse(msg)
def test_not_configured(self):
self.cmd.worker.trunner.subprocess_pid_file = None
msg = self.cmd.validate({'cmd': 'echo'})
self.assertFalse(msg)
......@@ -8,10 +8,13 @@ class AbstractCommandTest(AbstractWorkerTest):
def setUp(self):
from mock_responses import CoroMock
from flamenco_worker.worker import FlamencoWorker
from flamenco_worker.runner import TaskRunner
from flamenco_worker.cli import construct_asyncio_loop
self.loop = construct_asyncio_loop()
self.fworker = Mock(spec=FlamencoWorker)
self.fworker.trunner = Mock(spec=TaskRunner)
self.fworker.trunner.subprocess_pid_file = None
self.fworker.register_log = CoroMock()
self.fworker.register_task_update = CoroMock()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment