Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# -*- encoding: utf-8 -*-
from __future__ import absolute_import
from bson import ObjectId
from pillar.tests import common_test_data as ctd
from abstract_flamenco_test import AbstractFlamencoTest
class DepsgraphTest(AbstractFlamencoTest):
def setUp(self, **kwargs):
AbstractFlamencoTest.setUp(self, **kwargs)
from pillar.api.utils.authentication import force_cli_user
mngr_doc, account, token = self.create_manager_service_account()
self.mngr_id = mngr_doc['_id']
self.mngr_token = token['token']
# Create three test jobs, one of which is completed and two are queued.
with self.app.test_request_context():
force_cli_user()
job = self.jmngr.api_create_job(
'test job 1',
u'Wörk wørk w°rk.',
'sleep',
{
'frames': '12-18, 20-22',
'chunk_size': 3,
'time_in_seconds': 3,
},
self.proj_id,
ctd.EXAMPLE_PROJECT_OWNER_ID,
self.mngr_id,
)
self.jobid1 = job['_id']
job = self.jmngr.api_create_job(
'test job 2',
u'Wörk wørk w°rk.',
'sleep',
{
'frames': '12-18, 20-22',
'chunk_size': 3,
'time_in_seconds': 3,
},
self.proj_id,
ctd.EXAMPLE_PROJECT_OWNER_ID,
self.mngr_id,
)
self.jobid2 = job['_id']
job = self.jmngr.api_create_job(
'test job 3',
u'Wörk wørk w°rk.',
'sleep',
{
'frames': '12-18, 20-22',
'chunk_size': 3,
'time_in_seconds': 3,
},
self.proj_id,
ctd.EXAMPLE_PROJECT_OWNER_ID,
self.mngr_id,
)
self.jobid3 = job['_id']
assert isinstance(self.jobid1, ObjectId)
assert isinstance(self.jobid2, ObjectId)
assert isinstance(self.jobid3, ObjectId)
self.set_job_status('completed', job_id=self.jobid3)
self.tasks = list(self.flamenco.db('tasks').find({
'job': {'$in': [self.jobid1, self.jobid2]}
}))
self.task_ids = [t['_id'] for t in self.tasks]
def test_get_clean_slate(self):
from dateutil.parser import parse
# Just so we have a task that's known to be last-updated.
self.force_task_status(0, 'claimed-by-manager')
resp = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id,
auth_token=self.mngr_token)
depsgraph = resp.json()['depsgraph']
self.assertEqual(len(self.tasks), len(depsgraph))
self.assertEqual({unicode(t['_id']) for t in self.tasks},
{t['_id'] for t in depsgraph})
# Tasks should be returned in full, no projection.
task1 = self.tasks[1]
depstask1 = next(t for t in depsgraph if t['_id'] == unicode(task1['_id']))
self.assertEqual(set(task1.keys()), set(depstask1.keys()))
# The 'Last-Modified' header should contain the last-changed task.
last_modified = parse(resp.headers['Last-Modified'])
with self.app.test_request_context():
task0 = self.flamenco.db('tasks').find_one({'_id': self.task_ids[0]})
self.assert_equal_to_second(task0['_updated'], last_modified)
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# The tasks in the database, as well as the response, should be set to claimed-by-manager
with self.app.test_request_context():
dbtasks = self.flamenco.db('tasks').find({'_id': {'$in': self.task_ids}})
self.assertEqual(8 * [u'claimed-by-manager'], [task['status'] for task in dbtasks])
self.assertEqual(8 * [u'claimed-by-manager'], [task['status'] for task in depsgraph])
def test_get_clean_slate_some_tasks_unrunnable(self):
self.force_task_status(0, 'failed')
self.force_task_status(1, 'canceled')
self.force_task_status(2, 'completed')
resp = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id,
auth_token=self.mngr_token)
depsgraph = resp.json()['depsgraph']
self.assertEqual(len(self.tasks) - 3, len(depsgraph))
deps_tids = {t['_id'] for t in depsgraph}
self.assertEqual({unicode(tid) for tid in self.task_ids[3:]}, deps_tids)
# The previously queued tasks in the database, as well as the response,
# should be set to claimed-by-manager
with self.app.test_request_context():
dbtasks = self.flamenco.db('tasks').find({'_id': {'$in': self.task_ids}})
self.assertEqual([u'failed', u'canceled', u'completed'] + 5 * [u'claimed-by-manager'],
[task['status'] for task in dbtasks])
self.assertEqual(5 * [u'claimed-by-manager'],
[task['status'] for task in depsgraph])
def test_get_subsequent_call(self):
Sybren A. Stüvel
committed
import time
from dateutil.parser import parse
# Get a clean slate first, so that we get the timestamp of last modification
resp = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id,
auth_token=self.mngr_token)
last_modified = resp.headers['Last-Modified']
# Do the subsequent call, it should return nothing.
self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id,
auth_token=self.mngr_token,
headers={'If-Modified-Since': last_modified},
expected_status=304)
# Change some tasks to see what we get back.
Sybren A. Stüvel
committed
time.sleep(0.05) # sleep a bit to stabilise the test.
self.force_task_status(0, 'claimed-by-manager')
self.force_task_status(1, 'cancel-requested')
self.force_task_status(2, 'queued')
resp = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id,
auth_token=self.mngr_token,
headers={'If-Modified-Since': last_modified})
depsgraph = resp.json()['depsgraph']
self.assertEqual(2, len(depsgraph)) # we should not get the cancel-requested task back.
deps_tids = {t['_id'] for t in depsgraph}
self.assertEqual({unicode(self.task_ids[0]),
unicode(self.task_ids[2])},
deps_tids)
# The 'Last-Modified' header should contain the last-changed task.
last_modified = parse(resp.headers['Last-Modified'])
with self.app.test_request_context():
task0 = self.flamenco.db('tasks').find_one({'_id': self.task_ids[0]})
task2 = self.flamenco.db('tasks').find_one({'_id': self.task_ids[2]})
# They should be equal to second precision
self.assert_equal_to_second(task2['_updated'], last_modified)
self.assertEqual(task0['status'], u'claimed-by-manager')
self.assertEqual(task2['status'], u'claimed-by-manager')
self.assertEqual(2 * [u'claimed-by-manager'],
[task['status'] for task in depsgraph])
def assert_equal_to_second(self, actual, expected):
import datetime
diff = datetime.timedelta(microseconds=actual.microsecond)
self.assertEqual(actual - diff, expected)