Skip to content
Snippets Groups Projects
Commit 3322cdab authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Worker: echo & sleep are working and sending proper status & log to Master

parent bd7fc0d7
No related branches found
No related tags found
No related merge requests found
...@@ -34,40 +34,44 @@ class AbstractCommand(metaclass=abc.ABCMeta): ...@@ -34,40 +34,44 @@ class AbstractCommand(metaclass=abc.ABCMeta):
""" """
self.identifier = '%s(task_id=%s, command_idx=%s)' % ( self.identifier = '%s(task_id=%s, command_idx=%s)' % (
self.__class__.__name__, self.command_name,
self.task_id, self.task_id,
self.command_idx) self.command_idx)
self._log = logging.getLogger('%s.%s' % (__name__, self.identifier)) self._log = logging.getLogger('%s.%s' % (__name__, self.identifier))
verr = self.validate(settings) verr = self.validate(settings)
if verr is not None: if verr is not None:
self._log.warning('Invalid settings: %s', verr) self.register_log('%s: Error in settings: %s' % (self.identifier, verr))
await self.register_error('%s: Error in settings: %s' % (self.identifier, verr)) self.worker.register_task_update(
task_status='failed',
activity='%s: Invalid settings: %s' % (self.identifier, verr),
)
return False return False
await self.register_log('%s: Starting' % self.identifier) self.worker.register_log('%s: Starting' % self.command_name)
await self.update_task(documents.Activity( self.worker.register_task_update(
activity='starting %s' % self.identifier, activity='starting %s' % self.command_name,
current_command_idx=self.command_idx, current_command_idx=self.command_idx,
task_progress_percentage=-1, command_progress_percentage=0
command_progress_percentage=100 )
))
try: try:
await self.execute(settings) await self.execute(settings)
await self.register_error('%s: Completed' % self.identifier)
except Exception as ex: except Exception as ex:
self._log.exception('Error executing.') self._log.exception('Error executing.')
await self.register_error('%s: Error executing: %s' % (self.identifier, ex)) self.worker.register_log('%s: Error executing: %s' % (self.identifier, ex))
self.worker.register_task_update(
task_status='failed',
activity='%s: Error executing: %s' % (self.identifier, ex),
)
return False return False
await self.register_log('%s: Finished' % self.identifier) self.worker.register_log('%s: Finished' % self.command_name)
await self.update_task(documents.Activity( self.worker.register_task_update(
activity='finished %s' % self.identifier, activity='finished %s' % self.command_name,
current_command_idx=self.command_idx, current_command_idx=self.command_idx,
task_progress_percentage=-1,
command_progress_percentage=100 command_progress_percentage=100
)) )
return True return True
...@@ -86,20 +90,6 @@ class AbstractCommand(metaclass=abc.ABCMeta): ...@@ -86,20 +90,6 @@ class AbstractCommand(metaclass=abc.ABCMeta):
return None return None
async def register_log(self, log):
"""Sends a new log to the worker."""
raise NotImplementedError()
async def handle_output_line(self, line: str):
"""Handles a line of output, parsing it into activity & log."""
raise NotImplementedError()
async def register_error(self, log_line: str):
# TODO: implement this
self._log.debug('TODO: send this to worker: %r', log_line)
def command_executor(cmdname): def command_executor(cmdname):
"""Class decorator, registers a command executor.""" """Class decorator, registers a command executor."""
...@@ -114,21 +104,6 @@ def command_executor(cmdname): ...@@ -114,21 +104,6 @@ def command_executor(cmdname):
return decorator return decorator
@command_executor('echo')
class EchoCommand(AbstractCommand):
def validate(self, settings: dict):
try:
msg = settings['message']
except KeyError:
return 'Missing "message"'
if not isinstance(msg, str):
return 'Message must be a string'
async def execute(self, settings: dict):
self.register_log(settings['message'])
@attr.s @attr.s
class TaskRunner: class TaskRunner:
"""Runs tasks, sending updates back to the worker.""" """Runs tasks, sending updates back to the worker."""
...@@ -178,3 +153,36 @@ class TaskRunner: ...@@ -178,3 +153,36 @@ class TaskRunner:
self._log.info('Task %s completed succesfully.', task_id) self._log.info('Task %s completed succesfully.', task_id)
return True return True
@command_executor('echo')
class EchoCommand(AbstractCommand):
def validate(self, settings: dict):
try:
msg = settings['message']
except KeyError:
return 'Missing "message"'
if not isinstance(msg, str):
return 'Message must be a string'
async def execute(self, settings: dict):
self.worker.register_log(settings['message'])
@command_executor('sleep')
class SleepCommand(AbstractCommand):
def validate(self, settings: dict):
try:
sleeptime = settings['time_in_seconds']
except KeyError:
return 'Missing "time_in_seconds"'
if not isinstance(sleeptime, (int, float)):
return 'time_in_seconds must be an int or float'
async def execute(self, settings: dict):
time_in_seconds = settings['time_in_seconds']
self.worker.register_log('Sleeping for %s seconds' % time_in_seconds)
await asyncio.sleep(time_in_seconds)
self.worker.register_log('Done sleeping for %s seconds' % time_in_seconds)
...@@ -157,7 +157,12 @@ class FlamencoWorker: ...@@ -157,7 +157,12 @@ class FlamencoWorker:
self._log.debug('Received task: %s', task_info) self._log.debug('Received task: %s', task_info)
try: try:
await self.trunner.execute(task_info, self) self.register_task_update(task_status='active')
ok = await self.trunner.execute(task_info, self)
if ok:
self.register_task_update(task_status='completed')
else:
self.register_task_update(task_status='failed')
except Exception as ex: except Exception as ex:
self._log.exception('Uncaught exception executing task %s' % self.task_id) self._log.exception('Uncaught exception executing task %s' % self.task_id)
try: try:
...@@ -177,6 +182,8 @@ class FlamencoWorker: ...@@ -177,6 +182,8 @@ class FlamencoWorker:
"""Updates a task's status and activity. """Updates a task's status and activity.
""" """
# TODO Sybren: do this in a separate thread, as to not block the task runner.
import requests import requests
self._log.info('Updating task %s with status %r and activity %r', self._log.info('Updating task %s with status %r and activity %r',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment