diff --git a/packages/flamenco/flamenco/managers/api.py b/packages/flamenco/flamenco/managers/api.py index 2866cf750674a96c17aeccf247391d521b6d0826..0b4a301accfa4c8a4fa12ad152473126b5e5c7c4 100644 --- a/packages/flamenco/flamenco/managers/api.py +++ b/packages/flamenco/flamenco/managers/api.py @@ -12,6 +12,11 @@ log = logging.getLogger(__name__) # TODO: maybe move allowed task transition handling to a different bit of code. ACCEPTED_AFTER_CANCEL_REQUESTED = {'canceled', 'failed', 'completed'} +DEPSGRAPH_RUNNABLE_JOB_STATUSES = [u'queued', u'active', u'cancel-requested'] +DEPSGRAPH_CLEAN_SLATE_TASK_STATUSES = [u'queued', u'claimed-by-manager', + u'active', u'cancel-requested'] +DEPSGRAPH_MODIFIED_SINCE_TASK_STATUSES = [u'queued', u'claimed-by-manager'] + def manager_api_call(wrapped): """Decorator, performs some standard stuff for Manager API endpoints.""" @@ -233,26 +238,50 @@ def get_depsgraph(manager_id, request_json): with report_duration(log, 'depsgraph query'): tasks_coll = current_flamenco.db('tasks') - query = { + + task_query = { 'manager': manager_id, 'status': {'$nin': ['active']}, } - if modified_since is not None: - modified_since = dateutil.parser.parse(modified_since) - query['_updated'] = {'$gt': modified_since} - cursor = tasks_coll.find(query, { - '_id': 1, - '_updated': 1, - 'status': 1, - 'parents': 1, - }) + if modified_since is None: + # "Clean slate" query, get runnable jobs first. + jobs_coll = current_flamenco.db('jobs') + jobs = jobs_coll.find({ + 'manager': manager_id, + 'status': {'$in': DEPSGRAPH_RUNNABLE_JOB_STATUSES}}, + projection={'_id': 1}, + ) + job_ids = [job['_id'] for job in jobs] + if not job_ids: + return '', 204 # empty response + + log.debug('Requiring jobs to be in %s', job_ids) + task_query['job'] = {'$in': job_ids} + task_query['status'] = {'$in': DEPSGRAPH_CLEAN_SLATE_TASK_STATUSES} + else: + # Not clean slate, just give all updated tasks assigned to this manager. + modified_since = dateutil.parser.parse(modified_since) + task_query['_updated'] = {'$gt': modified_since} + task_query['status'] = {'$in': DEPSGRAPH_MODIFIED_SINCE_TASK_STATUSES} + cursor = tasks_coll.find(task_query) depsgraph = list(cursor) log.info('Returning depsgraph of %i tasks', len(depsgraph)) if modified_since is not None and len(depsgraph) == 0: - return '', 304 + return '', 304 # Not Modified + + # Update the task status in the database to move queued tasks to claimed-by-manager. + task_query['status'] = u'queued' + tasks_coll.update_many(task_query, + {u'$set': {u'status': u'claimed-by-manager'}}) + + # Update the returned task statuses. Unfortunately Mongo doesn't support + # find_and_modify() on multiple documents. + for task in depsgraph: + if task['status'] == u'queued': + task['status'] = u'claimed-by-manager' # Must be a dict to convert to BSON. respdoc = { @@ -263,8 +292,9 @@ def get_depsgraph(manager_id, request_json): else: resp = jsonify(respdoc) - last_modification = max(task['_updated'] for task in depsgraph) - resp.headers['Last-Modified'] = last_modification + if depsgraph: + last_modification = max(task['_updated'] for task in depsgraph) + resp.headers['Last-Modified'] = last_modification return resp diff --git a/packages/flamenco/tests/abstract_flamenco_test.py b/packages/flamenco/tests/abstract_flamenco_test.py index 1c55207442491ba42ab750d6493c8850cd79df5e..9ef0ba0c6641211b5df36e0fc6a7d32fd3e29cfa 100644 --- a/packages/flamenco/tests/abstract_flamenco_test.py +++ b/packages/flamenco/tests/abstract_flamenco_test.py @@ -75,11 +75,14 @@ class AbstractFlamencoTest(AbstractPillarTest): projection={'status': 1}) self.assertEqual(job['status'], unicode(expected_status)) - def set_job_status(self, new_status): + def set_job_status(self, new_status, job_id=None): """Nice, official, ripple-to-task-status approach""" + if job_id is None: + job_id = self.job_id + with self.app.test_request_context(): - self.jmngr.api_set_job_status(self.job_id, new_status) + self.jmngr.api_set_job_status(job_id, new_status) def force_job_status(self, new_status): """Directly to MongoDB approach""" diff --git a/packages/flamenco/tests/test_depsgraph.py b/packages/flamenco/tests/test_depsgraph.py new file mode 100644 index 0000000000000000000000000000000000000000..909ad44090254bf8ff653c92299e261d1bee1002 --- /dev/null +++ b/packages/flamenco/tests/test_depsgraph.py @@ -0,0 +1,169 @@ +# -*- encoding: utf-8 -*- +from __future__ import absolute_import + +from bson import ObjectId + +from pillar.tests import common_test_data as ctd +from abstract_flamenco_test import AbstractFlamencoTest + + +class DepsgraphTest(AbstractFlamencoTest): + def setUp(self, **kwargs): + AbstractFlamencoTest.setUp(self, **kwargs) + + from pillar.api.utils.authentication import force_cli_user + + mngr_doc, account, token = self.create_manager_service_account() + self.mngr_id = mngr_doc['_id'] + self.mngr_token = token['token'] + + # Create three test jobs, one of which is completed and two are queued. + with self.app.test_request_context(): + force_cli_user() + job = self.jmngr.api_create_job( + 'test job 1', + u'Wörk wørk w°rk.', + 'sleep', + { + 'frames': '12-18, 20-22', + 'chunk_size': 3, + 'time_in_seconds': 3, + }, + self.proj_id, + ctd.EXAMPLE_PROJECT_OWNER_ID, + self.mngr_id, + ) + self.jobid1 = job['_id'] + job = self.jmngr.api_create_job( + 'test job 2', + u'Wörk wørk w°rk.', + 'sleep', + { + 'frames': '12-18, 20-22', + 'chunk_size': 3, + 'time_in_seconds': 3, + }, + self.proj_id, + ctd.EXAMPLE_PROJECT_OWNER_ID, + self.mngr_id, + ) + self.jobid2 = job['_id'] + job = self.jmngr.api_create_job( + 'test job 3', + u'Wörk wørk w°rk.', + 'sleep', + { + 'frames': '12-18, 20-22', + 'chunk_size': 3, + 'time_in_seconds': 3, + }, + self.proj_id, + ctd.EXAMPLE_PROJECT_OWNER_ID, + self.mngr_id, + ) + self.jobid3 = job['_id'] + assert isinstance(self.jobid1, ObjectId) + assert isinstance(self.jobid2, ObjectId) + assert isinstance(self.jobid3, ObjectId) + + self.set_job_status('completed', job_id=self.jobid3) + + self.tasks = list(self.flamenco.db('tasks').find({ + 'job': {'$in': [self.jobid1, self.jobid2]} + })) + self.task_ids = [t['_id'] for t in self.tasks] + + def test_get_clean_slate(self): + from dateutil.parser import parse + + # Just so we have a task that's known to be last-updated. + self.force_task_status(0, 'claimed-by-manager') + + resp = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id, + auth_token=self.mngr_token) + depsgraph = resp.json()['depsgraph'] + self.assertEqual(len(self.tasks), len(depsgraph)) + self.assertEqual({unicode(t['_id']) for t in self.tasks}, + {t['_id'] for t in depsgraph}) + + # Tasks should be returned in full, no projection. + task1 = self.tasks[1] + depstask1 = next(t for t in depsgraph if t['_id'] == unicode(task1['_id'])) + self.assertEqual(set(task1.keys()), set(depstask1.keys())) + + # The 'Last-Modified' header should contain the last-changed task. + last_modified = parse(resp.headers['Last-Modified']) + with self.app.test_request_context(): + task0 = self.flamenco.db('tasks').find_one({'_id': self.task_ids[0]}) + self.assertEqual(task0['_updated'], last_modified) + + # The tasks in the database, as well as the response, should be set to claimed-by-manager + with self.app.test_request_context(): + dbtasks = self.flamenco.db('tasks').find({'_id': {'$in': self.task_ids}}) + self.assertEqual(8 * [u'claimed-by-manager'], [task['status'] for task in dbtasks]) + self.assertEqual(8 * [u'claimed-by-manager'], [task['status'] for task in depsgraph]) + + def test_get_clean_slate_some_tasks_unrunnable(self): + self.force_task_status(0, 'failed') + self.force_task_status(1, 'canceled') + self.force_task_status(2, 'completed') + + resp = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id, + auth_token=self.mngr_token) + depsgraph = resp.json()['depsgraph'] + self.assertEqual(len(self.tasks) - 3, len(depsgraph)) + + deps_tids = {t['_id'] for t in depsgraph} + self.assertEqual({unicode(tid) for tid in self.task_ids[3:]}, deps_tids) + + # The previously queued tasks in the database, as well as the response, + # should be set to claimed-by-manager + with self.app.test_request_context(): + dbtasks = self.flamenco.db('tasks').find({'_id': {'$in': self.task_ids}}) + self.assertEqual([u'failed', u'canceled', u'completed'] + 5 * [u'claimed-by-manager'], + [task['status'] for task in dbtasks]) + self.assertEqual(5 * [u'claimed-by-manager'], + [task['status'] for task in depsgraph]) + + def test_get_subsequent_call(self): + from dateutil.parser import parse + + # Get a clean slate first, so that we get the timestamp of last modification + resp = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id, + auth_token=self.mngr_token) + last_modified = resp.headers['Last-Modified'] + + # Do the subsequent call, it should return nothing. + self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id, + auth_token=self.mngr_token, + headers={'If-Modified-Since': last_modified}, + expected_status=304) + + # Change some tasks to see what we get back. + self.force_task_status(0, 'claimed-by-manager') + self.force_task_status(1, 'cancel-requested') + self.force_task_status(2, 'queued') + + resp = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id, + auth_token=self.mngr_token, + headers={'If-Modified-Since': last_modified}) + + depsgraph = resp.json()['depsgraph'] + self.assertEqual(2, len(depsgraph)) # we should not get the cancel-requested task back. + + deps_tids = {t['_id'] for t in depsgraph} + self.assertEqual({unicode(self.task_ids[0]), + unicode(self.task_ids[2])}, + deps_tids) + + # The 'Last-Modified' header should contain the last-changed task. + last_modified = parse(resp.headers['Last-Modified']) + with self.app.test_request_context(): + task0 = self.flamenco.db('tasks').find_one({'_id': self.task_ids[0]}) + task2 = self.flamenco.db('tasks').find_one({'_id': self.task_ids[2]}) + self.assertEqual(task2['_updated'], last_modified) + + self.assertEqual(task0['status'], u'claimed-by-manager') + self.assertEqual(task2['status'], u'claimed-by-manager') + self.assertEqual(2 * [u'claimed-by-manager'], + [task['status'] for task in depsgraph])