diff --git a/packages/flamenco-worker-python/README.md b/packages/flamenco-worker-python/README.md index 6ff2351ba130785f030910d9cf9ba5ad411b7678..63fd37a90f4b9a44526b2521e9cd4e0f8c14203c 100644 --- a/packages/flamenco-worker-python/README.md +++ b/packages/flamenco-worker-python/README.md @@ -67,8 +67,10 @@ Once registered via a POST to the manager's `/register-worker` endpoint, the `wo ## Shutdown Pressing [CTRL]+[C] will cause a clean shutdown of the worker. -If there is a task currently running, it will be aborted and marked as 'failed'. Any pending -task updates are sent to the Manager before stopping the process. +If there is a task currently running, it will be aborted without changing its status. Any pending task updates are sent to the Manager, and then the Manager's `/sign-off` URL is +POSTed to, to indicate a clean shutdown of the worker. Any active task that is still +assigned to the worker is given status "claimed-by-manager" so that it can be re-activated +by another worker. ## Systemd integration @@ -79,3 +81,11 @@ To run Flamenco Worker as a systemd-managed service, copy `flamenco-worker.servi After installation of this service, `systemctl {start,stop,status,restart} flamenco-worker` commands can be used to manage it. To ensure that the Flamenco Worker starts at system boot, use `systemctl enable flamenco-worker`. + + +## Signals + +Flamenco Worker responds to the following POSIX signals: + +- `SIGINT`, `SIGTERM`: performs a clean shutdown, as described in the Shutdown section above. +- `SIGUSR1`: logs the currently scheduled asyncio tasks. diff --git a/packages/flamenco-worker-python/flamenco_worker/cli.py b/packages/flamenco-worker-python/flamenco_worker/cli.py index 657db01dd5cfed0b56980862004a258c64cf4cc5..46544be0dfe4361cf722f643cb195f5001d58434 100644 --- a/packages/flamenco-worker-python/flamenco_worker/cli.py +++ b/packages/flamenco-worker-python/flamenco_worker/cli.py @@ -80,6 +80,7 @@ def main(): import signal signal.signal(signal.SIGTERM, shutdown) signal.signal(signal.SIGINT, shutdown) + signal.signal(signal.SIGUSR1, asyncio_report_tasks) # Start asynchronous tasks. asyncio.ensure_future(tuqueue.work(loop=loop)) @@ -109,40 +110,59 @@ def main(): # Report on the asyncio task status if args.verbose: - all_tasks = asyncio.Task.all_tasks() - if not len(all_tasks): - log.info('no more scheduled tasks, this is a clean shutdown.') - elif all(task.done() for task in all_tasks): - log.info('all %i tasks are done, this is a clean shutdown.', len(all_tasks)) - - import gc - import traceback - - # Clean up circular references between tasks. - gc.collect() - - for task_idx, task in enumerate(all_tasks): - if not task.done(): - continue - - # noinspection PyBroadException - try: - res = task.result() - log.info(' task #%i: %s result=%r', task_idx, task, res) - except asyncio.CancelledError: - # No problem, we want to stop anyway. - log.info(' task #%i: %s cancelled', task_idx, task) - except Exception: - print('{}: resulted in exception'.format(task)) - traceback.print_exc() - # for ref in gc.get_referrers(task): - # log.info(' - referred by %s', ref) + asyncio_report_tasks() log.warning('Closing asyncio loop') loop.close() log.warning('Flamenco Worker is shut down') +def asyncio_report_tasks(signum=0, stackframe=None): + """Runs the garbage collector, then reports all AsyncIO tasks on the log. + + Can be used as signal handler. + """ + + log = logging.getLogger('%s.asyncio_report_tasks' % __name__) + log.info('Logging all asyncio tasks.') + + all_tasks = asyncio.Task.all_tasks() + count_done = sum(task.done() for task in all_tasks) + + if not len(all_tasks): + log.info('No scheduled tasks') + elif len(all_tasks) == count_done: + log.info('All %i tasks are done.', len(all_tasks)) + else: + log.info('%i tasks, of which %i are done.', len(all_tasks), count_done) + + import gc + import traceback + + # Clean up circular references between tasks. + gc.collect() + + for task_idx, task in enumerate(all_tasks): + if not task.done(): + log.info(' task #%i: %s', task_idx, task) + continue + + # noinspection PyBroadException + try: + res = task.result() + log.info(' task #%i: %s result=%r', task_idx, task, res) + except asyncio.CancelledError: + # No problem, we want to stop anyway. + log.info(' task #%i: %s cancelled', task_idx, task) + except Exception: + log.info('%s: resulted in exception: %s', task, traceback.format_exc()) + + # for ref in gc.get_referrers(task): + # log.info(' - referred by %s', ref) + + log.info('Done logging.') + + def construct_asyncio_loop() -> asyncio.AbstractEventLoop: # On Windows, the default event loop is SelectorEventLoop which does # not support subprocesses. ProactorEventLoop should be used instead.