Skip to content
Snippets Groups Projects
Commit 0830a457 authored by Sybren A. Stüvel
Browse files

Server: Changes in the job status also affect the job's task statuses.

The job status can change via JobManager.set_job_status() or by PUTting
a job document with a changed status.
parent 06620860
No related branches found
No related tags found
No related merge requests found
......@@ -192,6 +192,10 @@ class FlamencoExtension(PillarExtension):
Flamenco managers.
"""
return self.update_status_q(collection_name, {'_id': document_id}, new_status)
def update_status_q(self, collection_name, query, new_status):
from flamenco import eve_settings, current_flamenco
import datetime
import uuid
......@@ -211,21 +215,17 @@ class FlamencoExtension(PillarExtension):
etag = uuid.uuid4().hex
collection = current_flamenco.db(collection_name)
result = collection.update_one(
{'_id': document_id},
result = collection.update_many(
query,
{'$set': {'status': new_status,
'_updated': datetime.datetime.now(tz=tz_util.utc),
'_etag': etag}}
)
if result.matched_count < 1:
raise wz_exceptions.NotFound('%s %s does not exist' % (singular_name, document_id))
if result.matched_count > 1:
self._log.warning('Eek, %i %s with same ID %s, should be impossible',
result.matched_count, collection_name, document_id)
raise wz_exceptions.NotFound('%s %s does not exist' % (singular_name, query))
self._log.debug('Updated status of %i %s %s',
result.modified_count, singular_name, document_id)
self._log.debug('Updated status of %i %s %s to %s',
result.modified_count, singular_name, query, new_status)
def link_for_activity(self, act):
"""Returns the URL for the activity.
......
......@@ -231,7 +231,52 @@ class JobManager(object):
from flamenco import current_flamenco
jobs_coll = current_flamenco.db('jobs')
curr_job = jobs_coll.find_one({'_id': job_id}, projection={'status': 1})
old_status = curr_job['status']
current_flamenco.update_status('jobs', job_id, new_status)
self.handle_job_status_change(job_id, old_status, new_status)
def handle_job_status_change(self, job_id, old_status, new_status):
    """Propagate a job status transition to the tasks of that job.

    Effect per new job status:

    - 'canceled' / 'failed': tasks that are 'active', 'queued' or
      'claimed-by-manager' become 'canceled'.
    - 'queued': when coming from 'completed' every task is re-queued,
      otherwise only the tasks that are not 'completed'.
    - anything else (including 'completed' and 'active'): tasks are
      left untouched.

    NOTE(review): update_status_q() appears to raise NotFound when the
    query matches no task at all -- confirm callers can cope with that.

    :param job_id: ID of the job whose status changed.
    :param old_status: job status before the change.
    :param new_status: job status after the change.
    """

    task_query = None
    task_status = None

    if new_status in {'canceled', 'failed'}:
        # Stop everything that could still run at some point.
        task_query = {'status': {'$in': ['active', 'queued', 'claimed-by-manager']}}
        task_status = 'canceled'
    elif new_status == 'queued':
        # A completed job is redone in full; any other job only redoes
        # the work that was not finished yet.
        task_query = {} if old_status == 'completed' else {'status': {'$ne': 'completed'}}
        task_status = 'queued'
    # 'completed' follows from all tasks completing, and 'active' from a
    # single task starting; neither says anything about the other tasks.

    if task_query is None:
        self._log.debug('Job %s status change from %s to %s has no effect on tasks.',
                        job_id, old_status, new_status)
        return

    if task_status is None:
        self._log.error('Job %s status change from %s to %s has to_status=None, aborting.',
                        job_id, old_status, new_status)
        return

    # Limit the update to the tasks of this particular job.
    task_query['job'] = job_id

    from flamenco import current_flamenco
    current_flamenco.update_status_q('tasks', task_query, task_status)
def setup_app(app):
......
......@@ -75,6 +75,39 @@ def check_job_permissions_modify(job_doc, original_doc=None):
# FIXME: check user access to the project.
handle_job_status_update(job_doc, original_doc)
def handle_job_status_update(job_doc, original_doc):
    """Hand a job status change over to the JobManager.

    Does nothing when there is no original document, when the original
    document has no status, or when the status did not change.

    :param job_doc: the new job document.
    :param original_doc: the job document as it was before, or None.
    :raises wz_exceptions.UnprocessableEntity: when the new document has
        no '_id' or no 'status' field.
    """

    if original_doc is None:
        return

    job_id = job_doc.get('_id')
    if not job_id:
        log.warning('handle_job_status_update: No _id in new job document, rejecting')
        raise wz_exceptions.UnprocessableEntity('missing _id')

    # Without an old status there is no transition to speak of.
    if 'status' not in original_doc:
        log.info('handle_job_status_update: No status in old job document %s, ignoring', job_id)
        return

    if 'status' not in job_doc:
        log.warning('handle_job_status_update: No status in new job document %s, rejecting', job_id)
        raise wz_exceptions.UnprocessableEntity('missing status field')

    status_before = original_doc['status']
    status_after = job_doc['status']

    if status_before != status_after:
        from flamenco import current_flamenco
        current_flamenco.job_manager.handle_job_status_change(job_id, status_before, status_after)
def setup_app(app):
app.on_inserted_flamenco_jobs = after_inserting_jobs
......
# -*- encoding: utf-8 -*-
from __future__ import absolute_import
import mock
from pillar.tests import common_test_data as ctd
from abstract_flamenco_test import AbstractFlamencoTest
......@@ -60,3 +62,181 @@ class JobManagerTest(AbstractFlamencoTest):
u'time_in_seconds': 3,
}
}, task['commands'][1])
class JobStatusChangeTest(AbstractFlamencoTest):
    """Tests that a job status change is propagated to the job's tasks.

    setUp() creates one job with six tasks and forces every task into a
    different status. Each test then forces the job into a start status,
    performs the transition under test, and checks each task's status.
    """

    def setUp(self, **kwargs):
        """Creates the job and puts its six tasks into known statuses."""
        super(JobStatusChangeTest, self).setUp(**kwargs)

        # Create a job with 6 tasks (the count is asserted below).
        from pillar.api.utils.authentication import force_cli_user

        manager, _, token = self.create_manager_service_account()
        self.mngr_token = token['token']

        with self.app.test_request_context():
            force_cli_user()
            job = self.jmngr.api_create_job(
                'test job',
                u'Wörk wørk w°rk.',
                'blender-render',
                {
                    'filepath': u'/my/blend.file',
                    'frames': u'12-18, 20-23',
                    'chunk_size': 2,
                    'time_in_seconds': 3,
                },
                self.proj_id,
                ctd.EXAMPLE_PROJECT_OWNER_ID,
                manager['_id'],
            )
            self.job_id = job['_id']

            # Fetch the task IDs and set the task statuses to a fixed list.
            tasks_coll = self.flamenco.db('tasks')
            tasks = tasks_coll.find({'job': self.job_id}, projection={'_id': 1})
            self.task_ids = [task['_id'] for task in tasks]

        self.assertEqual(6, len(self.task_ids))
        self.set_task_status(0, 'queued')
        self.set_task_status(1, 'claimed-by-manager')
        self.set_task_status(2, 'completed')
        self.set_task_status(3, 'active')
        self.set_task_status(4, 'canceled')
        self.set_task_status(5, 'failed')

    def set_job_status(self, new_status):
        """Nice, official, ripple-to-task-status approach"""

        with self.app.test_request_context():
            self.jmngr.set_job_status(self.job_id, new_status)

    def force_job_status(self, new_status):
        """Directly to MongoDB approach"""

        with self.app.test_request_context():
            jobs_coll = self.flamenco.db('jobs')
            result = jobs_coll.update_one({'_id': self.job_id},
                                          {'$set': {'status': new_status}})
        self.assertEqual(1, result.matched_count)

    def set_task_status(self, task_idx, new_status):
        """Sets the task status directly in MongoDB.

        This should only be used to set up a certain scenario.
        """
        from flamenco import current_flamenco

        with self.app.test_request_context():
            current_flamenco.update_status('tasks', self.task_ids[task_idx], new_status)

    def assert_job_status(self, expected_status):
        """Asserts that the job in the database has the expected status."""

        with self.app.test_request_context():
            jobs_coll = self.flamenco.db('jobs')
            job = jobs_coll.find_one({'_id': self.job_id},
                                     projection={'status': 1})
        self.assertEqual(job['status'], unicode(expected_status))

    def assert_task_status(self, task_idx, expected_status):
        """Asserts that task number `task_idx` has the expected status."""

        with self.app.test_request_context():
            tasks_coll = self.flamenco.db('tasks')
            task = tasks_coll.find_one({'_id': self.task_ids[task_idx]},
                                       projection={'status': 1})

        self.assertIsNotNone(task)
        self.assertEqual(task['status'], unicode(expected_status),
                         "Task %i:\n   has status: '%s'\n but expected: '%s'" % (
                             task_idx, task['status'], expected_status))

    def test_status_from_queued_to_active(self):
        # This shouldn't change any of the tasks.
        self.force_job_status('queued')
        self.set_job_status('active')

        self.assert_task_status(0, 'queued')  # was: queued
        self.assert_task_status(1, 'claimed-by-manager')  # was: claimed-by-manager
        self.assert_task_status(2, 'completed')  # was: completed
        self.assert_task_status(3, 'active')  # was: active
        self.assert_task_status(4, 'canceled')  # was: canceled
        self.assert_task_status(5, 'failed')  # was: failed

    def test_status_from_active_to_canceled(self):
        # This should cancel all tasks that could possibly still run.
        self.force_job_status('active')
        self.set_job_status('canceled')

        self.assert_task_status(0, 'canceled')  # was: queued
        self.assert_task_status(1, 'canceled')  # was: claimed-by-manager
        self.assert_task_status(2, 'completed')  # was: completed
        self.assert_task_status(3, 'canceled')  # was: active
        self.assert_task_status(4, 'canceled')  # was: canceled
        self.assert_task_status(5, 'failed')  # was: failed

    def test_status_from_canceled_to_queued(self):
        # This should re-queue all non-completed tasks.
        self.force_job_status('canceled')
        self.set_job_status('queued')

        self.assert_task_status(0, 'queued')  # was: queued
        self.assert_task_status(1, 'queued')  # was: claimed-by-manager
        self.assert_task_status(2, 'completed')  # was: completed
        self.assert_task_status(3, 'queued')  # was: active
        self.assert_task_status(4, 'queued')  # was: canceled
        self.assert_task_status(5, 'queued')  # was: failed

    def test_status_from_completed_to_queued(self):
        # This should re-queue all tasks.
        self.force_job_status('completed')
        self.set_job_status('queued')

        self.assert_task_status(0, 'queued')  # was: queued
        self.assert_task_status(1, 'queued')  # was: claimed-by-manager
        self.assert_task_status(2, 'queued')  # was: completed
        self.assert_task_status(3, 'queued')  # was: active
        self.assert_task_status(4, 'queued')  # was: canceled
        self.assert_task_status(5, 'queued')  # was: failed

    def test_status_from_active_to_failed(self):
        # This should be the same as going to 'canceled', except that the underlying reason
        # to go to this state is different (active action by user vs. result of massive task
        # failure).
        self.force_job_status('active')
        self.set_job_status('failed')

        self.assert_task_status(0, 'canceled')  # was: queued
        self.assert_task_status(1, 'canceled')  # was: claimed-by-manager
        self.assert_task_status(2, 'completed')  # was: completed
        self.assert_task_status(3, 'canceled')  # was: active
        self.assert_task_status(4, 'canceled')  # was: canceled
        self.assert_task_status(5, 'failed')  # was: failed

    def test_status_from_active_to_completed(self):
        # Shouldn't do anything, as going to completed is a result of all tasks being completed.
        # The transition has to actually happen here; without these two calls the
        # assertions below would merely re-check the fixture created in setUp().
        self.force_job_status('active')
        self.set_job_status('completed')

        self.assert_task_status(0, 'queued')  # was: queued
        self.assert_task_status(1, 'claimed-by-manager')  # was: claimed-by-manager
        self.assert_task_status(2, 'completed')  # was: completed
        self.assert_task_status(3, 'active')  # was: active
        self.assert_task_status(4, 'canceled')  # was: canceled
        self.assert_task_status(5, 'failed')  # was: failed

    @mock.patch('flamenco.jobs.JobManager.handle_job_status_change')
    def test_put_job(self, handle_job_status_change):
        """Test that flamenco.jobs.JobManager.handle_job_status_change is called when we PUT."""

        from pillar.api.utils import remove_private_keys

        self.create_user(24 * 'a', roles=('admin',))
        self.create_valid_auth_token(24 * 'a', token='admin-token')

        json_job = self.get('/api/flamenco/jobs/%s' % self.job_id,
                            auth_token='admin-token').json()

        json_job['status'] = 'canceled'

        self.put('/api/flamenco/jobs/%s' % self.job_id,
                 json=remove_private_keys(json_job),
                 headers={'If-Match': json_job['_etag']},
                 auth_token='admin-token')

        handle_job_status_change.assert_called_once_with(self.job_id, 'queued', 'canceled')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment