Skip to content
Snippets Groups Projects
Commit ea49ddc2 authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Implemented Manager-dictated state changes.

The Manager can now dictate that the Worker moves to an 'asleep' state.
parent f05d21d1
No related branches found
No related tags found
No related merge requests found
...@@ -6,6 +6,8 @@ changed functionality, fixed bugs). ...@@ -6,6 +6,8 @@ changed functionality, fixed bugs).
## Version 2.0.9 (in development) ## Version 2.0.9 (in development)
- Fixed sending task status updates after the task may no longer be run. - Fixed sending task status updates after the task may no longer be run.
- Worker can now be told to go to sleep by the Manager. In that case task execution
stops (because /may-i-run/{task-id} returns 'no') and new tasks are no longer given.
## Version 2.0.8 (released 2017-09-07) ## Version 2.0.8 (released 2017-09-07)
......
...@@ -17,6 +17,18 @@ class Activity: ...@@ -17,6 +17,18 @@ class Activity:
class MayKeepRunningResponse: class MayKeepRunningResponse:
"""Response from the /may-i-run/{task-id} endpoint""" """Response from the /may-i-run/{task-id} endpoint"""
may_keep_running = attr.ib(validator=attr.validators.instance_of(bool)) may_keep_running = attr.ib(
reason = attr.ib(default=None, validator=attr.validators.instance_of(bool))
reason = attr.ib(
default=None,
validator=attr.validators.optional(attr.validators.instance_of(str))) validator=attr.validators.optional(attr.validators.instance_of(str)))
status_requested = attr.ib(
default=None,
validator=attr.validators.optional(attr.validators.instance_of(str)))
@attr.s
class StatusChangeRequest:
"""Response from the /task endpoint when we're requested to change our status"""
status_requested = attr.ib(validator=attr.validators.instance_of(str))
...@@ -53,5 +53,7 @@ class MayIRun: ...@@ -53,5 +53,7 @@ class MayIRun:
if not may_keep_running.may_keep_running: if not may_keep_running.may_keep_running:
self._log.warning('Not allowed to keep running task %s: %s', self._log.warning('Not allowed to keep running task %s: %s',
task_id, may_keep_running.reason) task_id, may_keep_running.reason)
if may_keep_running.status_requested:
self.worker.change_status(may_keep_running.status_requested)
return may_keep_running.may_keep_running return may_keep_running.may_keep_running
...@@ -20,6 +20,8 @@ PUSH_LOG_MAX_ENTRIES = 10 ...@@ -20,6 +20,8 @@ PUSH_LOG_MAX_ENTRIES = 10
PUSH_LOG_MAX_INTERVAL = datetime.timedelta(seconds=5) PUSH_LOG_MAX_INTERVAL = datetime.timedelta(seconds=5)
PUSH_ACT_MAX_INTERVAL = datetime.timedelta(seconds=1) PUSH_ACT_MAX_INTERVAL = datetime.timedelta(seconds=1)
ASLEEP_POLL_STATUS_CHANGE_REQUESTED_DELAY = 60
class UnableToRegisterError(Exception): class UnableToRegisterError(Exception):
"""Raised when the worker can't register at the manager. """Raised when the worker can't register at the manager.
...@@ -54,6 +56,11 @@ class FlamencoWorker: ...@@ -54,6 +56,11 @@ class FlamencoWorker:
default=None, init=False, default=None, init=False,
validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task))) validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task)))
# See self.sleeping()
sleeping_task = attr.ib(
default=None, init=False,
validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task)))
task_id = attr.ib( task_id = attr.ib(
default=None, init=False, default=None, init=False,
validator=attr.validators.optional(attr.validators.instance_of(str)) validator=attr.validators.optional(attr.validators.instance_of(str))
...@@ -265,17 +272,8 @@ class FlamencoWorker: ...@@ -265,17 +272,8 @@ class FlamencoWorker:
self._log.warning('Shutting down') self._log.warning('Shutting down')
self.failures_are_acceptable = True self.failures_are_acceptable = True
if self.fetch_task_task is not None and not self.fetch_task_task.done(): self.stop_fetching_tasks()
self._log.info('shutdown(): Cancelling task fetching task %s', self.fetch_task_task) self.stop_sleeping()
self.fetch_task_task.cancel()
# This prevents a 'Task was destroyed but it is pending!' warning on the console.
# Sybren: I've only seen this in unit tests, so maybe this code should be moved
# there, instead.
try:
self.loop.run_until_complete(self.fetch_task_task)
except asyncio.CancelledError:
pass
# Stop the task runner # Stop the task runner
self.loop.run_until_complete(self.trunner.abort_current_task()) self.loop.run_until_complete(self.trunner.abort_current_task())
...@@ -301,6 +299,26 @@ class FlamencoWorker: ...@@ -301,6 +299,26 @@ class FlamencoWorker:
self._log.warning('Error signing off. Continuing with shutdown. %s', ex) self._log.warning('Error signing off. Continuing with shutdown. %s', ex)
self.failures_are_acceptable = False self.failures_are_acceptable = False
def stop_fetching_tasks(self):
"""Stops the delayed task-fetching from running.
Used in shutdown and when we're going to status 'asleep'.
"""
if self.fetch_task_task is None or self.fetch_task_task.done():
return
self._log.info('stopping task fetching task %s', self.fetch_task_task)
self.fetch_task_task.cancel()
# This prevents a 'Task was destroyed but it is pending!' warning on the console.
# Sybren: I've only seen this in unit tests, so maybe this code should be moved
# there, instead.
try:
self.loop.run_until_complete(self.fetch_task_task)
except asyncio.CancelledError:
pass
async def fetch_task(self, delay: float): async def fetch_task(self, delay: float):
"""Fetches a single task to perform from Flamenco Manager, and executes it. """Fetches a single task to perform from Flamenco Manager, and executes it.
...@@ -331,6 +349,13 @@ class FlamencoWorker: ...@@ -331,6 +349,13 @@ class FlamencoWorker:
self.schedule_fetch_task(FETCH_TASK_EMPTY_RETRY_DELAY) self.schedule_fetch_task(FETCH_TASK_EMPTY_RETRY_DELAY)
return return
if resp.status_code == 423:
status_change = documents.StatusChangeRequest(**resp.json())
self._log.info('status change to %r requested when fetching new task',
status_change.status_requested)
self.change_status(status_change.status_requested)
return
if resp.status_code != 200: if resp.status_code != 200:
self._log.warning('Error %i fetching new task, will retry in %i seconds.', self._log.warning('Error %i fetching new task, will retry in %i seconds.',
resp.status_code, FETCH_TASK_FAILED_RETRY_DELAY) resp.status_code, FETCH_TASK_FAILED_RETRY_DELAY)
...@@ -532,6 +557,77 @@ class FlamencoWorker: ...@@ -532,6 +557,77 @@ class FlamencoWorker:
self.loop.create_task(do_post()) self.loop.create_task(do_post())
def change_status(self, new_status: str):
"""Called whenever the Flamenco Manager has a change in current status for us."""
self._log.info('Manager requested we go to status %r', new_status)
status_change_handlers = {
'asleep': self.go_to_state_asleep,
'awake': self.go_to_state_awake,
}
try:
handler = status_change_handlers[new_status]
except KeyError:
self._log.error('We have no way to go to status %r, going to sleep instead', new_status)
handler = self.go_to_state_asleep
handler()
# Confirm that we're now in the new state.
try:
post = self.manager.post('/ack-status-change/%s' % new_status, loop=self.loop)
self.loop.create_task(post)
except Exception:
self._log.exception('unable to notify Manager')
def go_to_state_asleep(self):
"""Starts polling for wakeup calls."""
self._log.info('Going to sleep')
self.sleeping_task = self.loop.create_task(self.sleeping())
self._log.debug('Created task %s', self.sleeping_task)
def go_to_state_awake(self):
"""Restarts the task-fetching asyncio task."""
self._log.info('Waking up')
self.stop_sleeping()
self.schedule_fetch_task(3)
def stop_sleeping(self):
"""Stops the asyncio task for sleeping."""
if self.sleeping_task is None or self.sleeping_task.done():
return
self.sleeping_task.cancel()
async def sleeping(self):
"""Regularly polls the Manager to see if we're allowed to wake up again."""
while True:
try:
await asyncio.sleep(ASLEEP_POLL_STATUS_CHANGE_REQUESTED_DELAY)
resp = await self.manager.get('/status-change', loop=self.loop)
if resp.status_code == 204:
# No change, don't do anything
self._log.debug('status the same, continuing sleeping')
elif resp.status_code == 200:
# There is a status change
self._log.debug('/status-change: %s', resp.json())
new_status = resp.json()['status_requested']
self.change_status(new_status)
return
else:
self._log.error(
'Error %d trying to fetch /status-change on Manager, will retry later.',
resp.status_code)
except asyncio.CancelledError:
self._log.info('Sleeping ended')
return
except:
self._log.exception('problems while sleeping')
def generate_secret() -> str: def generate_secret() -> str:
"""Generates a 64-character secret key.""" """Generates a 64-character secret key."""
......
...@@ -22,6 +22,10 @@ class MayIRunTest(AbstractWorkerTest): ...@@ -22,6 +22,10 @@ class MayIRunTest(AbstractWorkerTest):
poll_interval=timedelta(seconds=0.2), poll_interval=timedelta(seconds=0.2),
loop=self.loop) loop=self.loop)
def tearDown(self):
if self.loop:
self.loop.close()
def _mock_get(self, *json_responses): def _mock_get(self, *json_responses):
from collections import deque from collections import deque
...@@ -77,3 +81,13 @@ class MayIRunTest(AbstractWorkerTest): ...@@ -77,3 +81,13 @@ class MayIRunTest(AbstractWorkerTest):
# Cleanly shut down the work task. # Cleanly shut down the work task.
work_task.cancel() work_task.cancel()
self.loop.run_until_complete(work_task) self.loop.run_until_complete(work_task)
def test_go_asleep(self):
self._mock_get(
{'may_keep_running': False, 'reason': 'switching status', 'status_requested': 'Сергей'},
# After this response, no more calls should be made.
)
result = self.loop.run_until_complete(self.mir.may_i_run('1234'))
self.assertFalse(result)
self.worker.change_status.assert_called_with('Сергей')
...@@ -429,3 +429,28 @@ class WorkerShutdownTest(AbstractWorkerTest): ...@@ -429,3 +429,28 @@ class WorkerShutdownTest(AbstractWorkerTest):
def tearDown(self): def tearDown(self):
self.asyncio_loop.close() self.asyncio_loop.close()
class WorkerSleepingTest(AbstractFWorkerTest):
def setUp(self):
super().setUp()
from flamenco_worker.cli import construct_asyncio_loop
self.loop = construct_asyncio_loop()
self.worker.loop = self.loop
def test_stop_current_task_go_sleep(self):
from mock_responses import JsonResponse, CoroMock
self.manager.post = CoroMock()
# response when fetching a task
self.manager.post.coro.return_value = JsonResponse({
'status_requested': 'sleep'
}, status_code=423)
self.worker.schedule_fetch_task()
self.loop.run_until_complete(self.worker.fetch_task_task)
self.assertIsNotNone(self.worker.sleeping_task)
self.assertFalse(self.worker.sleeping_task.done())
self.assertTrue(self.worker.fetch_task_task.done())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment