Skip to content
Snippets Groups Projects
Commit 7c61d3f2 authored by Sybren A. Stüvel
Browse files

Worker: Started working on task status updates

Also introduced nicer shutdown when pressing Ctrl+C
parent b3dccda9
No related branches found
No related tags found
No related merge requests found
"""Commandline interface entry points.""" """Commandline interface entry points."""
import argparse import argparse
import asyncio
import logging import logging
import logging.config import logging.config
...@@ -50,24 +51,48 @@ def main(): ...@@ -50,24 +51,48 @@ def main():
# Load configuration # Load configuration
from . import config from . import config
confparser = config.load_config(args.config, args.verbose) confparser = config.load_config(args.config, args.verbose)
from . import worker, upstream # Construct the AsyncIO loop
loop = asyncio.get_event_loop()
if args.verbose:
log.debug('Enabling AsyncIO debugging')
loop.set_debug(True)
shutdown_future = loop.create_future()
# Piece all the components together.
from . import runner, worker, upstream
fmanager = upstream.FlamencoManager( fmanager = upstream.FlamencoManager(
manager_url=confparser.get(config.CONFIG_SECTION, 'manager_url'), manager_url=confparser.get(config.CONFIG_SECTION, 'manager_url'),
) )
trunner = runner.TaskRunner(
shutdown_future=shutdown_future)
fworker = worker.FlamencoWorker( fworker = worker.FlamencoWorker(
manager=fmanager, manager=fmanager,
trunner=trunner,
job_types=confparser.get(config.CONFIG_SECTION, 'job_types').split(), job_types=confparser.get(config.CONFIG_SECTION, 'job_types').split(),
worker_id=confparser.get(config.CONFIG_SECTION, 'worker_id'), worker_id=confparser.get(config.CONFIG_SECTION, 'worker_id'),
worker_secret=confparser.get(config.CONFIG_SECTION, 'worker_secret'), worker_secret=confparser.get(config.CONFIG_SECTION, 'worker_secret'),
loop=loop,
shutdown_future=shutdown_future,
) )
try: try:
fworker.startup() fworker.startup()
fworker.mainloop() fworker.mainloop()
except KeyboardInterrupt:
log.warning('Shutting down due to keyboard interrupt')
shutdown_future.cancel()
fworker.shutdown()
async def stop_loop():
log.info('Waiting to give tasks the time to stop gracefully')
await asyncio.sleep(2)
loop.stop()
loop.run_until_complete(stop_loop())
except: except:
log.exception('Uncaught exception!') log.exception('Uncaught exception!')
log.warning('Shutting down') log.warning('Shutting down')
......
"""Classes for JSON documents used in upstream communication."""
import attr
@attr.s
class Activity:
    """JSON document describing the activity on a task.

    Every field is type-checked by an attrs validator when an instance
    is constructed.
    """

    # Textual description of the activity; must be a str.
    description = attr.ib(
        validator=attr.validators.instance_of(str),
    )
    # Name of the command currently being handled; must be a str.
    current_cmd_name = attr.ib(
        validator=attr.validators.instance_of(str),
    )
    # Completion percentage of the task as a whole; must be an int.
    percentage_complete_task = attr.ib(
        validator=attr.validators.instance_of(int),
    )
    # Completion percentage of the current command; must be an int.
    percentage_complete_command = attr.ib(
        validator=attr.validators.instance_of(int),
    )
"""Task runner."""
import abc
import asyncio
import logging
import attr
from . import attrs_extra
from . import worker
# Registry mapping a command name to the class that executes it.
# Filled by the @command_executor class decorator.
command_handlers = {}


def command_executor(cmdname):
    """Class decorator, registers a command executor.

    The decorated class is stored in ``command_handlers`` under ``cmdname``
    and gets a ``command_name`` class attribute set to ``cmdname``.

    :param cmdname: name of the command the decorated class handles.
    :raises AssertionError: when ``cmdname`` has already been registered.
    """

    def decorator(cls):
        # Raise explicitly instead of using an ``assert`` statement: asserts
        # are stripped when Python runs with -O, which would let a duplicate
        # registration silently replace the earlier handler.
        if cmdname in command_handlers:
            raise AssertionError('Command %r is already registered to %r'
                                 % (cmdname, command_handlers[cmdname]))

        command_handlers[cmdname] = cls
        cls.command_name = cmdname
        return cls

    return decorator
@attr.s
class AbstractCommand(metaclass=abc.ABCMeta):
    """Base class for command executors.

    Subclasses implement execute() and may override validate(). Calling an
    instance with the command's settings validates them and, when valid,
    executes the command.
    """

    worker = attr.ib(validator=attr.validators.instance_of(worker.FlamencoWorker))
    task_id = attr.ib(validator=attr.validators.instance_of(str))
    command_idx = attr.ib(validator=attr.validators.instance_of(int))

    # Set by @command_executor, which assigns it on the decorated class.
    # This is a plain class attribute rather than an attrs field: an attrs
    # field with init=False gets its default (None) assigned on every
    # instance, which would hide the class-level value set by the decorator.
    command_name = None

    # Set by __call__(). It is None until the first call, hence the optional()
    # wrapper: attrs also runs validators on init=False attributes, so a bare
    # instance_of(str) would reject the None default at construction time.
    identifier = attr.ib(
        default=None,
        init=False,
        validator=attr.validators.optional(attr.validators.instance_of(str)))
    _log = None

    def __call__(self, settings: dict):
        """Runs the command, parsing output and sending it back to the worker.

        Invalid settings are logged as a warning and the command is not
        executed. Exceptions raised by execute() are logged, not propagated.

        :param settings: the settings of this command.
        """

        self.identifier = '%s(task_id=%s, command_idx=%s)' % (
            self.__class__.__name__,
            self.task_id,
            self.command_idx)
        self._log = logging.getLogger('%s.%s' % (__name__, self.identifier))

        verr = self.validate(settings)
        if verr is not None:
            self._log.warning('Invalid settings: %s', verr)
            # worker.command_error(self.command_name, verr)
            return

        try:
            self.execute(settings)
        except Exception:
            # Deliberately not a bare except, so that KeyboardInterrupt and
            # SystemExit still propagate and can shut the worker down.
            self._log.exception('Error executing.')

    @abc.abstractmethod
    def execute(self, settings: dict):
        """Executes the command."""

    def validate(self, settings: dict) -> str:
        """Validates the settings for this command.

        If there is an error, a description of the error is returned.
        If the settings are valid, None is returned.

        By default all settings are considered valid.
        """

        return None

    def update_activity(self, new_activity):
        """Sends a new activity to the manager. Not implemented yet."""
        raise NotImplementedError()

    def upload_log(self, log):
        """Sends a new chunk of logs to the manager. Not implemented yet."""
        raise NotImplementedError()

    def handle_output_line(self, line: str):
        """Handles a line of output, parsing it into activity & log.

        Not implemented yet.
        """
        raise NotImplementedError()
@command_executor('echo')
class EchoCommand(AbstractCommand):
    """Executor registered for the 'echo' command; execution is still a stub."""

    def execute(self, settings: dict):
        """Not implemented yet; always raises NotImplementedError."""
        raise NotImplementedError()
@attr.s
class TaskRunner:
    """Runs tasks, sending updates back to the worker."""

    # Must be an asyncio.Future; enforced by the attrs validator.
    shutdown_future = attr.ib(
        validator=attr.validators.instance_of(asyncio.Future),
    )
    _log = attrs_extra.log('%s.TaskRunner' % __name__)

    async def execute(self, task: dict, fworker: worker.FlamencoWorker):
        """Executes the given task on behalf of ``fworker``.

        Not implemented yet; always raises NotImplementedError.
        """
        raise NotImplementedError('Task execution not implemented yet.')
...@@ -13,12 +13,16 @@ FETCH_TASK_EMPTY_RETRY_DELAY = 5 # when there are no tasks to perform ...@@ -13,12 +13,16 @@ FETCH_TASK_EMPTY_RETRY_DELAY = 5 # when there are no tasks to perform
@attr.s @attr.s
class FlamencoWorker: class FlamencoWorker:
manager = attr.ib(validator=attr.validators.instance_of(upstream.FlamencoManager)) manager = attr.ib(validator=attr.validators.instance_of(upstream.FlamencoManager))
trunner = attr.ib() # Instance of flamenco_worker.runner.TaskRunner
job_types = attr.ib(validator=attr.validators.instance_of(list)) job_types = attr.ib(validator=attr.validators.instance_of(list))
worker_id = attr.ib(validator=attr.validators.instance_of(str)) worker_id = attr.ib(validator=attr.validators.instance_of(str))
worker_secret = attr.ib(validator=attr.validators.instance_of(str)) worker_secret = attr.ib(validator=attr.validators.instance_of(str))
loop = attr.ib(init=False, validator=attr.validators.instance_of(asyncio.AbstractEventLoop)) loop = attr.ib(validator=attr.validators.instance_of(asyncio.AbstractEventLoop))
fetch_task_handle = attr.ib( shutdown_future = attr.ib(
validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))
fetch_task_task = attr.ib(
default=None, default=None,
init=False, init=False,
validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task))) validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task)))
...@@ -27,7 +31,6 @@ class FlamencoWorker: ...@@ -27,7 +31,6 @@ class FlamencoWorker:
def startup(self): def startup(self):
self._log.info('Starting up') self._log.info('Starting up')
self.loop = asyncio.get_event_loop()
if not self.worker_id or not self.worker_secret: if not self.worker_id or not self.worker_secret:
self.register_at_manager() self.register_at_manager()
...@@ -65,6 +68,8 @@ class FlamencoWorker: ...@@ -65,6 +68,8 @@ class FlamencoWorker:
def mainloop(self): def mainloop(self):
self._log.info('Entering main loop') self._log.info('Entering main loop')
# TODO: add "watchdog" task that checks the asyncio loop and ensures there is
# always either a task being executed or a task fetch scheduled.
self.schedule_fetch_task() self.schedule_fetch_task()
self.loop.run_forever() self.loop.run_forever()
...@@ -76,15 +81,29 @@ class FlamencoWorker: ...@@ -76,15 +81,29 @@ class FlamencoWorker:
:param delay: delay in seconds, after which the task fetch will be performed. :param delay: delay in seconds, after which the task fetch will be performed.
""" """
if self.fetch_task_handle: if self.fetch_task_task:
self.fetch_task_handle.cancel() self.fetch_task_task.cancel()
self.fetch_task_handle = self.loop.call_later(delay, self._fetch_task)
self.fetch_task_task = asyncio.ensure_future(self.fetch_task(delay), loop=self.loop)
def shutdown(self):
    """Gracefully shuts down any asynchronous tasks.

    Cancels the task-fetching task, unless there is none or it has
    already finished.
    """

    pending = self.fetch_task_task
    if not pending or pending.done():
        # Nothing is scheduled, or it already ran to completion.
        return

    self._log.info('Cancelling task fetching task %s', pending)
    pending.cancel()
def _fetch_task(self): async def fetch_task(self, delay: float):
"""Fetches a single task to perform from Flamenco Manager.""" """Fetches a single task to perform from Flamenco Manager.
:param delay: waits this many seconds before fetching a task.
"""
import requests import requests
self._log.debug('Going to fetch task in %s seconds', delay)
await asyncio.sleep(delay)
# TODO: use exponential backoff instead of retrying every fixed N seconds. # TODO: use exponential backoff instead of retrying every fixed N seconds.
self._log.info('Fetching task') self._log.info('Fetching task')
try: try:
...@@ -109,9 +128,45 @@ class FlamencoWorker: ...@@ -109,9 +128,45 @@ class FlamencoWorker:
return return
task_info = resp.json() task_info = resp.json()
self._log.info('Received task: %s', task_info['_id']) task_id = task_info['_id']
self._log.info('Received task: %s', task_id)
self._log.debug('Received task: %s', task_info) self._log.debug('Received task: %s', task_info)
try:
await self.trunner.execute(task_info, self)
except Exception as ex:
self._log.exception('Uncaught exception executing task %s' % task_id)
self.send_task_update(
task_id,
'failed',
'Uncaught exception: %s' % ex
)
finally:
# Always schedule a new task run.
self.schedule_fetch_task(0)
def send_task_update(self, task_id, new_activity_descr: str = None,
                     task_status: str = None):
    """Updates a task's status and activity description.

    The update is sent on a best-effort basis: when sending fails or the
    manager responds with an HTTP error status, the error is logged and
    the update is lost.

    NOTE(review): the positional order is (task_id, new_activity_descr,
    task_status). The call in fetch_task() passes 'failed' as second and
    the exception message as third positional argument, which looks
    swapped -- confirm and pass these as keyword arguments.

    :param task_id: ID of the task to update.
    :param new_activity_descr: new activity description; always included in
        the payload, also when it is None.
    :param task_status: new task status; only included in the payload when
        it is truthy.
    """

    import requests

    self._log.info('Updating task %s with new status %r and activity %r',
                   task_id, task_status, new_activity_descr)

    payload = {'activity_descr': new_activity_descr}
    if task_status:
        payload['task_status'] = task_status

    try:
        # The POST itself is inside the try block too, so that connection
        # errors and timeouts are handled the same way as HTTP error statuses
        # instead of propagating to the caller (which may itself be handling
        # an exception). Assumes manager.post() raises requests exceptions
        # on transport failure -- TODO confirm against upstream.FlamencoManager.
        resp = self.manager.post('/tasks/%s/update' % task_id,
                                 json=payload,
                                 auth=(self.worker_id, self.worker_secret))
        self._log.debug('Sent task %s update to manager', task_id)
        resp.raise_for_status()
    except requests.RequestException as ex:
        # requests.HTTPError is a subclass of RequestException.
        self._log.error('Unable to send status update to manager, update is lost: %s', ex)
def generate_secret() -> str: def generate_secret() -> str:
"""Generates a 64-character secret key.""" """Generates a 64-character secret key."""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment