Skip to content
Snippets Groups Projects
Commit 5e1cf163 authored by Sybren A. Stüvel
Browse files

Server: implemented depsgraph endpoint

The /api/flamenco/managers/{manager-id}/depsgraph endpoint returns the list
of runnable tasks belonging to runnable jobs, assigned to that manager. All
tasks that were "queued" are set to "claimed-by-manager".
parent 7585111f
No related branches found
No related tags found
No related merge requests found
...@@ -12,6 +12,11 @@ log = logging.getLogger(__name__) ...@@ -12,6 +12,11 @@ log = logging.getLogger(__name__)
# TODO: maybe move allowed task transition handling to a different bit of code. # TODO: maybe move allowed task transition handling to a different bit of code.
ACCEPTED_AFTER_CANCEL_REQUESTED = {'canceled', 'failed', 'completed'} ACCEPTED_AFTER_CANCEL_REQUESTED = {'canceled', 'failed', 'completed'}
DEPSGRAPH_RUNNABLE_JOB_STATUSES = [u'queued', u'active', u'cancel-requested']
DEPSGRAPH_CLEAN_SLATE_TASK_STATUSES = [u'queued', u'claimed-by-manager',
u'active', u'cancel-requested']
DEPSGRAPH_MODIFIED_SINCE_TASK_STATUSES = [u'queued', u'claimed-by-manager']
def manager_api_call(wrapped): def manager_api_call(wrapped):
"""Decorator, performs some standard stuff for Manager API endpoints.""" """Decorator, performs some standard stuff for Manager API endpoints."""
...@@ -233,26 +238,50 @@ def get_depsgraph(manager_id, request_json): ...@@ -233,26 +238,50 @@ def get_depsgraph(manager_id, request_json):
with report_duration(log, 'depsgraph query'): with report_duration(log, 'depsgraph query'):
tasks_coll = current_flamenco.db('tasks') tasks_coll = current_flamenco.db('tasks')
query = {
task_query = {
'manager': manager_id, 'manager': manager_id,
'status': {'$nin': ['active']}, 'status': {'$nin': ['active']},
} }
if modified_since is not None:
modified_since = dateutil.parser.parse(modified_since)
query['_updated'] = {'$gt': modified_since}
cursor = tasks_coll.find(query, { if modified_since is None:
'_id': 1, # "Clean slate" query, get runnable jobs first.
'_updated': 1, jobs_coll = current_flamenco.db('jobs')
'status': 1, jobs = jobs_coll.find({
'parents': 1, 'manager': manager_id,
}) 'status': {'$in': DEPSGRAPH_RUNNABLE_JOB_STATUSES}},
projection={'_id': 1},
)
job_ids = [job['_id'] for job in jobs]
if not job_ids:
return '', 204 # empty response
log.debug('Requiring jobs to be in %s', job_ids)
task_query['job'] = {'$in': job_ids}
task_query['status'] = {'$in': DEPSGRAPH_CLEAN_SLATE_TASK_STATUSES}
else:
# Not clean slate, just give all updated tasks assigned to this manager.
modified_since = dateutil.parser.parse(modified_since)
task_query['_updated'] = {'$gt': modified_since}
task_query['status'] = {'$in': DEPSGRAPH_MODIFIED_SINCE_TASK_STATUSES}
cursor = tasks_coll.find(task_query)
depsgraph = list(cursor) depsgraph = list(cursor)
log.info('Returning depsgraph of %i tasks', len(depsgraph)) log.info('Returning depsgraph of %i tasks', len(depsgraph))
if modified_since is not None and len(depsgraph) == 0: if modified_since is not None and len(depsgraph) == 0:
return '', 304 return '', 304 # Not Modified
# Update the task status in the database to move queued tasks to claimed-by-manager.
task_query['status'] = u'queued'
tasks_coll.update_many(task_query,
{u'$set': {u'status': u'claimed-by-manager'}})
# Update the returned task statuses. Unfortunately Mongo doesn't support
# find_and_modify() on multiple documents.
for task in depsgraph:
if task['status'] == u'queued':
task['status'] = u'claimed-by-manager'
# Must be a dict to convert to BSON. # Must be a dict to convert to BSON.
respdoc = { respdoc = {
...@@ -263,8 +292,9 @@ def get_depsgraph(manager_id, request_json): ...@@ -263,8 +292,9 @@ def get_depsgraph(manager_id, request_json):
else: else:
resp = jsonify(respdoc) resp = jsonify(respdoc)
last_modification = max(task['_updated'] for task in depsgraph) if depsgraph:
resp.headers['Last-Modified'] = last_modification last_modification = max(task['_updated'] for task in depsgraph)
resp.headers['Last-Modified'] = last_modification
return resp return resp
......
...@@ -75,11 +75,14 @@ class AbstractFlamencoTest(AbstractPillarTest): ...@@ -75,11 +75,14 @@ class AbstractFlamencoTest(AbstractPillarTest):
projection={'status': 1}) projection={'status': 1})
self.assertEqual(job['status'], unicode(expected_status)) self.assertEqual(job['status'], unicode(expected_status))
def set_job_status(self, new_status): def set_job_status(self, new_status, job_id=None):
"""Nice, official, ripple-to-task-status approach""" """Nice, official, ripple-to-task-status approach"""
if job_id is None:
job_id = self.job_id
with self.app.test_request_context(): with self.app.test_request_context():
self.jmngr.api_set_job_status(self.job_id, new_status) self.jmngr.api_set_job_status(job_id, new_status)
def force_job_status(self, new_status): def force_job_status(self, new_status):
"""Directly to MongoDB approach""" """Directly to MongoDB approach"""
......
# -*- encoding: utf-8 -*-
from __future__ import absolute_import
from bson import ObjectId
from pillar.tests import common_test_data as ctd
from abstract_flamenco_test import AbstractFlamencoTest
class DepsgraphTest(AbstractFlamencoTest):
    """Tests for the ``/api/flamenco/managers/<manager-id>/depsgraph`` endpoint.

    ``setUp`` creates one manager service account and three jobs assigned to
    it. The third job is set to 'completed'; only the tasks of the first two
    jobs are kept in ``self.tasks`` / ``self.task_ids``. The assertions below
    expect those two jobs to have 8 tasks in total.
    """

    def setUp(self, **kwargs):
        """Create the manager account, the three test jobs and the task lists."""
        AbstractFlamencoTest.setUp(self, **kwargs)

        from pillar.api.utils.authentication import force_cli_user

        manager, _account, token_doc = self.create_manager_service_account()
        self.mngr_id = manager['_id']
        self.mngr_token = token_doc['token']

        def create_job(job_name):
            """Create one 'sleep' job for this manager and return its ID."""
            job_doc = self.jmngr.api_create_job(
                job_name,
                u'Wörk wørk w°rk.',
                'sleep',
                {
                    'frames': '12-18, 20-22',
                    'chunk_size': 3,
                    'time_in_seconds': 3,
                },
                self.proj_id,
                ctd.EXAMPLE_PROJECT_OWNER_ID,
                self.mngr_id,
            )
            return job_doc['_id']

        # Create three test jobs, one of which is completed and two are queued.
        with self.app.test_request_context():
            force_cli_user()

            self.jobid1 = create_job('test job 1')
            self.jobid2 = create_job('test job 2')
            self.jobid3 = create_job('test job 3')

            for created_id in (self.jobid1, self.jobid2, self.jobid3):
                assert isinstance(created_id, ObjectId)

            self.set_job_status('completed', job_id=self.jobid3)

            # Only the tasks of the two non-completed jobs are tracked.
            tasks_cursor = self.flamenco.db('tasks').find({
                'job': {'$in': [self.jobid1, self.jobid2]}
            })
            self.tasks = list(tasks_cursor)
            self.task_ids = [task['_id'] for task in self.tasks]

    def _get_depsgraph(self, **extra):
        """GET the depsgraph endpoint for this manager, authenticated with its token.

        Any extra keyword arguments are passed on to ``self.get()``.
        """
        url = '/api/flamenco/managers/%s/depsgraph' % self.mngr_id
        return self.get(url, auth_token=self.mngr_token, **extra)

    def test_get_clean_slate(self):
        """A request without If-Modified-Since returns all tracked tasks, claimed."""
        from dateutil.parser import parse

        # Just so we have a task that's known to be last-updated.
        self.force_task_status(0, 'claimed-by-manager')

        resp = self._get_depsgraph()
        depsgraph = resp.json()['depsgraph']

        self.assertEqual(len(self.tasks), len(depsgraph))
        expected_ids = {unicode(task['_id']) for task in self.tasks}
        returned_ids = {task['_id'] for task in depsgraph}
        self.assertEqual(expected_ids, returned_ids)

        # Tasks should be returned in full, no projection.
        task1 = self.tasks[1]
        task1_id = unicode(task1['_id'])
        depstask1 = next(task for task in depsgraph if task['_id'] == task1_id)
        self.assertEqual(set(task1.keys()), set(depstask1.keys()))

        # The 'Last-Modified' header should contain the last-changed task.
        last_modified = parse(resp.headers['Last-Modified'])
        with self.app.test_request_context():
            task0 = self.flamenco.db('tasks').find_one({'_id': self.task_ids[0]})
        self.assertEqual(task0['_updated'], last_modified)

        # The tasks in the database, as well as the response, should be set to
        # claimed-by-manager
        all_claimed = 8 * [u'claimed-by-manager']
        with self.app.test_request_context():
            dbtasks = self.flamenco.db('tasks').find({'_id': {'$in': self.task_ids}})
            self.assertEqual(all_claimed, [task['status'] for task in dbtasks])
        self.assertEqual(8 * [u'claimed-by-manager'],
                         [task['status'] for task in depsgraph])

    def test_get_clean_slate_some_tasks_unrunnable(self):
        """Failed, canceled and completed tasks are left out of the clean-slate result."""
        self.force_task_status(0, 'failed')
        self.force_task_status(1, 'canceled')
        self.force_task_status(2, 'completed')

        resp = self._get_depsgraph()
        depsgraph = resp.json()['depsgraph']

        self.assertEqual(len(self.tasks) - 3, len(depsgraph))
        deps_tids = {task['_id'] for task in depsgraph}
        runnable_tids = {unicode(tid) for tid in self.task_ids[3:]}
        self.assertEqual(runnable_tids, deps_tids)

        # The previously queued tasks in the database, as well as the response,
        # should be set to claimed-by-manager
        expected_db_statuses = ([u'failed', u'canceled', u'completed'] +
                                5 * [u'claimed-by-manager'])
        with self.app.test_request_context():
            dbtasks = self.flamenco.db('tasks').find({'_id': {'$in': self.task_ids}})
            self.assertEqual(expected_db_statuses,
                             [task['status'] for task in dbtasks])
        self.assertEqual(5 * [u'claimed-by-manager'],
                         [task['status'] for task in depsgraph])

    def test_get_subsequent_call(self):
        """A request with If-Modified-Since yields 304, or only the changed tasks."""
        from dateutil.parser import parse

        # Get a clean slate first, so that we get the timestamp of last modification
        first_resp = self._get_depsgraph()
        last_modified = first_resp.headers['Last-Modified']

        # Do the subsequent call, it should return nothing.
        self._get_depsgraph(headers={'If-Modified-Since': last_modified},
                            expected_status=304)

        # Change some tasks to see what we get back.
        self.force_task_status(0, 'claimed-by-manager')
        self.force_task_status(1, 'cancel-requested')
        self.force_task_status(2, 'queued')

        resp = self._get_depsgraph(headers={'If-Modified-Since': last_modified})
        depsgraph = resp.json()['depsgraph']

        # we should not get the cancel-requested task back.
        self.assertEqual(2, len(depsgraph))
        deps_tids = {task['_id'] for task in depsgraph}
        self.assertEqual({unicode(self.task_ids[0]),
                          unicode(self.task_ids[2])},
                         deps_tids)

        # The 'Last-Modified' header should contain the last-changed task.
        last_modified = parse(resp.headers['Last-Modified'])
        with self.app.test_request_context():
            tasks_coll = self.flamenco.db('tasks')
            task0 = tasks_coll.find_one({'_id': self.task_ids[0]})
            task2 = tasks_coll.find_one({'_id': self.task_ids[2]})
        self.assertEqual(task2['_updated'], last_modified)

        # Both the database and the response should show the tasks as claimed.
        self.assertEqual(task0['status'], u'claimed-by-manager')
        self.assertEqual(task2['status'], u'claimed-by-manager')
        self.assertEqual(2 * [u'claimed-by-manager'],
                         [task['status'] for task in depsgraph])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment