Skip to content
Snippets Groups Projects
Select Git revision
  • 54e52f64d76d4b2cf77099a27a4c48a536e6a75b
  • master default protected
  • blender-v3.6-release
  • main
  • blender-v4.1-release
  • blender-v4.0-release
  • blender-v3.3-release
  • asset-shelf
  • blender-v3.5-release
  • brush-assets-project
  • blender-v2.93-release
  • blender-v3.4-release
  • xr-dev
  • bholodeck-v3.3
  • blender-v3.2-release
  • temp-xr-tracker
  • blender-v3.1-release
  • screenshots-manual
  • gltf_vtree
  • blender-v2.83-release
  • blender-v3.0-release
  • v3.6.18
  • v3.6.19
  • v3.6.20
  • v3.6.21
  • v3.6.22
  • v3.6.23
  • v4.1.1
  • v4.1.0
  • v3.6.10
  • v3.6.11
  • v3.6.12
  • v3.6.13
  • v3.6.14
  • v3.6.15
  • v3.6.16
  • v3.6.17
  • v3.6.9
  • v3.3.16
  • v3.6.8
  • v3.3.15
41 results

__init__.py

Blame
  • cli.py 8.02 KiB
    """Commandline interface entry points."""
    
    import argparse
    import asyncio
    import logging
    import logging.config
    import os
    import pathlib
    
    import requests
    
    
    def main():
        parser = argparse.ArgumentParser()
        parser.add_argument('-c', '--config', type=pathlib.Path,
                            help='Load this configuration file instead of the default files.')
        parser.add_argument('-v', '--verbose', action='store_true',
                            help='Show configuration before starting, '
                                 'and asyncio task status at shutdown.')
        parser.add_argument('-r', '--reregister', action='store_true',
                            help="Erases authentication information and re-registers this worker "
                                 "at the Manager. WARNING: this can cause duplicate worker information "
                                 "in the Manager's database.")
        parser.add_argument('-d', '--debug', action='store_true',
                            help="Enables debug logging for Flamenco Worker's own log entries. "
                                 "Edit the logging config in flamenco-worker.cfg "
                                 "for more powerful options.")
        args = parser.parse_args()
    
        # Load configuration
        from . import config
        confparser = config.load_config(args.config, args.verbose)
        config.configure_logging(confparser, enable_debug=args.debug)
    
        log = logging.getLogger(__name__)
        log.debug('Starting, pid=%d', os.getpid())
    
        if args.reregister:
            log.warning('Erasing worker_id and worker_secret so we can attempt re-registration.')
            confparser.erase('worker_id')
            confparser.erase('worker_secret')
    
        # Find the Manager using UPnP/SSDP if we have no manager_url.
        if not confparser.value('manager_url'):
            from . import ssdp_discover
    
            try:
                manager_url = ssdp_discover.find_flamenco_manager()
            except ssdp_discover.DiscoveryFailed:
                log.fatal('Unable to find Flamenco Manager via UPnP/SSDP.')
                raise SystemExit(1)
    
            log.info('Found Flamenco Manager at %s', manager_url)
            confparser.setvalue('manager_url', manager_url)
    
        # Patch AsyncIO
        from . import patch_asyncio
        patch_asyncio.patch_asyncio()
    
        # Construct the AsyncIO loop
        loop = construct_asyncio_loop()
        if args.verbose:
            log.debug('Enabling AsyncIO debugging')
            loop.set_debug(True)
        shutdown_future = loop.create_future()
    
        # Piece all the components together.
        from . import runner, worker, upstream, upstream_update_queue, may_i_run, __version__
    
        fmanager = upstream.FlamencoManager(
            manager_url=confparser.value('manager_url'),
            flamenco_worker_version=__version__,
        )
    
        tuqueue = upstream_update_queue.TaskUpdateQueue(
            db_fname=confparser.value('task_update_queue_db'),
            manager=fmanager,
            shutdown_future=shutdown_future,
        )
        trunner = runner.TaskRunner(
            shutdown_future=shutdown_future)
    
        fworker = worker.FlamencoWorker(
            manager=fmanager,
            trunner=trunner,
            tuqueue=tuqueue,
            task_types=confparser.value('task_types').split(),
            worker_id=confparser.value('worker_id'),
            worker_secret=confparser.value('worker_secret'),
            loop=loop,
            shutdown_future=shutdown_future,
            push_log_max_interval=confparser.interval_secs('push_log_max_interval_seconds'),
            push_log_max_entries=confparser.value('push_log_max_entries', int),
            push_act_max_interval=confparser.interval_secs('push_act_max_interval_seconds'),
        )
    
        mir = may_i_run.MayIRun(
            manager=fmanager,
            worker=fworker,
            poll_interval=confparser.interval_secs('may_i_run_interval_seconds'),
            loop=loop,
        )
    
        def shutdown(signum, stackframe):
            """Perform a clean shutdown."""
    
            # Raise an exception, so that the exception is bubbled upwards, until
            # the asyncio loop stops executing the current task. Only then can we
            # run things like loop.run_until_complete(mir_work_task).
            log.warning('Shutting down due to signal %i', signum)
            raise KeyboardInterrupt()
    
        def sleep(signum, stackframe):
            log.warning('Going asleep due to signal %i', signum)
            fworker.go_to_state_asleep()
    
        def wakeup(signum, stackframe):
            log.warning('Waking up due to signal %i', signum)
            fworker.go_to_state_awake()
    
        # Shut down cleanly upon TERM signal
        import signal
        signal.signal(signal.SIGTERM, shutdown)
        signal.signal(signal.SIGINT, shutdown)
    
        if hasattr(signal, 'SIGUSR1'):
            # Windows doesn't have USR1/2 signals.
            signal.signal(signal.SIGUSR1, sleep)
            signal.signal(signal.SIGUSR2, wakeup)
    
        if hasattr(signal, 'SIGPOLL'):
            # Not sure how cross-platform SIGPOLL is.
            signal.signal(signal.SIGPOLL, asyncio_report_tasks)
    
        # Start asynchronous tasks.
        asyncio.ensure_future(tuqueue.work(loop=loop))
        mir_work_task = asyncio.ensure_future(mir.work())
    
        def do_clean_shutdown():
            shutdown_future.cancel()
            mir_work_task.cancel()
            try:
                loop.run_until_complete(mir_work_task)
            except requests.exceptions.ConnectionError:
                log.warning("Unable to connect to HTTP server, but that's fine as we're shutting down.")
    
            fworker.shutdown()
    
            async def stop_loop():
                log.info('Waiting to give tasks the time to stop gracefully')
                await asyncio.sleep(1)
                loop.stop()
    
            loop.run_until_complete(stop_loop())
    
        try:
            loop.run_until_complete(fworker.startup())
            fworker.mainloop()
        except worker.UnableToRegisterError:
            # The worker will have logged something, we'll just shut down cleanly.
            pass
        except KeyboardInterrupt:
            do_clean_shutdown()
        except:
            log.exception('Uncaught exception!')
        else:
            do_clean_shutdown()
    
        # Report on the asyncio task status
        if args.verbose:
            asyncio_report_tasks()
    
        log.warning('Closing asyncio loop')
        loop.close()
        log.warning('Flamenco Worker is shut down')
    
    
    def asyncio_report_tasks(signum=0, stackframe=None):
        """Runs the garbage collector, then reports all AsyncIO tasks on the log.
    
        Can be used as signal handler.
        """
    
        log = logging.getLogger('%s.asyncio_report_tasks' % __name__)
        log.info('Logging all asyncio tasks.')
    
        all_tasks = asyncio.Task.all_tasks()
        count_done = sum(task.done() for task in all_tasks)
    
        if not len(all_tasks):
            log.info('No scheduled tasks')
        elif len(all_tasks) == count_done:
            log.info('All %i tasks are done.', len(all_tasks))
        else:
            log.info('%i tasks, of which %i are done.', len(all_tasks), count_done)
    
        import gc
        import traceback
    
        # Clean up circular references between tasks.
        gc.collect()
    
        for task_idx, task in enumerate(all_tasks):
            if not task.done():
                log.info('   task #%i: %s', task_idx, task)
                continue
    
            # noinspection PyBroadException
            try:
                res = task.result()
                log.info('   task #%i: %s result=%r', task_idx, task, res)
            except asyncio.CancelledError:
                # No problem, we want to stop anyway.
                log.info('   task #%i: %s cancelled', task_idx, task)
            except Exception:
                log.info('%s: resulted in exception: %s', task, traceback.format_exc())
    
            # for ref in gc.get_referrers(task):
            #     log.info('      - referred by %s', ref)
    
        log.info('Done logging.')
    
    
    def construct_asyncio_loop() -> asyncio.AbstractEventLoop:
        # On Windows, the default event loop is SelectorEventLoop which does
        # not support subprocesses. ProactorEventLoop should be used instead.
        # Source: https://docs.python.org/3.5/library/asyncio-subprocess.html
        import sys
    
        if sys.platform == 'win32':
            loop = asyncio.ProactorEventLoop()
        else:
            loop = asyncio.get_event_loop()
            if loop.is_closed():
                loop = asyncio.new_event_loop()
    
        asyncio.set_event_loop(loop)
        return loop
    
    
    if __name__ == '__main__':
        main()