Select Git revision
__init__.py
cli.py 8.02 KiB
"""Commandline interface entry points."""
import argparse
import asyncio
import logging
import logging.config
import os
import pathlib
import requests
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config', type=pathlib.Path,
help='Load this configuration file instead of the default files.')
parser.add_argument('-v', '--verbose', action='store_true',
help='Show configuration before starting, '
'and asyncio task status at shutdown.')
parser.add_argument('-r', '--reregister', action='store_true',
help="Erases authentication information and re-registers this worker "
"at the Manager. WARNING: this can cause duplicate worker information "
"in the Manager's database.")
parser.add_argument('-d', '--debug', action='store_true',
help="Enables debug logging for Flamenco Worker's own log entries. "
"Edit the logging config in flamenco-worker.cfg "
"for more powerful options.")
args = parser.parse_args()
# Load configuration
from . import config
confparser = config.load_config(args.config, args.verbose)
config.configure_logging(confparser, enable_debug=args.debug)
log = logging.getLogger(__name__)
log.debug('Starting, pid=%d', os.getpid())
if args.reregister:
log.warning('Erasing worker_id and worker_secret so we can attempt re-registration.')
confparser.erase('worker_id')
confparser.erase('worker_secret')
# Find the Manager using UPnP/SSDP if we have no manager_url.
if not confparser.value('manager_url'):
from . import ssdp_discover
try:
manager_url = ssdp_discover.find_flamenco_manager()
except ssdp_discover.DiscoveryFailed:
log.fatal('Unable to find Flamenco Manager via UPnP/SSDP.')
raise SystemExit(1)
log.info('Found Flamenco Manager at %s', manager_url)
confparser.setvalue('manager_url', manager_url)
# Patch AsyncIO
from . import patch_asyncio
patch_asyncio.patch_asyncio()
# Construct the AsyncIO loop
loop = construct_asyncio_loop()
if args.verbose:
log.debug('Enabling AsyncIO debugging')
loop.set_debug(True)
shutdown_future = loop.create_future()
# Piece all the components together.
from . import runner, worker, upstream, upstream_update_queue, may_i_run, __version__
fmanager = upstream.FlamencoManager(
manager_url=confparser.value('manager_url'),
flamenco_worker_version=__version__,
)
tuqueue = upstream_update_queue.TaskUpdateQueue(
db_fname=confparser.value('task_update_queue_db'),
manager=fmanager,
shutdown_future=shutdown_future,
)
trunner = runner.TaskRunner(
shutdown_future=shutdown_future)
fworker = worker.FlamencoWorker(
manager=fmanager,
trunner=trunner,
tuqueue=tuqueue,
task_types=confparser.value('task_types').split(),
worker_id=confparser.value('worker_id'),
worker_secret=confparser.value('worker_secret'),
loop=loop,
shutdown_future=shutdown_future,
push_log_max_interval=confparser.interval_secs('push_log_max_interval_seconds'),
push_log_max_entries=confparser.value('push_log_max_entries', int),
push_act_max_interval=confparser.interval_secs('push_act_max_interval_seconds'),
)
mir = may_i_run.MayIRun(
manager=fmanager,
worker=fworker,
poll_interval=confparser.interval_secs('may_i_run_interval_seconds'),
loop=loop,
)
def shutdown(signum, stackframe):
"""Perform a clean shutdown."""
# Raise an exception, so that the exception is bubbled upwards, until
# the asyncio loop stops executing the current task. Only then can we
# run things like loop.run_until_complete(mir_work_task).
log.warning('Shutting down due to signal %i', signum)
raise KeyboardInterrupt()
def sleep(signum, stackframe):
log.warning('Going asleep due to signal %i', signum)
fworker.go_to_state_asleep()
def wakeup(signum, stackframe):
log.warning('Waking up due to signal %i', signum)
fworker.go_to_state_awake()
# Shut down cleanly upon TERM signal
import signal
signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)
if hasattr(signal, 'SIGUSR1'):
# Windows doesn't have USR1/2 signals.
signal.signal(signal.SIGUSR1, sleep)
signal.signal(signal.SIGUSR2, wakeup)
if hasattr(signal, 'SIGPOLL'):
# Not sure how cross-platform SIGPOLL is.
signal.signal(signal.SIGPOLL, asyncio_report_tasks)
# Start asynchronous tasks.
asyncio.ensure_future(tuqueue.work(loop=loop))
mir_work_task = asyncio.ensure_future(mir.work())
def do_clean_shutdown():
shutdown_future.cancel()
mir_work_task.cancel()
try:
loop.run_until_complete(mir_work_task)
except requests.exceptions.ConnectionError:
log.warning("Unable to connect to HTTP server, but that's fine as we're shutting down.")
fworker.shutdown()
async def stop_loop():
log.info('Waiting to give tasks the time to stop gracefully')
await asyncio.sleep(1)
loop.stop()
loop.run_until_complete(stop_loop())
try:
loop.run_until_complete(fworker.startup())
fworker.mainloop()
except worker.UnableToRegisterError:
# The worker will have logged something, we'll just shut down cleanly.
pass
except KeyboardInterrupt:
do_clean_shutdown()
except:
log.exception('Uncaught exception!')
else:
do_clean_shutdown()
# Report on the asyncio task status
if args.verbose:
asyncio_report_tasks()
log.warning('Closing asyncio loop')
loop.close()
log.warning('Flamenco Worker is shut down')
def asyncio_report_tasks(signum=0, stackframe=None):
"""Runs the garbage collector, then reports all AsyncIO tasks on the log.
Can be used as signal handler.
"""
log = logging.getLogger('%s.asyncio_report_tasks' % __name__)
log.info('Logging all asyncio tasks.')
all_tasks = asyncio.Task.all_tasks()
count_done = sum(task.done() for task in all_tasks)
if not len(all_tasks):
log.info('No scheduled tasks')
elif len(all_tasks) == count_done:
log.info('All %i tasks are done.', len(all_tasks))
else:
log.info('%i tasks, of which %i are done.', len(all_tasks), count_done)
import gc
import traceback
# Clean up circular references between tasks.
gc.collect()
for task_idx, task in enumerate(all_tasks):
if not task.done():
log.info(' task #%i: %s', task_idx, task)
continue
# noinspection PyBroadException
try:
res = task.result()
log.info(' task #%i: %s result=%r', task_idx, task, res)
except asyncio.CancelledError:
# No problem, we want to stop anyway.
log.info(' task #%i: %s cancelled', task_idx, task)
except Exception:
log.info('%s: resulted in exception: %s', task, traceback.format_exc())
# for ref in gc.get_referrers(task):
# log.info(' - referred by %s', ref)
log.info('Done logging.')
def construct_asyncio_loop() -> asyncio.AbstractEventLoop:
# On Windows, the default event loop is SelectorEventLoop which does
# not support subprocesses. ProactorEventLoop should be used instead.
# Source: https://docs.python.org/3.5/library/asyncio-subprocess.html
import sys
if sys.platform == 'win32':
loop = asyncio.ProactorEventLoop()
else:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
if __name__ == '__main__':
main()