Skip to content
Snippets Groups Projects
Commit 35dfa365 authored by Sybren A. Stüvel
Browse files

Server: removed the old scheduler, and no longer setting jobs to active on fetch

When a manager gets /api/f/mng/{mng-id}/depsgraph, the jobs of the returned
tasks are no longer automatically set to 'active'. This is delayed until a task
is updated to 'active', 'completed', or 'failed' (the latter only when that
failure doesn't cause the job itself to fail).
parent 9f9c9b0c
No related branches found
No related tags found
No related merge requests found
......@@ -65,7 +65,6 @@ class FlamencoExtension(PillarExtension):
from . import routes
import flamenco.jobs.routes
import flamenco.tasks.routes
import flamenco.scheduler.routes
return [
routes.blueprint,
......@@ -73,7 +72,6 @@ class FlamencoExtension(PillarExtension):
flamenco.jobs.routes.blueprint,
flamenco.tasks.routes.perjob_blueprint,
flamenco.tasks.routes.perproject_blueprint,
flamenco.scheduler.routes.blueprint,
]
@property
......@@ -100,9 +98,6 @@ class FlamencoExtension(PillarExtension):
jobs.setup_app(app)
tasks.setup_app(app)
# Imports for side-effects
from . import scheduler
def _create_collections(self, db):
import pymongo
......
......@@ -135,9 +135,20 @@ class JobManager(object):
"""Updates the job status based on the status of this task and other tasks in the job.
"""
if new_task_status == {'queued', 'cancel-requested'}:
def __job_active_if_queued():
    """Switch the job to 'active', but only when it is currently 'queued'.

    Closure over job_id, task_id and new_task_status of the enclosing
    method; a job in any other status is left untouched.
    """
    job_doc = current_flamenco.db('jobs').find_one(job_id, projection={'status': 1})
    if job_doc['status'] != 'queued':
        return

    self._log.info('Job %s became active because one of its tasks %s changed '
                   'status to %s', job_id, task_id, new_task_status)
    self.api_set_job_status(job_id, 'active')
if new_task_status == {'queued', 'cancel-requested', 'claimed-by-manager'}:
# Ignore; for now re-queueing a task doesn't change the job status.
# Also, canceling a single task has no influence on the job itself.
# A task being claimed by the manager also doesn't change job status.
return
if new_task_status == 'canceled':
......@@ -164,9 +175,10 @@ class JobManager(object):
self._log.warning('Task %s of job %s failed; '
'only %i of its %i tasks failed (%i%%), so ignoring for now',
task_id, job_id, fail_count, total_count, fail_perc)
__job_active_if_queued()
return
if new_task_status in {'claimed-by-manager', 'active', 'processing'}:
if new_task_status in {'active', 'processing'}:
self._log.info('Job %s became active because one of its tasks %s changed status to %s',
job_id, task_id, new_task_status)
self.api_set_job_status(job_id, 'active')
......@@ -181,6 +193,8 @@ class JobManager(object):
'setting job to completed.',
task_id, job_id)
self.api_set_job_status(job_id, 'completed')
else:
__job_active_if_queued()
return
self._log.warning('Task %s of job %s obtained status %s, '
......
import logging
import flask
from pillar.api.utils import authorization
log = logging.getLogger(__name__)
blueprint = flask.Blueprint('flamenco.scheduler', __name__, url_prefix='/scheduler')
# The scheduler is in charge of:
# - generating a task (or task list) for each manager request
# - updating the task list according to external changes
CLAIMED_STATUS = 'claimed-by-manager'
CLAIMED_ACTIVITY = 'Claimed by manager'
@blueprint.route('/tasks/<manager_id>')
@authorization.require_login(require_roles={u'service', u'flamenco_manager'}, require_all=True)
def schedule_tasks(manager_id):
    """Claim queued tasks for a manager and return them as a JSON list.

    Read from request args:

    - chunk_size (optional, defaults to 1): maximum number of tasks to hand over
    - job_type (optional, e.g. simple_blender_render): only hand over tasks
      of this job type

    Queued tasks assigned to the manager are taken in descending priority
    order, marked as claimed by the manager in the database, and the job
    manager is told about the status change for every job that owns one of
    the claimed tasks. An empty JSON list is returned when nothing is queued.

    This is an API endpoint, so we interface directly with the database.
    """
    from flamenco import current_flamenco
    from pillar.api.utils import jsonify, str2id

    manager_oid = str2id(manager_id)
    request_args = flask.request.args
    max_tasks = int(request_args.get('chunk_size', 1))
    wanted_job_type = request_args.get('job_type')

    log.debug('Handing over max %i tasks to manager %s', max_tasks, manager_oid)

    # TODO: properly order tasks based on parents' status etc.
    tasks_coll = current_flamenco.db('tasks')
    task_filter = {
        'status': 'queued',
        'manager': manager_oid,
    }
    if wanted_job_type:
        task_filter['job_type'] = wanted_job_type

    claimed = []
    job_ids = set()
    for task_doc in tasks_coll.find(task_filter).sort("priority", -1):
        # The returned documents already carry the new status.
        task_doc['status'] = CLAIMED_STATUS
        job_ids.add(task_doc['job'])
        claimed.append(task_doc)

        # The limit is checked after appending, so whenever a queued task
        # exists at least one is claimed, whatever chunk_size was given.
        if len(claimed) >= max_tasks:
            break

    if not claimed:
        # Nothing to hand out.
        return jsonify([])

    # Do an update directly via MongoDB and not via Eve. Doing it via Eve
    # requires permissions to do a GET on the task, which we don't want
    # to allow to managers (to force them to use the scheduler).
    claimed_ids = [task_doc['_id'] for task_doc in claimed]
    tasks_coll.update_many(
        {'_id': {'$in': claimed_ids}},
        {'$set': {'status': CLAIMED_STATUS,
                  'activity': CLAIMED_ACTIVITY}}
    )

    log.info('Handing over %i tasks to manager %s', len(claimed), manager_oid)

    # Update the affected jobs; no single task ID applies, hence 'unknown'.
    job_manager = current_flamenco.job_manager
    for job_id in job_ids:
        job_manager.update_job_after_task_status_change(job_id, 'unknown',
                                                        CLAIMED_STATUS)

    return jsonify(claimed)
......@@ -71,8 +71,7 @@ class AbstractFlamencoTest(AbstractPillarTest):
def assert_job_status(self, expected_status):
with self.app.test_request_context():
jobs_coll = self.flamenco.db('jobs')
job = jobs_coll.find_one({'_id': self.job_id},
projection={'status': 1})
job = jobs_coll.find_one(self.job_id, projection={'status': 1})
self.assertEqual(job['status'], unicode(expected_status))
def set_job_status(self, new_status, job_id=None):
......
......@@ -126,6 +126,7 @@ class DepsgraphTest(AbstractFlamencoTest):
[task['status'] for task in depsgraph])
def test_get_subsequent_call(self):
import time
from dateutil.parser import parse
# Get a clean slate first, so that we get the timestamp of last modification
......@@ -140,6 +141,7 @@ class DepsgraphTest(AbstractFlamencoTest):
expected_status=304)
# Change some tasks to see what we get back.
time.sleep(0.05) # sleep a bit to stabilise the test.
self.force_task_status(0, 'claimed-by-manager')
self.force_task_status(1, 'cancel-requested')
self.force_task_status(2, 'queued')
......
# -*- encoding: utf-8 -*-
from __future__ import absolute_import
from bson import ObjectId
from pillar.tests import common_test_data as ctd
from abstract_flamenco_test import AbstractFlamencoTest
class TaskSchedulerTest(AbstractFlamencoTest):
    """Tests for the /flamenco/scheduler/tasks/<manager_id> endpoint."""

    def setUp(self, **kwargs):
        AbstractFlamencoTest.setUp(self, **kwargs)

        mngr_doc, _account, token = self.create_manager_service_account()
        self.mngr_id = mngr_doc['_id']
        self.mngr_token = token['token']

        self._create_sleep_job('test job')

    def _create_sleep_job(self, job_name, **extra_kwargs):
        """Create a 'sleep' job for the test manager and return what api_create_job returns.

        Extra keyword arguments (e.g. priority) are passed on to api_create_job.
        """
        from pillar.api.utils.authentication import force_cli_user

        job_settings = {
            'frames': '12-18, 20-22',
            'chunk_size': 3,
            'time_in_seconds': 3,
        }
        with self.app.test_request_context():
            force_cli_user()
            return self.jmngr.api_create_job(
                job_name,
                u'Wörk wørk w°rk.',
                'sleep',
                job_settings,
                self.proj_id,
                ctd.EXAMPLE_PROJECT_OWNER_ID,
                self.mngr_id,
                **extra_kwargs)

    def _fetch_tasks(self, url_template):
        """GET the URL (formatted with the manager ID) as the manager; return the JSON body."""
        response = self.get(url_template % self.mngr_id,
                            auth_token=self.mngr_token)
        return response.json()

    def _assert_sleep_task(self, expected_name, expected_status, task):
        expected_commands = [
            {'name': 'echo', 'settings': {'message': 'Preparing to sleep'}},
            {'name': 'sleep', 'settings': {'time_in_seconds': 3}},
        ]

        self.assertEqual(expected_name, task['name'])
        self.assertEqual('sleep', task['job_type'])
        self.assertEqual(expected_status, task['status'])
        self.assertEqual(str(self.mngr_id), str(task['manager']))
        self.assertEqual(expected_commands, task['commands'])

    def test_default_chunked(self):
        from flamenco import current_flamenco

        chunk = self._fetch_tasks('/flamenco/scheduler/tasks/%s')
        self.assertEqual(1, len(chunk))

        fetched = chunk[0]
        self._assert_sleep_task('sleep-12-14', 'claimed-by-manager', fetched)

        # Check that the status in the database changed too.
        with self.app.test_request_context():
            tasks_coll = current_flamenco.db('tasks')
            db_task = tasks_coll.find_one({'_id': ObjectId(fetched['_id'])})
            self._assert_sleep_task('sleep-12-14', 'claimed-by-manager', db_task)

    def test_chunked(self):
        chunk = self._fetch_tasks('/flamenco/scheduler/tasks/%s?chunk_size=2')
        self.assertEqual(2, len(chunk))

        for expected_name, fetched in zip(('sleep-12-14', 'sleep-15-17'), chunk):
            self._assert_sleep_task(expected_name, 'claimed-by-manager', fetched)

        # Check that the last task hasn't been touched yet.
        with self.app.test_request_context():
            tasks_coll = self.flamenco.db('tasks')
            untouched = tasks_coll.find_one({'name': 'sleep-18,20,21'})
            self._assert_sleep_task('sleep-18,20,21', 'queued', untouched)

    def test_by_priority(self):
        high_prio_job = self._create_sleep_job('test job high prio', priority=100)
        high_prio_jobid = high_prio_job['_id']

        # Without proper sorting, this will return the first task, i.e. the one of the
        # medium-priority job created in setUp().
        chunk = self._fetch_tasks('/flamenco/scheduler/tasks/%s?chunk_size=1')
        first_task = chunk[0]
        self.assertEqual(unicode(high_prio_jobid), first_task['job'])

        # The task should be initialised to the job's priority.
        self.assertEqual(100, first_task['priority'])
......@@ -38,8 +38,8 @@ class TaskPatchingTest(AbstractFlamencoTest):
self.job_id = job['_id']
def test_set_task_invalid_status(self):
chunk = self.get('/flamenco/scheduler/tasks/%s' % self.mngr_id,
auth_token=self.mngr_token).json()
chunk = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id,
auth_token=self.mngr_token).json()['depsgraph']
task = chunk[0]
task_url = '/api/flamenco/tasks/%s' % task['_id']
......@@ -59,8 +59,8 @@ class TaskPatchingTest(AbstractFlamencoTest):
self.assertEqual('claimed-by-manager', task['status'])
def test_set_task_valid_status(self):
chunk = self.get('/flamenco/scheduler/tasks/%s' % self.mngr_id,
auth_token=self.mngr_token).json()
chunk = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id,
auth_token=self.mngr_token).json()['depsgraph']
task = chunk[0]
task_url = '/api/flamenco/tasks/%s' % task['_id']
......@@ -86,9 +86,8 @@ class TaskPatchingTest(AbstractFlamencoTest):
# The test job consists of 4 tasks; get their IDs through the scheduler.
# This should set the job status to active.
tasks = self.get('/flamenco/scheduler/tasks/%s?chunk_size=1000' % self.mngr_id,
auth_token=self.mngr_token).json()
self.assert_job_status('active')
tasks = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id,
auth_token=self.mngr_token).json()['depsgraph']
self.assertEqual(4, len(tasks))
# After setting tasks 1-3 to 'completed' the job should still not be completed.
......
......@@ -19,10 +19,11 @@ class AbstractTaskBatchUpdateTest(AbstractFlamencoTest):
def do_schedule_tasks(self):
# The test job consists of 4 tasks; get their IDs through the scheduler.
# This should set the job status to active, and the task status to claimed-by-manager.
tasks = self.get('/flamenco/scheduler/tasks/%s?chunk_size=1000' % self.mngr_id,
auth_token=self.mngr_token).json()
self.assert_job_status('active')
# This should set the task status to claimed-by-manager.
tasks = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id,
auth_token=self.mngr_token).json()['depsgraph']
# TODO: maybe claimed-by-manager?
# self.assert_job_status('active')
self.assertEqual(self.TASK_COUNT, len(tasks))
return tasks
......@@ -72,8 +73,8 @@ class TaskBatchUpdateTest(AbstractTaskBatchUpdateTest):
self.job_id = job['_id']
def test_set_task_invalid_status(self):
chunk = self.get('/flamenco/scheduler/tasks/%s' % self.mngr_id,
auth_token=self.mngr_token).json()
chunk = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id,
auth_token=self.mngr_token).json()['depsgraph']
task = chunk[0]
# A warning should be logged and the status should be rejected.
......@@ -96,8 +97,8 @@ class TaskBatchUpdateTest(AbstractTaskBatchUpdateTest):
def test_illegal_active_after_cancel_requested(self):
from flamenco import current_flamenco
chunk = self.get('/flamenco/scheduler/tasks/%s' % self.mngr_id,
auth_token=self.mngr_token).json()
chunk = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id,
auth_token=self.mngr_token).json()['depsgraph']
task = chunk[0]
# Request task cancellation after it was received by the manager.
......@@ -124,8 +125,8 @@ class TaskBatchUpdateTest(AbstractTaskBatchUpdateTest):
def test_canceled_after_cancel_requested(self):
from flamenco import current_flamenco
chunk = self.get('/flamenco/scheduler/tasks/%s' % self.mngr_id,
auth_token=self.mngr_token).json()
chunk = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id,
auth_token=self.mngr_token).json()['depsgraph']
task = chunk[0]
# Request task cancellation after it was received by the manager.
......@@ -178,6 +179,19 @@ class TaskBatchUpdateTest(AbstractTaskBatchUpdateTest):
expect_cancel_task_ids={tasks[0]['_id'], tasks[2]['_id'], tasks[3]['_id']})
self.assert_job_status('failed')
def test_job_status_active_after_task_update(self):
    """Updating a task to 'active' or 'completed' should move a queued job to 'active'."""
    self.force_job_status('queued')
    scheduled = self.do_schedule_tasks()

    # Any of these statuses should set the job to active. The job is reset
    # to 'queued' before every update so each status is checked on its own.
    activating_statuses = (u'active', u'completed')
    for task_status in activating_statuses:
        self.force_job_status('queued')
        self.do_batch_update(scheduled, [0], [task_status])
        self.assert_job_status('active')
def test_job_status_canceled_due_to_task_update(self):
"""When the last cancel-requested task goes to canceled, a cancel-requested job should too.
"""
......@@ -213,7 +227,7 @@ class TaskBatchUpdateTest(AbstractTaskBatchUpdateTest):
# All tasks are queued when we request cancellation of the job.
self.do_batch_update(tasks, [0, 1, 2, 3], 4 * ['queued'])
self.assert_job_status('active')
self.assert_job_status('queued')
self.set_job_status('cancel-requested')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment