Skip to content
Snippets Groups Projects
Commit 3a32141f authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Server: accepting task status/activity/log updates.

parent ccdfc573
Branches
No related tags found
No related merge requests found
...@@ -88,6 +88,11 @@ class FlamencoExtension(PillarExtension): ...@@ -88,6 +88,11 @@ class FlamencoExtension(PillarExtension):
submodules. submodules.
""" """
# Create the flamenco_task_logs collection with a compressing storage engine.
# If the zlib compression is too CPU-intensive, switch to Snappy instead.
with app.app_context():
self._create_collections(app.db())
from . import managers, jobs, tasks from . import managers, jobs, tasks
managers.setup_app(app) managers.setup_app(app)
...@@ -97,6 +102,27 @@ class FlamencoExtension(PillarExtension): ...@@ -97,6 +102,27 @@ class FlamencoExtension(PillarExtension):
# Imports for side-effects # Imports for side-effects
from . import scheduler from . import scheduler
def _create_collections(self, db):
import pymongo
if 'flamenco_task_logs' not in db.collection_names(include_system_collections=False):
self._log.info('Creating flamenco_task_logs collection.')
db.create_collection('flamenco_task_logs',
storageEngine={
'wiredTiger': {'configString': 'block_compressor=zlib'}
})
else:
self._log.info('Not creating flamenco_task_logs collection, already exists.')
self._log.info('Creating index on flamenco_task_logs collection')
db.flamenco_task_logs.create_index(
[('task_id', pymongo.ASCENDING),
('received_on_manager', pymongo.ASCENDING)],
background=True,
unique=False,
sparse=False,
)
def flamenco_projects(self): def flamenco_projects(self):
"""Returns projects set up for Flamenco. """Returns projects set up for Flamenco.
......
...@@ -268,6 +268,34 @@ tasks_schema = { ...@@ -268,6 +268,34 @@ tasks_schema = {
'worker': { 'worker': {
'type': 'string', 'type': 'string',
}, },
'task_progress_percentage': {
'type': 'integer',
},
'current_command_index': {
'type': 'integer',
},
'command_progress_percentage': {
'type': 'integer',
},
}
task_logs_schema = {
'task': {
'type': 'objectid',
'data_relation': {
'resource': 'flamenco_tasks',
'field': '_id',
},
'required': True,
},
'received_on_manager': {
'type': 'datetime',
'required': True,
},
'log': {
'type': 'string',
'required': True,
},
} }
_managers = { _managers = {
...@@ -291,8 +319,16 @@ _tasks = { ...@@ -291,8 +319,16 @@ _tasks = {
'public_item_methods': [], 'public_item_methods': [],
} }
_task_logs = {
'schema': task_logs_schema,
'item_methods': ['GET', 'DELETE'],
'public_methods': [],
'public_item_methods': [],
}
DOMAIN = { DOMAIN = {
'flamenco_managers': _managers, 'flamenco_managers': _managers,
'flamenco_jobs': _jobs, 'flamenco_jobs': _jobs,
'flamenco_tasks': _tasks 'flamenco_tasks': _tasks,
'flamenco_task_logs': _task_logs,
} }
import logging import logging
from flask import Blueprint, request from flask import Blueprint, request
import flask_login
import werkzeug.exceptions as wz_exceptions import werkzeug.exceptions as wz_exceptions
from pillar.api.utils import authorization, authentication from pillar.api.utils import authorization, authentication
...@@ -10,9 +9,13 @@ api_blueprint = Blueprint('flamenco.managers', __name__) ...@@ -10,9 +9,13 @@ api_blueprint = Blueprint('flamenco.managers', __name__)
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@api_blueprint.route('/<manager_id>/startup', methods=['POST']) def manager_api_call(wrapped):
"""Decorator, performs some standard stuff for Manager API endpoints."""
import functools
@authorization.require_login(require_roles={u'service', u'flamenco_manager'}, require_all=True) @authorization.require_login(require_roles={u'service', u'flamenco_manager'}, require_all=True)
def startup(manager_id): @functools.wraps(wrapped)
def wrapper(manager_id, *args, **kwargs):
from flamenco import current_flamenco from flamenco import current_flamenco
from pillar.api.utils import str2id, mongo from pillar.api.utils import str2id, mongo
...@@ -24,10 +27,17 @@ def startup(manager_id): ...@@ -24,10 +27,17 @@ def startup(manager_id):
'service account', user_id, manager_id) 'service account', user_id, manager_id)
raise wz_exceptions.Unauthorized() raise wz_exceptions.Unauthorized()
notification = request.json return wrapped(manager_id, request.json, *args, **kwargs)
return wrapper
log.info('Received startup notification from manager %s', manager_id)
log.info('Contents:\n%s\n', notification) @api_blueprint.route('/<manager_id>/startup', methods=['POST'])
@manager_api_call
def startup(manager_id, notification):
from flamenco import current_flamenco
log.info('Received startup notification from manager %s %s', manager_id, notification)
mngr_coll = current_flamenco.db('managers') mngr_coll = current_flamenco.db('managers')
update_res = mngr_coll.update_one( update_res = mngr_coll.update_one(
...@@ -46,5 +56,86 @@ def startup(manager_id): ...@@ -46,5 +56,86 @@ def startup(manager_id):
return '', 204 return '', 204
@api_blueprint.route('/<manager_id>/task-update-batch', methods=['POST'])
@manager_api_call
def task_update_batch(manager_id, task_updates):
import dateutil.parser
from pillar.api.utils import jsonify, str2id
from flamenco import current_flamenco, eve_settings
if not isinstance(task_updates, list):
raise wz_exceptions.BadRequest('Expected list of task updates.')
log.info('Received %i task updates from manager %s', len(task_updates), manager_id)
tasks_coll = current_flamenco.db('tasks')
logs_coll = current_flamenco.db('task_logs')
valid_statuses = set(eve_settings.tasks_schema['status']['allowed'])
handled_update_ids = []
total_modif_count = 0
for task_update in task_updates:
# Check that this task actually belongs to this manager, before we accept any updates.
update_id = str2id(task_update['_id'])
task_id = str2id(task_update['task_id'])
tmaninfo = tasks_coll.find_one({'_id': task_id}, projection={'manager': 1})
# For now, we just ignore updates to non-existing tasks. Someone might have just deleted
# one, for example. This is not a reason to reject the entire batch.
if tmaninfo is None:
log.warning('Manager %s sent update for non-existing task %s; ignoring',
manager_id, task_id)
continue
if tmaninfo['manager'] != manager_id:
log.warning('Manager %s sent update for task %s which belongs to other manager %s',
manager_id, task_id, tmaninfo['manager'])
continue
# Store the log for this task, allowing for duplicate log reports.
task_log = task_update.get('log')
if task_log:
received_on_manager = dateutil.parser.parse(task_update['received_on_manager'])
log_doc = {
'_id': update_id,
'task': task_id,
'received_on_manager': received_on_manager,
'log': task_log
}
logs_coll.replace_one({'_id': update_id}, log_doc, upsert=True)
# Modify the task, and append the log to the logs collection.
updates = {
'task_progress_percentage': task_update.get('task_progress_percentage', 0),
'current_command_index': task_update.get('current_command_index', 0),
'command_progress_percentage': task_update.get('command_progress_percentage', 0),
}
new_status = task_update.get('task_status')
if new_status:
updates['status'] = new_status
if new_status not in valid_statuses:
# We have to accept the invalid status, because we're too late in the update
# pipeline to do anything about it. The alternative is to drop the update or
# reject the entire batch of updates, which is more damaging to the workflow.
log.warning('Manager %s sent update for task %s with invalid status %r '
'(storing anyway)', manager_id, task_id, new_status)
new_activity = task_update.get('activity')
if new_activity:
updates['activity'] = new_activity
worker = task_update.get('worker')
if worker:
updates['worker'] = worker
result = tasks_coll.update_one({'_id': task_id}, {'$set': updates})
total_modif_count += result.modified_count
handled_update_ids.append(update_id)
return jsonify({'modified_count': total_modif_count,
'handled_update_ids': handled_update_ids})
def setup_app(app): def setup_app(app):
app.register_api_blueprint(api_blueprint, url_prefix='/flamenco/managers') app.register_api_blueprint(api_blueprint, url_prefix='/flamenco/managers')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment