diff --git a/packages/flamenco/flamenco/__init__.py b/packages/flamenco/flamenco/__init__.py index 97c8b322385a441dc3e712fae1ea48271d259169..70a65c65cbb23d913bc8c9669167f3d85dd76b03 100644 --- a/packages/flamenco/flamenco/__init__.py +++ b/packages/flamenco/flamenco/__init__.py @@ -65,7 +65,6 @@ class FlamencoExtension(PillarExtension): from . import routes import flamenco.jobs.routes import flamenco.tasks.routes - import flamenco.scheduler.routes return [ routes.blueprint, @@ -73,7 +72,6 @@ class FlamencoExtension(PillarExtension): flamenco.jobs.routes.blueprint, flamenco.tasks.routes.perjob_blueprint, flamenco.tasks.routes.perproject_blueprint, - flamenco.scheduler.routes.blueprint, ] @property @@ -100,9 +98,6 @@ class FlamencoExtension(PillarExtension): jobs.setup_app(app) tasks.setup_app(app) - # Imports for side-effects - from . import scheduler - def _create_collections(self, db): import pymongo diff --git a/packages/flamenco/flamenco/jobs/__init__.py b/packages/flamenco/flamenco/jobs/__init__.py index 1ee0611b9ffbedc27974777967a3b49e32128d8e..040081cdf401bf474cf7ab50ac786fd8254e1af1 100644 --- a/packages/flamenco/flamenco/jobs/__init__.py +++ b/packages/flamenco/flamenco/jobs/__init__.py @@ -135,9 +135,20 @@ class JobManager(object): """Updates the job status based on the status of this task and other tasks in the job. """ - if new_task_status == {'queued', 'cancel-requested'}: + def __job_active_if_queued(): + """Set job to active if it was queued.""" + + jobs_coll = current_flamenco.db('jobs') + job = jobs_coll.find_one(job_id, projection={'status': 1}) + if job['status'] == 'queued': + self._log.info('Job %s became active because one of its tasks %s changed ' + 'status to %s', job_id, task_id, new_task_status) + self.api_set_job_status(job_id, 'active') + + if new_task_status == {'queued', 'cancel-requested', 'claimed-by-manager'}: # Ignore; for now re-queueing a task doesn't change the job status. # Also, canceling a single task has no influence on the job itself. + # A task being claimed by the manager also doesn't change job status. return if new_task_status == 'canceled': @@ -164,9 +175,10 @@ class JobManager(object): self._log.warning('Task %s of job %s failed; ' 'only %i of its %i tasks failed (%i%%), so ignoring for now', task_id, job_id, fail_count, total_count, fail_perc) + __job_active_if_queued() return - if new_task_status in {'claimed-by-manager', 'active', 'processing'}: + if new_task_status in {'active', 'processing'}: self._log.info('Job %s became active because one of its tasks %s changed status to %s', job_id, task_id, new_task_status) self.api_set_job_status(job_id, 'active') @@ -181,6 +193,8 @@ class JobManager(object): 'setting job to completed.', task_id, job_id) self.api_set_job_status(job_id, 'completed') + else: + __job_active_if_queued() return self._log.warning('Task %s of job %s obtained status %s, ' diff --git a/packages/flamenco/flamenco/scheduler/__init__.py b/packages/flamenco/flamenco/scheduler/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/packages/flamenco/flamenco/scheduler/routes.py b/packages/flamenco/flamenco/scheduler/routes.py deleted file mode 100644 index daca4e29cb7c2abb3207895f1139c30c498a3a5e..0000000000000000000000000000000000000000 --- a/packages/flamenco/flamenco/scheduler/routes.py +++ /dev/null @@ -1,83 +0,0 @@ -import logging -import flask - -from pillar.api.utils import authorization - -log = logging.getLogger(__name__) -blueprint = flask.Blueprint('flamenco.scheduler', __name__, url_prefix='/scheduler') - -# The scheduler is in charge of -# - generating a task (or task list) for each manager request -# - update the task list according to external changes - -CLAIMED_STATUS = 'claimed-by-manager' -CLAIMED_ACTIVITY = 'Claimed by manager' - - -@blueprint.route('/tasks/<manager_id>') -@authorization.require_login(require_roles={u'service', u'flamenco_manager'}, require_all=True) -def schedule_tasks(manager_id): - """Upon request from a manager, picks the first task available and returns - it in JSON format. - Read from request args: - - job_type (e.g. simple_blender_render) - - worker (optional, the worker name) - - Validate request (is it a manager?) - - Get manager document - - Query by manager and job_type (sort by priority and creation date) - - TODO: allow multiple tasks to be requested - - Optionally handle the 'worker' arg - - This is an API endpoint, so we interface directly with the database. - """ - - from flamenco import current_flamenco - from pillar.api.utils import jsonify, str2id - - manager_id = str2id(manager_id) - chunk_size = int(flask.request.args.get('chunk_size', 1)) - job_type = flask.request.args.get('job_type') - - log.debug('Handing over max %i tasks to manager %s', chunk_size, manager_id) - - # TODO: properly order tasks based on parents' status etc. - tasks_coll = current_flamenco.db('tasks') - query = { - 'status': 'queued', - 'manager': manager_id, - } - if job_type: - query['job_type'] = job_type - - tasks = [] - affected_jobs = set() - for task in tasks_coll.find(query).sort("priority", -1): - task['status'] = CLAIMED_STATUS - tasks.append(task) - affected_jobs.add(task['job']) - - if len(tasks) >= chunk_size: - break - - if not tasks: - # Nothing to hand out. - return jsonify([]) - - # Do an update directly via MongoDB and not via Eve. Doing it via Eve - # requires permissions to do a GET on the task, which we don't want - # to allow to managers (to force them to use the scheduler). - tasks_coll.update_many( - {'_id': {'$in': [task['_id'] for task in tasks]}}, - {'$set': {'status': CLAIMED_STATUS, - 'activity': CLAIMED_ACTIVITY}} - ) - - log.info('Handing over %i tasks to manager %s', len(tasks), manager_id) - - # Update the affected jobs. - for job_id in affected_jobs: - current_flamenco.job_manager.update_job_after_task_status_change(job_id, 'unknown', - CLAIMED_STATUS) - - resp = jsonify(tasks) - return resp diff --git a/packages/flamenco/tests/abstract_flamenco_test.py b/packages/flamenco/tests/abstract_flamenco_test.py index 9ef0ba0c6641211b5df36e0fc6a7d32fd3e29cfa..e9c07d015855087d2569ad4921062e374c2655b7 100644 --- a/packages/flamenco/tests/abstract_flamenco_test.py +++ b/packages/flamenco/tests/abstract_flamenco_test.py @@ -71,8 +71,7 @@ class AbstractFlamencoTest(AbstractPillarTest): def assert_job_status(self, expected_status): with self.app.test_request_context(): jobs_coll = self.flamenco.db('jobs') - job = jobs_coll.find_one({'_id': self.job_id}, - projection={'status': 1}) + job = jobs_coll.find_one(self.job_id, projection={'status': 1}) self.assertEqual(job['status'], unicode(expected_status)) def set_job_status(self, new_status, job_id=None): diff --git a/packages/flamenco/tests/test_depsgraph.py b/packages/flamenco/tests/test_depsgraph.py index 909ad44090254bf8ff653c92299e261d1bee1002..c4b5ee3e486eb76975a4fc201d3f7fb914919988 100644 --- a/packages/flamenco/tests/test_depsgraph.py +++ b/packages/flamenco/tests/test_depsgraph.py @@ -126,6 +126,7 @@ class DepsgraphTest(AbstractFlamencoTest): [task['status'] for task in depsgraph]) def test_get_subsequent_call(self): + import time from dateutil.parser import parse # Get a clean slate first, so that we get the timestamp of last modification @@ -140,6 +141,7 @@ class DepsgraphTest(AbstractFlamencoTest): expected_status=304) # Change some tasks to see what we get back. + time.sleep(0.05) # sleep a bit to stabilise the test. self.force_task_status(0, 'claimed-by-manager') self.force_task_status(1, 'cancel-requested') self.force_task_status(2, 'queued') diff --git a/packages/flamenco/tests/test_scheduler.py b/packages/flamenco/tests/test_scheduler.py deleted file mode 100644 index 7f37c234e619b398b71d1f0f6a7fb27021ece656..0000000000000000000000000000000000000000 --- a/packages/flamenco/tests/test_scheduler.py +++ /dev/null @@ -1,106 +0,0 @@ -# -*- encoding: utf-8 -*- -from __future__ import absolute_import - -from bson import ObjectId - -from pillar.tests import common_test_data as ctd -from abstract_flamenco_test import AbstractFlamencoTest - - -class TaskSchedulerTest(AbstractFlamencoTest): - def setUp(self, **kwargs): - AbstractFlamencoTest.setUp(self, **kwargs) - - from pillar.api.utils.authentication import force_cli_user - - mngr_doc, account, token = self.create_manager_service_account() - self.mngr_id = mngr_doc['_id'] - self.mngr_token = token['token'] - - with self.app.test_request_context(): - force_cli_user() - self.jmngr.api_create_job( - 'test job', - u'Wörk wørk w°rk.', - 'sleep', - { - 'frames': '12-18, 20-22', - 'chunk_size': 3, - 'time_in_seconds': 3, - }, - self.proj_id, - ctd.EXAMPLE_PROJECT_OWNER_ID, - self.mngr_id, - ) - - def _assert_sleep_task(self, expected_name, expected_status, task): - self.assertEqual(expected_name, task['name']) - self.assertEqual('sleep', task['job_type']) - self.assertEqual(expected_status, task['status']) - self.assertEqual(str(self.mngr_id), str(task['manager'])) - self.assertEqual([ - {'name': 'echo', 'settings': {'message': 'Preparing to sleep'}}, - {'name': 'sleep', 'settings': {'time_in_seconds': 3}}, - ], task['commands']) - - def test_default_chunked(self): - from flamenco import current_flamenco - - chunk = self.get('/flamenco/scheduler/tasks/%s' % self.mngr_id, - auth_token=self.mngr_token).json() - - self.assertEqual(1, len(chunk)) - task = chunk[0] - self._assert_sleep_task('sleep-12-14', 'claimed-by-manager', task) - - # Check that the status in the database changed too. - with self.app.test_request_context(): - tasks_coll = current_flamenco.db('tasks') - task = tasks_coll.find_one({'_id': ObjectId(task['_id'])}) - self._assert_sleep_task('sleep-12-14', 'claimed-by-manager', task) - - def test_chunked(self): - chunk = self.get('/flamenco/scheduler/tasks/%s?chunk_size=2' % self.mngr_id, - auth_token=self.mngr_token).json() - - self.assertEqual(2, len(chunk)) - self._assert_sleep_task('sleep-12-14', 'claimed-by-manager', chunk[0]) - self._assert_sleep_task('sleep-15-17', 'claimed-by-manager', chunk[1]) - - # Check that the last task hasn't been touched yet. - with self.app.test_request_context(): - tasks_coll = self.flamenco.db('tasks') - task = tasks_coll.find_one({'name': 'sleep-18,20,21'}) - self._assert_sleep_task('sleep-18,20,21', 'queued', task) - - def test_by_priority(self): - from pillar.api.utils.authentication import force_cli_user - - with self.app.test_request_context(): - force_cli_user() - - high_prio_job = self.jmngr.api_create_job( - 'test job high prio', - u'Wörk wørk w°rk.', - 'sleep', - { - 'frames': '12-18, 20-22', - 'chunk_size': 3, - 'time_in_seconds': 3, - }, - self.proj_id, - ctd.EXAMPLE_PROJECT_OWNER_ID, - self.mngr_id, - priority=100, - ) - - high_prio_jobid = high_prio_job['_id'] - - # Without proper sorting, this will return the first task, i.e. the one of the - # medium-priority job created in setUp(). - chunk = self.get('/flamenco/scheduler/tasks/%s?chunk_size=1' % self.mngr_id, - auth_token=self.mngr_token).json() - self.assertEqual(unicode(high_prio_jobid), chunk[0]['job']) - - # The task should be initialised to the job's priority. - self.assertEqual(100, chunk[0]['priority']) diff --git a/packages/flamenco/tests/test_task_patch.py b/packages/flamenco/tests/test_task_patch.py index dae3e7b12d52d261ee5daf6a7e7392c6c1dc1f5f..bfd7c26284223d16cd739c9da496e82b1d843df2 100644 --- a/packages/flamenco/tests/test_task_patch.py +++ b/packages/flamenco/tests/test_task_patch.py @@ -38,8 +38,8 @@ class TaskPatchingTest(AbstractFlamencoTest): self.job_id = job['_id'] def test_set_task_invalid_status(self): - chunk = self.get('/flamenco/scheduler/tasks/%s' % self.mngr_id, - auth_token=self.mngr_token).json() + chunk = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id, + auth_token=self.mngr_token).json()['depsgraph'] task = chunk[0] task_url = '/api/flamenco/tasks/%s' % task['_id'] @@ -59,8 +59,8 @@ class TaskPatchingTest(AbstractFlamencoTest): self.assertEqual('claimed-by-manager', task['status']) def test_set_task_valid_status(self): - chunk = self.get('/flamenco/scheduler/tasks/%s' % self.mngr_id, - auth_token=self.mngr_token).json() + chunk = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id, + auth_token=self.mngr_token).json()['depsgraph'] task = chunk[0] task_url = '/api/flamenco/tasks/%s' % task['_id'] @@ -86,9 +86,8 @@ class TaskPatchingTest(AbstractFlamencoTest): # The test job consists of 4 tasks; get their IDs through the scheduler. # This should set the job status to active. - tasks = self.get('/flamenco/scheduler/tasks/%s?chunk_size=1000' % self.mngr_id, - auth_token=self.mngr_token).json() - self.assert_job_status('active') + tasks = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id, + auth_token=self.mngr_token).json()['depsgraph'] self.assertEqual(4, len(tasks)) # After setting tasks 1-3 to 'completed' the job should still not be completed. diff --git a/packages/flamenco/tests/test_task_update_batch.py b/packages/flamenco/tests/test_task_update_batch.py index 3873b724ddbbf187d608c61bc97bd65c1eed1ad0..a8996289459f2817045f07a7aa3f4ea2e717ae1b 100644 --- a/packages/flamenco/tests/test_task_update_batch.py +++ b/packages/flamenco/tests/test_task_update_batch.py @@ -19,10 +19,11 @@ class AbstractTaskBatchUpdateTest(AbstractFlamencoTest): def do_schedule_tasks(self): # The test job consists of 4 tasks; get their IDs through the scheduler. - # This should set the job status to active, and the task status to claimed-by-manager. - tasks = self.get('/flamenco/scheduler/tasks/%s?chunk_size=1000' % self.mngr_id, - auth_token=self.mngr_token).json() - self.assert_job_status('active') + # This should set the task status to claimed-by-manager. + tasks = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id, + auth_token=self.mngr_token).json()['depsgraph'] + # TODO: maybe claimed-by-manager? + # self.assert_job_status('active') self.assertEqual(self.TASK_COUNT, len(tasks)) return tasks @@ -72,8 +73,8 @@ class TaskBatchUpdateTest(AbstractTaskBatchUpdateTest): self.job_id = job['_id'] def test_set_task_invalid_status(self): - chunk = self.get('/flamenco/scheduler/tasks/%s' % self.mngr_id, - auth_token=self.mngr_token).json() + chunk = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id, + auth_token=self.mngr_token).json()['depsgraph'] task = chunk[0] # A warning should be logged and the status should be rejected. @@ -96,8 +97,8 @@ class TaskBatchUpdateTest(AbstractTaskBatchUpdateTest): def test_illegal_active_after_cancel_requested(self): from flamenco import current_flamenco - chunk = self.get('/flamenco/scheduler/tasks/%s' % self.mngr_id, - auth_token=self.mngr_token).json() + chunk = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id, + auth_token=self.mngr_token).json()['depsgraph'] task = chunk[0] # Request task cancellation after it was received by the manager. @@ -124,8 +125,8 @@ class TaskBatchUpdateTest(AbstractTaskBatchUpdateTest): def test_canceled_after_cancel_requested(self): from flamenco import current_flamenco - chunk = self.get('/flamenco/scheduler/tasks/%s' % self.mngr_id, - auth_token=self.mngr_token).json() + chunk = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id, + auth_token=self.mngr_token).json()['depsgraph'] task = chunk[0] # Request task cancellation after it was received by the manager. @@ -178,6 +179,19 @@ class TaskBatchUpdateTest(AbstractTaskBatchUpdateTest): expect_cancel_task_ids={tasks[0]['_id'], tasks[2]['_id'], tasks[3]['_id']}) self.assert_job_status('failed') + def test_job_status_active_after_task_update(self): + """A job should go to active when its tasks are being updated. + """ + + self.force_job_status('queued') + tasks = self.do_schedule_tasks() + + # Any of these statuses should set the job to active. + for status in (u'active', u'completed'): + self.force_job_status('queued') + self.do_batch_update(tasks, [0], [status]) + self.assert_job_status('active') + def test_job_status_canceled_due_to_task_update(self): """When the last cancel-requested task goes to canceled, a cancel-requested job should too. """ @@ -213,7 +227,7 @@ class TaskBatchUpdateTest(AbstractTaskBatchUpdateTest): # All tasks are queued when we request cancellation of the job. self.do_batch_update(tasks, [0, 1, 2, 3], 4 * ['queued']) - self.assert_job_status('active') + self.assert_job_status('queued') self.set_job_status('cancel-requested')