Skip to content
Snippets Groups Projects
Commit aee83d6d authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Worker: added main loop & task fetching using asyncio loop & scheduler

parent 83676a53
No related branches found
No related tags found
No related merge requests found
import asyncio
import attr import attr
from . import attrs_extra from . import attrs_extra
from . import upstream from . import upstream
# All durations/delays/etc are in seconds.
FETCH_TASK_FAILED_RETRY_DELAY = 10 # when we failed obtaining a task
FETCH_TASK_EMPTY_RETRY_DELAY = 5 # when there are no tasks to perform
@attr.s @attr.s
class FlamencoWorker: class FlamencoWorker:
...@@ -11,10 +17,17 @@ class FlamencoWorker: ...@@ -11,10 +17,17 @@ class FlamencoWorker:
worker_id = attr.ib(validator=attr.validators.instance_of(str)) worker_id = attr.ib(validator=attr.validators.instance_of(str))
worker_secret = attr.ib(validator=attr.validators.instance_of(str)) worker_secret = attr.ib(validator=attr.validators.instance_of(str))
loop = attr.ib(init=False, validator=attr.validators.instance_of(asyncio.AbstractEventLoop))
fetch_task_handle = attr.ib(
default=None,
init=False,
validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task)))
_log = attrs_extra.log('%s.FlamencoWorker' % __name__) _log = attrs_extra.log('%s.FlamencoWorker' % __name__)
def startup(self): def startup(self):
self._log.info('Starting up') self._log.info('Starting up')
self.loop = asyncio.get_event_loop()
if not self.worker_id or not self.worker_secret: if not self.worker_id or not self.worker_secret:
self.register_at_manager() self.register_at_manager()
...@@ -31,6 +44,8 @@ class FlamencoWorker: ...@@ -31,6 +44,8 @@ class FlamencoWorker:
'supported_job_types': self.job_types, 'supported_job_types': self.job_types,
}) })
# TODO: check response code / raise blablabla
result = resp.json() result = resp.json()
self._log.info('Response: %s', result) self._log.info('Response: %s', result)
self.worker_id = result['_id'] self.worker_id = result['_id']
...@@ -50,6 +65,53 @@ class FlamencoWorker: ...@@ -50,6 +65,53 @@ class FlamencoWorker:
def mainloop(self): def mainloop(self):
self._log.info('Entering main loop') self._log.info('Entering main loop')
self.schedule_fetch_task()
self.loop.run_forever()
def schedule_fetch_task(self, delay=0):
"""Schedules a task fetch.
If a task fetch was already queued, that one is cancelled.
:param delay: delay in seconds, after which the task fetch will be performed.
"""
if self.fetch_task_handle:
self.fetch_task_handle.cancel()
self.fetch_task_handle = self.loop.call_later(delay, self._fetch_task)
def _fetch_task(self):
"""Fetches a single task to perform from Flamenco Manager."""
import requests
# TODO: use exponential backoff instead of retrying every fixed N seconds.
self._log.info('Fetching task')
try:
resp = self.manager.post('/task',
auth=(self.worker_id, self.worker_secret))
except requests.exceptions.RequestException as ex:
self._log.warning('Error fetching new task, will retry in %i seconds: %s',
FETCH_TASK_FAILED_RETRY_DELAY, ex)
self.schedule_fetch_task(FETCH_TASK_FAILED_RETRY_DELAY)
return
if resp.status_code == 204:
self._log.info('No tasks available, will retry in %i seconds.',
FETCH_TASK_EMPTY_RETRY_DELAY)
self.schedule_fetch_task(FETCH_TASK_EMPTY_RETRY_DELAY)
return
if resp.status_code != 200:
self._log.warning('Error %i fetching new task, will retry in %i seconds.',
resp.status_code, FETCH_TASK_FAILED_RETRY_DELAY)
self.schedule_fetch_task(FETCH_TASK_FAILED_RETRY_DELAY)
return
task_info = resp.json()
self._log.info('Received task: %s', task_info['_id'])
self._log.debug('Received task: %s', task_info)
def generate_secret() -> str: def generate_secret() -> str:
"""Generates a 64-character secret key.""" """Generates a 64-character secret key."""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment