Skip to content
Snippets Groups Projects
Commit 8d165c13 authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Worker: respond to SIGUSR1 by logging all current asyncio tasks.

parent c87f9182
Branches
Tags
No related merge requests found
......@@ -67,8 +67,10 @@ Once registered via a POST to the manager's `/register-worker` endpoint, the `wo
## Shutdown
Pressing [CTRL]+[C] will cause a clean shutdown of the worker.
If there is a task currently running, it will be aborted and marked as 'failed'. Any pending
task updates are sent to the Manager before stopping the process.
If there is a task currently running, it will be aborted without changing its status. Any pending task updates are sent to the Manager, after which a POST request is sent to the
Manager's `/sign-off` URL to indicate a clean shutdown of the worker. Any active task that is still
assigned to the worker is given status "claimed-by-manager" so that it can be re-activated
by another worker.
## Systemd integration
......@@ -79,3 +81,11 @@ To run Flamenco Worker as a systemd-managed service, copy `flamenco-worker.servi
After installation of this service, `systemctl {start,stop,status,restart} flamenco-worker`
commands can be used to manage it. To ensure that the Flamenco Worker starts at system boot,
use `systemctl enable flamenco-worker`.
## Signals
Flamenco Worker responds to the following POSIX signals:
- `SIGINT`, `SIGTERM`: performs a clean shutdown, as described in the Shutdown section above.
- `SIGUSR1`: logs the currently scheduled asyncio tasks.
......@@ -80,6 +80,7 @@ def main():
import signal
signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGUSR1, asyncio_report_tasks)
# Start asynchronous tasks.
asyncio.ensure_future(tuqueue.work(loop=loop))
......@@ -109,40 +110,59 @@ def main():
# Report on the asyncio task status
if args.verbose:
all_tasks = asyncio.Task.all_tasks()
if not len(all_tasks):
log.info('no more scheduled tasks, this is a clean shutdown.')
elif all(task.done() for task in all_tasks):
log.info('all %i tasks are done, this is a clean shutdown.', len(all_tasks))
import gc
import traceback
# Clean up circular references between tasks.
gc.collect()
for task_idx, task in enumerate(all_tasks):
if not task.done():
continue
# noinspection PyBroadException
try:
res = task.result()
log.info(' task #%i: %s result=%r', task_idx, task, res)
except asyncio.CancelledError:
# No problem, we want to stop anyway.
log.info(' task #%i: %s cancelled', task_idx, task)
except Exception:
print('{}: resulted in exception'.format(task))
traceback.print_exc()
# for ref in gc.get_referrers(task):
# log.info(' - referred by %s', ref)
asyncio_report_tasks()
log.warning('Closing asyncio loop')
loop.close()
log.warning('Flamenco Worker is shut down')
def asyncio_report_tasks(signum=0, stackframe=None):
    """Log a summary of all asyncio tasks, followed by one line per task.

    First logs how many tasks exist and how many of those are done, then runs
    the garbage collector, and finally logs every task: pending tasks as-is,
    finished tasks with their result, their cancellation, or the traceback of
    the exception they raised.

    Can be used as signal handler; `signum` and `stackframe` are only accepted
    for that purpose and are not used.
    """

    import gc
    import traceback

    log = logging.getLogger('%s.asyncio_report_tasks' % __name__)
    log.info('Logging all asyncio tasks.')

    legacy_all_tasks = getattr(asyncio.Task, 'all_tasks', None)
    if legacy_all_tasks is not None:
        # Older Python versions: this also returns tasks that are already done,
        # which is what the "done" reporting below relies on.
        all_tasks = legacy_all_tasks()
    else:
        # asyncio.Task.all_tasks() was removed in Python 3.9. Its replacement,
        # asyncio.all_tasks(), needs to know the loop; without an explicit loop
        # it only works while a loop is running. Per the asyncio documentation
        # it returns only the tasks that have not finished yet.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Not called from within a running loop (e.g. during shutdown).
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                loop = None
        all_tasks = asyncio.all_tasks(loop) if loop is not None else set()

    count_done = sum(task.done() for task in all_tasks)

    if not all_tasks:
        log.info('No scheduled tasks')
    elif len(all_tasks) == count_done:
        log.info('All %i tasks are done.', len(all_tasks))
    else:
        log.info('%i tasks, of which %i are done.', len(all_tasks), count_done)

    # Clean up circular references between tasks.
    gc.collect()

    for task_idx, task in enumerate(all_tasks):
        if not task.done():
            log.info(' task #%i: %s', task_idx, task)
            continue

        # noinspection PyBroadException
        try:
            res = task.result()
            log.info(' task #%i: %s result=%r', task_idx, task, res)
        except asyncio.CancelledError:
            # No problem, we want to stop anyway.
            log.info(' task #%i: %s cancelled', task_idx, task)
        except Exception:
            # task.result() re-raises the exception the task ended with; this is
            # a report, so log the traceback rather than propagating it.
            log.info('%s: resulted in exception: %s', task, traceback.format_exc())

    log.info('Done logging.')
def construct_asyncio_loop() -> asyncio.AbstractEventLoop:
# On Windows, the default event loop is SelectorEventLoop which does
# not support subprocesses. ProactorEventLoop should be used instead.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment