Newer
Older
Sybren A. Stüvel
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
"""Polls the /may-i-run/{task-id} endpoint on the Manager."""
import asyncio
import datetime
import attr
from . import attrs_extra
from . import documents
from . import worker
from . import upstream
@attr.s
class MayIRun:
manager = attr.ib(validator=attr.validators.instance_of(upstream.FlamencoManager),
repr=False)
worker = attr.ib(validator=attr.validators.instance_of(worker.FlamencoWorker),
repr=False)
poll_interval = attr.ib(validator=attr.validators.instance_of(datetime.timedelta))
loop = attr.ib(validator=attr.validators.instance_of(asyncio.AbstractEventLoop))
_log = attrs_extra.log('%s.MayIRun' % __name__)
async def work(self):
try:
while True:
await self.one_iteration()
await asyncio.sleep(self.poll_interval.total_seconds())
except asyncio.CancelledError:
self._log.warning('Shutting down.')
async def one_iteration(self):
task_id = self.worker.active_task_id
if not task_id:
# self._log.debug('No current task')
return
if await self.may_i_run(task_id):
self._log.debug('Current task %s may run', task_id)
Sybren A. Stüvel
committed
return
self._log.warning('We have to stop task %s', task_id)
await self.worker.stop_current_task(task_id)
Sybren A. Stüvel
committed
async def may_i_run(self, task_id: str) -> bool:
"""Asks the Manager whether we are still allowed to run the given task."""
resp = await self.manager.get('/may-i-run/%s' % task_id, loop=self.loop)
may_keep_running = documents.MayKeepRunningResponse(**resp.json())
if not may_keep_running.may_keep_running:
self._log.warning('Not allowed to keep running task %s: %s',
task_id, may_keep_running.reason)
if may_keep_running.status_requested:
self.worker.change_status(may_keep_running.status_requested)
Sybren A. Stüvel
committed
return may_keep_running.may_keep_running