Newer
Older
"""Commandline interface entry points."""
import argparse
import logging
import logging.config
import requests
def create_pidfile(path: pathlib.Path):
"""Creates a PID file in the given location.
Als uses the atexit module to remove the PID file once the process is terminated.
"""
import atexit
import os
def remove_pidfile():
if path.exists():
path.unlink()
atexit.register(remove_pidfile)
log = logging.getLogger(__name__)
pid = os.getpid()
if path.exists():
log.info('Overwriting PID file %s with our PID %i', path, pid)
else:
log.info('Creating PID file %s with our PID %i', path, pid)
path.write_text(str(pid), encoding='ascii')
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config',
help='Load this configuration file instead of the default files.')
parser.add_argument('-v', '--verbose', action='store_true',
help='Show configuration before starting, '
'and asyncio task status at shutdown.')
args = parser.parse_args()
# Load configuration
from . import config
confparser = config.load_config(args.config, args.verbose)
config.configure_logging(confparser)
log = logging.getLogger(__name__)
log.debug('Starting')
# Create the PID file, if the location is configured.
pidfile = confparser.value('pid')
if pidfile:
create_pidfile(pathlib.Path(pidfile))
else:
log.info("Not creating PID file because the configuration has no value for the 'pid' key.")
# Patch AsyncIO
from . import patch_asyncio
patch_asyncio.patch_asyncio()
# Construct the AsyncIO loop
loop = construct_asyncio_loop()
if args.verbose:
log.debug('Enabling AsyncIO debugging')
loop.set_debug(True)
shutdown_future = loop.create_future()
# Piece all the components together.
from . import runner, worker, upstream, upstream_update_queue, may_i_run, __version__
fmanager = upstream.FlamencoManager(
manager_url=confparser.value('manager_url'),
flamenco_worker_version=__version__,
tuqueue = upstream_update_queue.TaskUpdateQueue(
db_fname=confparser.value('task_update_queue_db'),
manager=fmanager,
shutdown_future=shutdown_future,
)
trunner = runner.TaskRunner(
shutdown_future=shutdown_future)
fworker = worker.FlamencoWorker(
manager=fmanager,
tuqueue=tuqueue,
task_types=confparser.value('task_types').split(),
worker_id=confparser.value('worker_id'),
worker_secret=confparser.value('worker_secret'),
loop=loop,
shutdown_future=shutdown_future,
push_log_max_interval=confparser.interval_secs('push_log_max_interval_seconds'),
push_log_max_entries=confparser.value('push_log_max_entries', int),
push_act_max_interval=confparser.interval_secs('push_act_max_interval_seconds'),
Sybren A. Stüvel
committed
mir = may_i_run.MayIRun(
manager=fmanager,
worker=fworker,
poll_interval=confparser.interval_secs('may_i_run_interval_seconds'),
Sybren A. Stüvel
committed
loop=loop,
)
def shutdown(signum, stackframe):
"""Perform a clean shutdown."""
# Raise an exception, so that the exception is bubbled upwards, until
# the asyncio loop stops executing the current task. Only then can we
# run things like loop.run_until_complete(mir_work_task).
log.warning('Shutting down due to signal %i', signum)
raise KeyboardInterrupt()
# Shut down cleanly upon TERM signal
import signal
signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGUSR1, asyncio_report_tasks)
Sybren A. Stüvel
committed
# Start asynchronous tasks.
asyncio.ensure_future(tuqueue.work(loop=loop))
Sybren A. Stüvel
committed
mir_work_task = asyncio.ensure_future(mir.work())
Sybren A. Stüvel
committed
loop.run_until_complete(fworker.startup())
except worker.UnableToRegisterError:
# The worker will have logged something, we'll just shut down cleanly.
pass
except KeyboardInterrupt:
shutdown_future.cancel()
Sybren A. Stüvel
committed
mir_work_task.cancel()
try:
loop.run_until_complete(mir_work_task)
except requests.exceptions.ConnectionError:
log.warning("Unable to connect to HTTP server, but that's fine as we're shutting down.")
Sybren A. Stüvel
committed
fworker.shutdown()
async def stop_loop():
log.info('Waiting to give tasks the time to stop gracefully')
await asyncio.sleep(1)
loop.run_until_complete(stop_loop())
except:
log.exception('Uncaught exception!')
# Report on the asyncio task status
asyncio_report_tasks()
log.warning('Closing asyncio loop')
log.warning('Flamenco Worker is shut down')
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def asyncio_report_tasks(signum=0, stackframe=None):
"""Runs the garbage collector, then reports all AsyncIO tasks on the log.
Can be used as signal handler.
"""
log = logging.getLogger('%s.asyncio_report_tasks' % __name__)
log.info('Logging all asyncio tasks.')
all_tasks = asyncio.Task.all_tasks()
count_done = sum(task.done() for task in all_tasks)
if not len(all_tasks):
log.info('No scheduled tasks')
elif len(all_tasks) == count_done:
log.info('All %i tasks are done.', len(all_tasks))
else:
log.info('%i tasks, of which %i are done.', len(all_tasks), count_done)
import gc
import traceback
# Clean up circular references between tasks.
gc.collect()
for task_idx, task in enumerate(all_tasks):
if not task.done():
log.info(' task #%i: %s', task_idx, task)
continue
# noinspection PyBroadException
try:
res = task.result()
log.info(' task #%i: %s result=%r', task_idx, task, res)
except asyncio.CancelledError:
# No problem, we want to stop anyway.
log.info(' task #%i: %s cancelled', task_idx, task)
except Exception:
log.info('%s: resulted in exception: %s', task, traceback.format_exc())
# for ref in gc.get_referrers(task):
# log.info(' - referred by %s', ref)
log.info('Done logging.')
def construct_asyncio_loop() -> asyncio.AbstractEventLoop:
# On Windows, the default event loop is SelectorEventLoop which does
# not support subprocesses. ProactorEventLoop should be used instead.
# Source: https://docs.python.org/3.5/library/asyncio-subprocess.html
import sys
if sys.platform == 'win32':
loop = asyncio.ProactorEventLoop()
else:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if __name__ == '__main__':
main()