diff --git a/CHANGELOG.md b/CHANGELOG.md index e78b043e117e3687f513160642fbfdbe9f5c1457..75389d1aa3878abfe2b44207c9d9bc66ef4403ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,20 @@ This file logs the changes that are actually interesting to users (new features, changed functionality, fixed bugs). +## Version 2.2 (in development) + +- Always log the version of Flamenco Worker. +- Include missing merge-exr.blend, required for progressive rendering, in the distribution bundle. +- Include `exr-merge` task type in default configuration, which is required for progressive + rendering. +- Prevent outgoing queue saturation by not fetching a new task when the queue is too large. +- When aborting a subprocess, try to terminate it before killing it. +- Changed some of the configuration defaults to more sensible values (mostly queueing up larger + amounts of logs before pushing to Flamenco Manager). +- Fixed a memory leak in the ask update queue. +- Added a new `log_a_lot` command and task type `debug` to aid in debugging. + + ## Version 2.1.0 (2018-01-04) - Python 3.5.4 is required as minimum Python version. diff --git a/README.md b/README.md index 8e8301a286c0454699ed563ed811c087835c51a7..19cb3e1d8479ce26bd30de40c917a5e8a99f89f3 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,7 @@ Flamenco Worker responds to the following POSIX signals: First run `pip install -r requirements-dev.txt` to fetch developer dependencies. On Windows, download the [Microsoft Visual C++ 2010 Redistributable Package](https://www.microsoft.com/en-us/download/details.aspx?id=13523). +On Ubuntu/Debian, make sure you have the 'binutils' package installed. Run `mkdistfile.py` to create a distributable archive (`.zip` on Windows, `.tar.gz` on Linux and macOS) containing a runnable Flamenco Worker. This build doesn't require installing Python or any diff --git a/flamenco-worker.cfg b/flamenco-worker.cfg index 35aeb2d316899bb396f7352983d653779f0450ed..bacbe88c7b73d085f0d1b9f6df7b1c6460720931 100644 --- a/flamenco-worker.cfg +++ b/flamenco-worker.cfg @@ -1,7 +1,7 @@ [flamenco-worker] # The URL of the Flamenco Manager. Leave empty for auto-discovery via UPnP/SSDP. -manager_url = http://localhost:8083 +manager_url = http://LOCALHOST:5983 task_types = sleep blender-render file-management task_update_queue_db = flamenco-worker.db @@ -10,9 +10,9 @@ may_i_run_interval_seconds = 5 push_log_max_interval_seconds = 20 push_log_max_entries = 200 push_act_max_interval_seconds = 10 -worker_storage_dir = /home/milanjaros/work/temp/public/flamenco/render/storage -worker_output_dir = /home/milanjaros/work/temp/public/flamenco/render/output -worker_blender_cmd = /home/milanjaros/work/temp/public/flamenco/blender/run_icc_mpi.sh +worker_storage_dir = /home/milanjaros/work/temp/public/flamenco/render/in +worker_output_dir = /home/milanjaros/work/temp/public/flamenco/render/out +worker_blender_cmd = ./run_icc_mpi.sh [loggers] keys = root,flamenco_worker diff --git a/flamenco-worker.spec b/flamenco-worker.spec index 3a097704d79d10ac8185b0be94ddb93e27b32157..69ff31ae1ce88fc4e07830631451fe36bee1bf1f 100644 --- a/flamenco-worker.spec +++ b/flamenco-worker.spec @@ -13,7 +13,8 @@ a = Analysis(['flamenco-worker.py'], datas=[('flamenco-worker.cfg', '.'), ('README.md', '.'), ('CHANGELOG.md', '.'), - ('LICENSE.txt', '.')], + ('LICENSE.txt', '.'), + ('flamenco_worker/merge-exr.blend', 'flamenco_worker')], hiddenimports=[], hookspath=[], runtime_hooks=[], diff --git a/flamenco_worker/__init__.py b/flamenco_worker/__init__.py index a33997dd1004d8fb312324a652e27fcab292f4eb..9b882979a61d61100247083788d72b4fe82c3120 100644 --- a/flamenco_worker/__init__.py +++ b/flamenco_worker/__init__.py @@ -1 +1 @@ -__version__ = '2.1.0' +__version__ = '2.2-dev1' diff --git a/flamenco_worker/cli.py b/flamenco_worker/cli.py index 628270afa0803b80445c60c57a9113f68dc70d95..7f56453a05000fdc5fb106ba88c1fa01ba3c421a 100644 --- a/flamenco_worker/cli.py +++ b/flamenco_worker/cli.py @@ -48,6 +48,8 @@ def main(): log = logging.getLogger(__name__) log.debug('Starting, pid=%d', os.getpid()) + log_startup() + if args.test: log.warning('Test mode enabled, overriding task_types=%r', confparser.value('task_types')) @@ -264,5 +266,19 @@ def construct_asyncio_loop() -> asyncio.AbstractEventLoop: return loop +def log_startup(): + """Log the version of Flamenco Worker.""" + + from . import __version__ + + log = logging.getLogger(__name__) + old_level = log.level + try: + log.setLevel(logging.INFO) + log.info('Starting Flamenco Worker %s', __version__) + finally: + log.setLevel(old_level) + + if __name__ == '__main__': main() diff --git a/flamenco_worker/commands.py b/flamenco_worker/commands.py index 03b87f95ce615e04fe9fa12bb9871a94554b2452..d0506302661926937a7c10b6b9a5ce5d5d126506 100644 --- a/flamenco_worker/commands.py +++ b/flamenco_worker/commands.py @@ -202,6 +202,23 @@ class EchoCommand(AbstractCommand): await self.worker.register_log(settings['message']) +@command_executor('log_a_lot') +class LogALotCommand(AbstractCommand): + def validate(self, settings: dict): + lines = settings.get('lines', 20000) + if isinstance(lines, float): + lines = int(lines) + if not isinstance(lines, int): + return '"lines" setting must be an integer, not %s' % type(lines) + + async def execute(self, settings: dict): + lines = settings.get('lines', 20000) + + await self.worker.register_task_update(activity='logging %d lines' % lines) + for idx in range(lines): + await self.worker.register_log(30 * ('This is line %d' % idx)) + + @command_executor('sleep') class SleepCommand(AbstractCommand): def validate(self, settings: dict): @@ -459,11 +476,32 @@ class AbstractSubprocessCommand(AbstractCommand): self._log.debug("No process to kill. That's ok.") return - self._log.info('Aborting subprocess') + self._log.info('Terminating subprocess') + + try: + self.proc.terminate() + except ProcessLookupError: + self._log.debug("The process was already stopped, aborting is impossible. That's ok.") + return + except AttributeError: + # This can happen in some race conditions, it's fine. + self._log.debug("The process was not yet started, aborting is impossible. That's ok.") + return + + timeout = 5 + try: + retval = await asyncio.wait_for(self.proc.wait(), timeout, + loop=asyncio.get_event_loop()) + except asyncio.TimeoutError: + pass + else: + self._log.info('The process aborted with status code %s', retval) + return + + self._log.warning('The process did not stop in %d seconds, going to kill it', timeout) try: self.proc.kill() except ProcessLookupError: - # The process is already stopped, so killing is impossible. That's ok. self._log.debug("The process was already stopped, aborting is impossible. That's ok.") return except AttributeError: diff --git a/flamenco_worker/upstream_update_queue.py b/flamenco_worker/upstream_update_queue.py index 65cca0bf69bd3d2320b13265de84ed532446e7ef..e4d24b66b751e4dde1f5ecd88cf716124a60fb66 100644 --- a/flamenco_worker/upstream_update_queue.py +++ b/flamenco_worker/upstream_update_queue.py @@ -50,7 +50,7 @@ class TaskUpdateQueue: self._db.close() self._db = None - def queue(self, url, payload, *, loop: asyncio.AbstractEventLoop) -> asyncio.Future: + def queue(self, url, payload): """Push some payload onto the queue.""" if self._db is None: @@ -59,15 +59,12 @@ class TaskUpdateQueue: # Store the pickled payload in the SQLite database. pickled = pickle.dumps(payload) - async def do_db_push(): - self._db.execute('INSERT INTO fworker_queue (url, payload) values (?, ?)', - (url, pickled)) - self._db.commit() - - # Notify the work loop that stuff has been queued. - self._stuff_queued.set() + self._db.execute('INSERT INTO fworker_queue (url, payload) values (?, ?)', + (url, pickled)) + self._db.commit() - return asyncio.ensure_future(do_db_push(), loop=loop) + # Notify the work loop that stuff has been queued. + self._stuff_queued.set() async def work(self, *, loop=None): """Loop that pushes queued payloads to the Flamenco Manager. @@ -75,8 +72,6 @@ class TaskUpdateQueue: Keeps running until shutdown_future.done() returns True. """ - import requests - # Always start by inspecting the persisted queue, so act as if something # was just queued. self._stuff_queued.set() @@ -115,6 +110,15 @@ class TaskUpdateQueue: payload = pickle.loads(row[2]) yield rowid, url, payload + def queue_size(self) -> int: + """Return the number of items queued.""" + if self._db is None: + self._connect_db() + + result = self._db.execute('SELECT count(*) FROM fworker_queue') + count = next(result)[0] + return count + def _unqueue(self, rowid: int): """Removes a queued payload from the database.""" @@ -130,25 +134,48 @@ class TaskUpdateQueue: with (await self._queue_lock): queue_is_empty = True + queue_size_before = self.queue_size() + handled = 0 for rowid, url, payload in self._queue(): queue_is_empty = False - self._log.info('Pushing task update to Manager') + queue_size = self.queue_size() + self._log.info('Pushing task update to Manager, queue size is %d', queue_size) resp = await self.manager.post(url, json=payload, loop=loop) if resp.status_code == 409: # The task was assigned to another worker, so we're not allowed to # push updates for it. We have to un-queue this update, as it will # never be accepted. self._log.warning('discarding update, Manager says %s', resp.text) + # TODO(sybren): delete all queued updates to the same URL? else: resp.raise_for_status() self._log.debug('Master accepted pushed update.') self._unqueue(rowid) + handled += 1 + if handled > 1000: + self._log.info('Taking a break from queue flushing after %d items', handled) + break + if queue_is_empty: # Only clear the flag once the queue has really been cleared. self._stuff_queued.clear() + queue_size_after = self.queue_size() + if queue_size_after > 0: + if queue_size_after >= queue_size_before: + self._log.warning( + 'Queue size increased from %d to %d, after having flushed %d items', + queue_size_before, queue_size_after, handled) + else: + self._log.info( + 'Queue size decreased from %d to %d, after having flushed %d items', + queue_size_before, queue_size_after, handled) + + self._log.debug('Vacuuming database') + self._db.execute('VACUUM') + return queue_is_empty async def flush_and_report(self, *, loop: asyncio.AbstractEventLoop): diff --git a/flamenco_worker/worker.py b/flamenco_worker/worker.py index 06e11df8da633c3a75815d3d78cf4edd6f2e665e..bc6e6d3813d345a3d32c5fec17c9aeabf47ffb3b 100644 --- a/flamenco_worker/worker.py +++ b/flamenco_worker/worker.py @@ -17,12 +17,17 @@ FETCH_TASK_FAILED_RETRY_DELAY = 10 # when we failed obtaining a task FETCH_TASK_EMPTY_RETRY_DELAY = 5 # when there are no tasks to perform FETCH_TASK_DONE_SCHEDULE_NEW_DELAY = 3 # after a task is completed -PUSH_LOG_MAX_ENTRIES = 10 -PUSH_LOG_MAX_INTERVAL = datetime.timedelta(seconds=5) -PUSH_ACT_MAX_INTERVAL = datetime.timedelta(seconds=1) +PUSH_LOG_MAX_ENTRIES = 1000 +PUSH_LOG_MAX_INTERVAL = datetime.timedelta(seconds=30) +PUSH_ACT_MAX_INTERVAL = datetime.timedelta(seconds=15) ASLEEP_POLL_STATUS_CHANGE_REQUESTED_DELAY = 30 +# If there are more than this number of queued task updates, we won't ask +# the Manager for another task to execute. Task execution is delayed until +# the queue size is below this threshold. +QUEUE_SIZE_THRESHOLD = 10 + class UnableToRegisterError(Exception): """Raised when the worker can't register at the manager. @@ -366,6 +371,17 @@ class FlamencoWorker: self._log.debug('Going to fetch task in %s seconds', delay) await asyncio.sleep(delay) + # Prevent outgoing queue overflowing by waiting until it's below the + # threshold before starting another task. + # TODO(sybren): introduce another worker state for this, and handle there. + with (await self._queue_lock): + queue_size = self.tuqueue.queue_size() + if queue_size > QUEUE_SIZE_THRESHOLD: + self._log.info('Task Update Queue size too large (%d > %d), waiting until it shrinks.', + queue_size, QUEUE_SIZE_THRESHOLD) + self.schedule_fetch_task(FETCH_TASK_FAILED_RETRY_DELAY) + return + # TODO: use exponential backoff instead of retrying every fixed N seconds. self._log.debug('Fetching task') try: @@ -506,7 +522,7 @@ class FlamencoWorker: self._log.debug('push_to_manager: nothing to push') return - self.tuqueue.queue('/tasks/%s/update' % self.task_id, payload, loop=self.loop) + self.tuqueue.queue('/tasks/%s/update' % self.task_id, payload) async def register_task_update(self, *, task_status: str = None, @@ -560,8 +576,8 @@ class FlamencoWorker: queue_size = len(self._queued_log_entries) if queue_size > self.push_log_max_entries: - self._log.info('Queued up more than %i log entries, pushing to manager', - self.push_log_max_entries) + self._log.info('Queued up %i > %i log entries, pushing to manager', + queue_size, self.push_log_max_entries) await self.push_to_manager() elif datetime.datetime.now() - self.last_log_push > self.push_log_max_interval: self._log.info('More than %s since last log update, pushing to manager', diff --git a/publish-online.sh b/publish-online.sh index 0c0215b622863d0c177b18d17dbc9ebe1fc4853a..8af1d16728dc502890c3dc858ab8e4a61f28cd84 100755 --- a/publish-online.sh +++ b/publish-online.sh @@ -1,6 +1,6 @@ #!/bin/bash -e -FLAMENCO_VERSION="2.1.0" +FLAMENCO_VERSION="2.2-dev1" cd dist diff --git a/requirements-dev.txt b/requirements-dev.txt index bcd9c6e5556ce328bfa2a67dc9e23f4d8dc56fde..4f85618ef25a7e780deedc045f6993217e2a01d0 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,3 +1,4 @@ -r requirements-test.txt ipython pyinstaller +wheel diff --git a/setup.cfg b/setup.cfg index bbcddabbbfaf76558135f0e3d16b44c460c21e70..8a274d060a80911bee79825592bba41437583418 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [tool:pytest] -addopts = -v --cov flamenco_worker --cov-report term-missing --ignore node_modules -x +addopts = -v --cov flamenco_worker --cov-report term-missing --ignore node_modules [mypy] python_version = 3.5 diff --git a/setup.py b/setup.py index cb59a0cac08acbf4466ca914f43075fda596ad86..c159d32320734227a809754e2aac68845d0f64c2 100755 --- a/setup.py +++ b/setup.py @@ -56,6 +56,7 @@ class ZipCommand(Command): add_to_root(Path('LICENSE.txt')) add_to_root(Path('README.md')) add_to_root(Path('CHANGELOG.md')) + add_to_root(Path('flamenco_worker/merge-exr.blend')) paths = collections.deque([Path('system-integration')]) while paths: @@ -80,11 +81,12 @@ class ZipCommand(Command): with checksum_path.open(mode='w') as shafile: print('%s %s' % (hasher.hexdigest(), zip_name.name), file=shafile) + if __name__ == '__main__': setuptools.setup( cmdclass={'zip': ZipCommand}, name='flamenco-worker', - version='2.1.0', + version='2.2-dev1', description='Flamenco Worker implementation', author='Sybren A. Stüvel', author_email='sybren@blender.studio', diff --git a/tests/test_upstream_update_queue.py b/tests/test_upstream_update_queue.py index 85a4d65b45450e3c5e23f72bae496ed2460d9aa2..8843c0b4bbf05f5c968c43c2474e14e29cf4541d 100644 --- a/tests/test_upstream_update_queue.py +++ b/tests/test_upstream_update_queue.py @@ -76,7 +76,7 @@ class TaskUpdateQueueTest(AbstractWorkerTest): self.manager.post.side_effect = push_callback - self.tuqueue.queue('/push/here', payload, loop=self.asyncio_loop) + self.tuqueue.queue('/push/here', payload) # Run the loop for 2 seconds. This should be enough for 3 retries of 0.3 seconds + handling # the actual payload. @@ -102,8 +102,7 @@ class TaskUpdateQueueTest(AbstractWorkerTest): payload = {'key': 'value', 'sub': {'some': 13, 'values': datetime.datetime.now()}} - self.asyncio_loop.run_until_complete( - self.tuqueue.queue('/push/there', payload, loop=self.asyncio_loop)) + self.tuqueue.queue('/push/there', payload) self.manager.post.assert_not_called() self.tuqueue._disconnect_db() @@ -169,7 +168,7 @@ class TaskUpdateQueueTest(AbstractWorkerTest): self.manager.post.side_effect = push_callback - self.tuqueue.queue('/push/here', payload, loop=self.asyncio_loop) + self.tuqueue.queue('/push/here', payload) # Run the loop for 2 seconds. This should be enough for 3 retries of 0.3 seconds + handling # the actual payload. diff --git a/tests/test_worker.py b/tests/test_worker.py index 2e1542745109288ad9b5e382ad58092db3479ae4..8ff3fa02c12ef89bd01d811dfda8952c3be1c05c 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -28,6 +28,7 @@ class AbstractFWorkerTest(AbstractWorkerTest): self.trunner = Mock(spec=TaskRunner) self.tuqueue = Mock(spec=TaskUpdateQueue) self.tuqueue.flush_and_report = CoroMock() + self.tuqueue.queue_size.return_value = 0 self.trunner.execute = self.mock_task_execute self.trunner.abort_current_task = CoroMock() @@ -210,13 +211,11 @@ class TestWorkerTaskExecution(AbstractFWorkerTest): {'task_progress_percentage': 0, 'activity': '', 'command_progress_percentage': 0, 'task_status': 'active', 'current_command_idx': 0}, - loop=self.loop, ), call('/tasks/58514d1e9837734f2e71b479/update', {'task_progress_percentage': 0, 'activity': 'Task completed', 'command_progress_percentage': 0, 'task_status': 'completed', 'current_command_idx': 0}, - loop=self.loop, ) ]) self.assertEqual(self.tuqueue.queue.call_count, 2) @@ -266,13 +265,12 @@ class TestWorkerTaskExecution(AbstractFWorkerTest): {'task_progress_percentage': 0, 'activity': '', 'command_progress_percentage': 0, 'task_status': 'active', 'current_command_idx': 0}, - loop=self.loop, ) # A bit clunky because we don't know which timestamp is included in the log line. last_args, last_kwargs = self.tuqueue.queue.call_args self.assertEqual(last_args[0], '/tasks/58514d1e9837734f2e71b479/update') - self.assertEqual(last_kwargs, {'loop': self.loop}) + self.assertEqual(last_kwargs, {}) self.assertIn('log', last_args[1]) self.assertTrue(last_args[1]['log'].endswith( 'Worker 1234 stopped running this task, no longer allowed to run by Manager'))