diff --git a/CHANGELOG.md b/CHANGELOG.md index d99aeb0449890d839c3cc048a0e09c2b91de02dc..e78bb0c9f011a828a6e44545baa92e1ea3b4ca29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ changed functionality, fixed bugs). ## Version 2.0.9 (in development) - Fixed sending task status updates after the task may no longer be run. +- Worker can now be told to go to sleep by the Manager. In that case task execution + stops (because /may-i-run/{task-id} returns 'no') and new tasks are no longer given. ## Version 2.0.8 (released 2017-09-07) diff --git a/flamenco_worker/documents.py b/flamenco_worker/documents.py index 366af42f2ce3d9cc4d718812a2d2385044a7becb..48482f443e9a26eb13367a6155ecc2ba16289c3d 100644 --- a/flamenco_worker/documents.py +++ b/flamenco_worker/documents.py @@ -17,6 +17,18 @@ class Activity: class MayKeepRunningResponse: """Response from the /may-i-run/{task-id} endpoint""" - may_keep_running = attr.ib(validator=attr.validators.instance_of(bool)) - reason = attr.ib(default=None, - validator=attr.validators.optional(attr.validators.instance_of(str))) + may_keep_running = attr.ib( + validator=attr.validators.instance_of(bool)) + reason = attr.ib( + default=None, + validator=attr.validators.optional(attr.validators.instance_of(str))) + status_requested = attr.ib( + default=None, + validator=attr.validators.optional(attr.validators.instance_of(str))) + + +@attr.s +class StatusChangeRequest: + """Response from the /task endpoint when we're requested to change our status""" + + status_requested = attr.ib(validator=attr.validators.instance_of(str)) diff --git a/flamenco_worker/may_i_run.py b/flamenco_worker/may_i_run.py index 288e38a3420d237fd1e72df60f30745d41a5baf6..5eec5ffd2d096779157cd3182ed642f4d89ccb27 100644 --- a/flamenco_worker/may_i_run.py +++ b/flamenco_worker/may_i_run.py @@ -53,5 +53,7 @@ class MayIRun: if not may_keep_running.may_keep_running: self._log.warning('Not allowed to keep running task %s: %s', task_id, may_keep_running.reason) + if may_keep_running.status_requested: + self.worker.change_status(may_keep_running.status_requested) return may_keep_running.may_keep_running diff --git a/flamenco_worker/worker.py b/flamenco_worker/worker.py index 0b41cd497b1a510786515e63089355b9a77d2413..b590f4a77d1b9360c2abcae5c9fbd10956dd4f34 100644 --- a/flamenco_worker/worker.py +++ b/flamenco_worker/worker.py @@ -20,6 +20,8 @@ PUSH_LOG_MAX_ENTRIES = 10 PUSH_LOG_MAX_INTERVAL = datetime.timedelta(seconds=5) PUSH_ACT_MAX_INTERVAL = datetime.timedelta(seconds=1) +ASLEEP_POLL_STATUS_CHANGE_REQUESTED_DELAY = 60 + class UnableToRegisterError(Exception): """Raised when the worker can't register at the manager. @@ -54,6 +56,11 @@ class FlamencoWorker: default=None, init=False, validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task))) + # See self.sleeping() + sleeping_task = attr.ib( + default=None, init=False, + validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task))) + task_id = attr.ib( default=None, init=False, validator=attr.validators.optional(attr.validators.instance_of(str)) @@ -265,17 +272,8 @@ class FlamencoWorker: self._log.warning('Shutting down') self.failures_are_acceptable = True - if self.fetch_task_task is not None and not self.fetch_task_task.done(): - self._log.info('shutdown(): Cancelling task fetching task %s', self.fetch_task_task) - self.fetch_task_task.cancel() - - # This prevents a 'Task was destroyed but it is pending!' warning on the console. - # Sybren: I've only seen this in unit tests, so maybe this code should be moved - # there, instead. - try: - self.loop.run_until_complete(self.fetch_task_task) - except asyncio.CancelledError: - pass + self.stop_fetching_tasks() + self.stop_sleeping() # Stop the task runner self.loop.run_until_complete(self.trunner.abort_current_task()) @@ -301,6 +299,26 @@ class FlamencoWorker: self._log.warning('Error signing off. Continuing with shutdown. %s', ex) self.failures_are_acceptable = False + def stop_fetching_tasks(self): + """Stops the delayed task-fetching from running. + + Used in shutdown and when we're going to status 'asleep'. + """ + + if self.fetch_task_task is None or self.fetch_task_task.done(): + return + + self._log.info('stopping task fetching task %s', self.fetch_task_task) + self.fetch_task_task.cancel() + + # This prevents a 'Task was destroyed but it is pending!' warning on the console. + # Sybren: I've only seen this in unit tests, so maybe this code should be moved + # there, instead. + try: + self.loop.run_until_complete(self.fetch_task_task) + except asyncio.CancelledError: + pass + async def fetch_task(self, delay: float): """Fetches a single task to perform from Flamenco Manager, and executes it. @@ -331,6 +349,13 @@ class FlamencoWorker: self.schedule_fetch_task(FETCH_TASK_EMPTY_RETRY_DELAY) return + if resp.status_code == 423: + status_change = documents.StatusChangeRequest(**resp.json()) + self._log.info('status change to %r requested when fetching new task', + status_change.status_requested) + self.change_status(status_change.status_requested) + return + if resp.status_code != 200: self._log.warning('Error %i fetching new task, will retry in %i seconds.', resp.status_code, FETCH_TASK_FAILED_RETRY_DELAY) @@ -532,6 +557,77 @@ class FlamencoWorker: self.loop.create_task(do_post()) + def change_status(self, new_status: str): + """Called whenever the Flamenco Manager has a change in current status for us.""" + + self._log.info('Manager requested we go to status %r', new_status) + status_change_handlers = { + 'asleep': self.go_to_state_asleep, + 'awake': self.go_to_state_awake, + } + + try: + handler = status_change_handlers[new_status] + except KeyError: + self._log.error('We have no way to go to status %r, going to sleep instead', new_status) + handler = self.go_to_state_asleep + + handler() + + # Confirm that we're now in the new state. + try: + post = self.manager.post('/ack-status-change/%s' % new_status, loop=self.loop) + self.loop.create_task(post) + except Exception: + self._log.exception('unable to notify Manager') + + def go_to_state_asleep(self): + """Starts polling for wakeup calls.""" + + self._log.info('Going to sleep') + self.sleeping_task = self.loop.create_task(self.sleeping()) + self._log.debug('Created task %s', self.sleeping_task) + + def go_to_state_awake(self): + """Restarts the task-fetching asyncio task.""" + + self._log.info('Waking up') + self.stop_sleeping() + self.schedule_fetch_task(3) + + def stop_sleeping(self): + """Stops the asyncio task for sleeping.""" + if self.sleeping_task is None or self.sleeping_task.done(): + return + self.sleeping_task.cancel() + + async def sleeping(self): + """Regularly polls the Manager to see if we're allowed to wake up again.""" + + while True: + try: + await asyncio.sleep(ASLEEP_POLL_STATUS_CHANGE_REQUESTED_DELAY) + resp = await self.manager.get('/status-change', loop=self.loop) + + if resp.status_code == 204: + # No change, don't do anything + self._log.debug('status the same, continuing sleeping') + elif resp.status_code == 200: + # There is a status change + self._log.debug('/status-change: %s', resp.json()) + new_status = resp.json()['status_requested'] + self.change_status(new_status) + return + else: + self._log.error( + 'Error %d trying to fetch /status-change on Manager, will retry later.', + resp.status_code) + except asyncio.CancelledError: + self._log.info('Sleeping ended') + return + except: + self._log.exception('problems while sleeping') + def generate_secret() -> str: """Generates a 64-character secret key.""" diff --git a/tests/test_may_i_run.py b/tests/test_may_i_run.py index ce6e7bc155c42f78b6cf595589da2911c1480795..90e23b8bdde853676300076c302295d2a13decff 100644 --- a/tests/test_may_i_run.py +++ b/tests/test_may_i_run.py @@ -22,6 +22,10 @@ class MayIRunTest(AbstractWorkerTest): poll_interval=timedelta(seconds=0.2), loop=self.loop) + def tearDown(self): + if self.loop: + self.loop.close() + def _mock_get(self, *json_responses): from collections import deque @@ -77,3 +81,13 @@ class MayIRunTest(AbstractWorkerTest): # Cleanly shut down the work task. work_task.cancel() self.loop.run_until_complete(work_task) + + def test_go_asleep(self): + self._mock_get( + {'may_keep_running': False, 'reason': 'switching status', 'status_requested': 'Сергей'}, + # After this response, no more calls should be made. + ) + + result = self.loop.run_until_complete(self.mir.may_i_run('1234')) + self.assertFalse(result) + self.worker.change_status.assert_called_with('Сергей') diff --git a/tests/test_worker.py b/tests/test_worker.py index e2016262a23458ded6200a18468bd62cb032dabc..f0cc6eac52124fa4fee3f9df5381d1fa7c47afe7 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -429,3 +429,28 @@ class WorkerShutdownTest(AbstractWorkerTest): def tearDown(self): self.asyncio_loop.close() + + +class WorkerSleepingTest(AbstractFWorkerTest): + def setUp(self): + super().setUp() + from flamenco_worker.cli import construct_asyncio_loop + + self.loop = construct_asyncio_loop() + self.worker.loop = self.loop + + def test_stop_current_task_go_sleep(self): + from mock_responses import JsonResponse, CoroMock + + self.manager.post = CoroMock() + # response when fetching a task + self.manager.post.coro.return_value = JsonResponse({ + 'status_requested': 'sleep' + }, status_code=423) + + self.worker.schedule_fetch_task() + self.loop.run_until_complete(self.worker.fetch_task_task) + + self.assertIsNotNone(self.worker.sleeping_task) + self.assertFalse(self.worker.sleeping_task.done()) + self.assertTrue(self.worker.fetch_task_task.done())