Skip to content
Snippets Groups Projects
cli.py 10.9 KiB
Newer Older
  • Learn to ignore specific revisions
  • """Commandline interface entry points."""
    
    import argparse
    
    import logging
    import logging.config
    
    import typing
    
    
    def main():
        parser = argparse.ArgumentParser()
    
        parser.add_argument('-c', '--config', type=pathlib.Path,
    
                            help='Load this configuration file instead of the default files.')
        parser.add_argument('-v', '--verbose', action='store_true',
    
                            help='Show configuration before starting, '
                                 'and asyncio task status at shutdown.')
    
        parser.add_argument('-V', '--version', action='store_true',
                            help='Show the version of Flamenco Worker and stops.')
    
        parser.add_argument('-r', '--reregister', action='store_true',
                            help="Erases authentication information and re-registers this worker "
                                 "at the Manager. WARNING: this can cause duplicate worker information "
                                 "in the Manager's database.")
    
        parser.add_argument('-d', '--debug', action='store_true',
                            help="Enables debug logging for Flamenco Worker's own log entries. "
                                 "Edit the logging config in flamenco-worker.cfg "
                                 "for more powerful options.")
    
        parser.add_argument('-t', '--test', action='store_true',
                            help="Starts up in testing mode, in which only a handful of "
                                 "test-specific task types are accepted. This overrides the task_types "
                                 "in the configuration file.")
    
        parser.add_argument('-1', '--single', action='store_true',
                            help="Runs a single tasks, then exits.")
    
        args = parser.parse_args()
    
    
        if args.version:
            from . import __version__
            print(__version__)
            raise SystemExit()
    
    
        # Load configuration
    
        confparser = config.load_config(args.config, args.verbose, args.test)
    
        config.configure_logging(confparser, enable_debug=args.debug)
    
    
        log = logging.getLogger(__name__)
    
        log.debug('Starting, pid=%d', os.getpid())
    
        if args.test:
            log.warning('Test mode enabled, overriding task_types=%r',
                        confparser.value('task_types'))
    
    
        if args.reregister:
            log.warning('Erasing worker_id and worker_secret so we can attempt re-registration.')
            confparser.erase('worker_id')
            confparser.erase('worker_secret')
    
    
        if args.single:
            log.info('Running in single-task mode, will stop after performing one task.')
    
    
        # Find the Manager using UPnP/SSDP if we have no manager_url.
        if not confparser.value('manager_url'):
            from . import ssdp_discover
    
            try:
                manager_url = ssdp_discover.find_flamenco_manager()
            except ssdp_discover.DiscoveryFailed:
                log.fatal('Unable to find Flamenco Manager via UPnP/SSDP.')
                raise SystemExit(1)
    
            log.info('Found Flamenco Manager at %s', manager_url)
            confparser.setvalue('manager_url', manager_url)
    
    
        # Patch AsyncIO
        from . import patch_asyncio
        patch_asyncio.patch_asyncio()
    
    
        # Construct the AsyncIO loop
    
        loop = construct_asyncio_loop()
    
        if args.verbose:
            log.debug('Enabling AsyncIO debugging')
            loop.set_debug(True)
        shutdown_future = loop.create_future()
    
        # Piece all the components together.
    
        from . import runner, worker, upstream, upstream_update_queue, may_i_run, __version__
    
    
        fmanager = upstream.FlamencoManager(
    
            manager_url=confparser.value('manager_url'),
    
            flamenco_worker_version=__version__,
    
        tuqueue = upstream_update_queue.TaskUpdateQueue(
    
            db_fname=confparser.value('task_update_queue_db'),
    
            manager=fmanager,
            shutdown_future=shutdown_future,
        )
    
        trunner = runner.TaskRunner(
    
            shutdown_future=shutdown_future,
            subprocess_pid_file=confparser.value('subprocess_pid_file'),
        )
    
        pretask_check_params = parse_pretask_check_config(confparser, log)
    
    
        fworker = worker.FlamencoWorker(
            manager=fmanager,
    
            task_types=confparser.value('task_types').split(),
    
            worker_id=confparser.value('worker_id'),
            worker_secret=confparser.value('worker_secret'),
    
            loop=loop,
            shutdown_future=shutdown_future,
    
            push_log_max_interval=confparser.interval_secs('push_log_max_interval_seconds'),
            push_log_max_entries=confparser.value('push_log_max_entries', int),
            push_act_max_interval=confparser.interval_secs('push_act_max_interval_seconds'),
    
            initial_state='testing' if args.test else 'awake',
    
            pretask_check_params=pretask_check_params,
    
        mir = may_i_run.MayIRun(
            manager=fmanager,
            worker=fworker,
    
            poll_interval=confparser.interval_secs('may_i_run_interval_seconds'),
    
        def shutdown(signum, stackframe):
            """Perform a clean shutdown."""
    
            # Raise an exception, so that the exception is bubbled upwards, until
            # the asyncio loop stops executing the current task. Only then can we
            # run things like loop.run_until_complete(mir_work_task).
            log.warning('Shutting down due to signal %i', signum)
            raise KeyboardInterrupt()
    
    
        def sleep(signum, stackframe):
            log.warning('Going asleep due to signal %i', signum)
            fworker.go_to_state_asleep()
    
        def wakeup(signum, stackframe):
            log.warning('Waking up due to signal %i', signum)
            fworker.go_to_state_awake()
    
    
        # Shut down cleanly upon TERM signal
        import signal
        signal.signal(signal.SIGTERM, shutdown)
        signal.signal(signal.SIGINT, shutdown)
    
    
        if hasattr(signal, 'SIGUSR1'):
    
            # Windows doesn't have USR1/2 signals.
            signal.signal(signal.SIGUSR1, sleep)
            signal.signal(signal.SIGUSR2, wakeup)
    
        if hasattr(signal, 'SIGPOLL'):
            # Not sure how cross-platform SIGPOLL is.
            signal.signal(signal.SIGPOLL, asyncio_report_tasks)
    
        asyncio.ensure_future(tuqueue.work(loop=loop))
    
        mir_work_task = asyncio.ensure_future(mir.work())
    
        def do_clean_shutdown():
    
            shutdown_future.cancel()
    
                loop.run_until_complete(asyncio.wait_for(mir_work_task, 5))
    
            except requests.exceptions.ConnectionError:
                log.warning("Unable to connect to HTTP server, but that's fine as we're shutting down.")
    
            except asyncio.TimeoutError:
                log.debug("Timeout waiting for may-I-run task, "
                          "but that's fine as we're shutting down.")
    
            except KeyboardInterrupt:
                log.info('Keyboard interrupt while shutting down, ignoring as we are shutting down.')
    
            fworker.shutdown()
    
            async def stop_loop():
                log.info('Waiting to give tasks the time to stop gracefully')
    
            loop.run_until_complete(stop_loop())
    
    
        try:
            loop.run_until_complete(fworker.startup())
            fworker.mainloop()
        except worker.UnableToRegisterError:
            # The worker will have logged something, we'll just shut down cleanly.
            pass
        except KeyboardInterrupt:
            do_clean_shutdown()
    
        except:
            log.exception('Uncaught exception!')
    
    
        # Report on the asyncio task status
    
        log.warning('Closing asyncio loop')
    
        log.warning('Flamenco Worker is shut down')
    
    def parse_pretask_check_config(confparser, log):
        """Parse the [pre_task_check] config section.
    
        :rtype: flamenco.worker.PreTaskCheckParams
        """
        from . import worker
    
    
        check_read = []
        check_write = []
    
        for name, value in confparser.items(section='pre_task_check'):
            if name.startswith('write'):
                check_write.append(pathlib.Path(value))
    
    Sybren A. Stüvel's avatar
    Sybren A. Stüvel committed
            elif name.startswith('read'):
    
                check_read.append(pathlib.Path(value))
            else:
                log.fatal('Config section "pre_task_check" should only have keys starting with '
                          '"read" or "write"; found %r', value)
                raise SystemExit(47)
        pretask_check_params = worker.PreTaskCheckParams(
            pre_task_check_write=tuple(check_write),
            pre_task_check_read=tuple(check_read),
        )
        return pretask_check_params
    
    
    
    def asyncio_report_tasks(signum=0, stackframe=None):
        """Runs the garbage collector, then reports all AsyncIO tasks on the log.
    
        Can be used as signal handler.
        """
    
        log = logging.getLogger('%s.asyncio_report_tasks' % __name__)
        log.info('Logging all asyncio tasks.')
    
        all_tasks = asyncio.Task.all_tasks()
        count_done = sum(task.done() for task in all_tasks)
    
        if not len(all_tasks):
            log.info('No scheduled tasks')
        elif len(all_tasks) == count_done:
            log.info('All %i tasks are done.', len(all_tasks))
        else:
            log.info('%i tasks, of which %i are done.', len(all_tasks), count_done)
    
        import gc
        import traceback
    
        # Clean up circular references between tasks.
        gc.collect()
    
        for task_idx, task in enumerate(all_tasks):
            if not task.done():
                log.info('   task #%i: %s', task_idx, task)
                continue
    
            # noinspection PyBroadException
            try:
                res = task.result()
                log.info('   task #%i: %s result=%r', task_idx, task, res)
            except asyncio.CancelledError:
                # No problem, we want to stop anyway.
                log.info('   task #%i: %s cancelled', task_idx, task)
            except Exception:
                log.info('%s: resulted in exception: %s', task, traceback.format_exc())
    
            # for ref in gc.get_referrers(task):
            #     log.info('      - referred by %s', ref)
    
        log.info('Done logging.')
    
    
    
    def construct_asyncio_loop() -> asyncio.AbstractEventLoop:
    
        loop = asyncio.get_event_loop()
        if loop.is_closed():
            loop = asyncio.new_event_loop()
    
    
        # On Windows, the default event loop is SelectorEventLoop which does
        # not support subprocesses. ProactorEventLoop should be used instead.
        # Source: https://docs.python.org/3.5/library/asyncio-subprocess.html
    
        if platform.system() == 'Windows':
    
            # Silly MyPy doesn't understand this only runs on Windows.
            if not isinstance(loop, asyncio.ProactorEventLoop):  # type: ignore
                loop = asyncio.ProactorEventLoop()  # type: ignore
    
        asyncio.set_event_loop(loop)
    
    def log_startup():
        """Log the version of Flamenco Worker."""
    
        from . import __version__
    
        log = logging.getLogger(__name__)
        old_level = log.level
        try:
            log.setLevel(logging.INFO)
            log.info('Starting Flamenco Worker %s', __version__)
        finally:
            log.setLevel(old_level)
    
    
    
    if __name__ == '__main__':
        main()