Commit 617a08fa authored by Sybren A. Stüvel

Manager: scheduler no longer synchronously kicks the task downloader

If there is no task available for a worker, the worker now gets a quick
response saying so. At the same time, the task downloader is kicked to
fetch a new batch of tasks from the upstream Server.

Workers ask for new tasks quite frequently (every 5 seconds), so the
downloader is likely to have finished by the time the worker's next
request comes in.
parent b0282136
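The pattern the commit message describes can be sketched in a few lines of Go (hypothetical names throughout; this is not Flamenco's actual code). A channel with capacity one acts as the "kick": any number of polling workers can wake the downloader, and the HTTP handler never blocks on the upstream fetch.

package main

import (
	"fmt"
	"time"
)

// kicks has capacity 1: if a kick is already pending, further kicks are
// dropped, so the scheduler can nudge the downloader without ever blocking.
var kicks = make(chan struct{}, 1)

func downloaderLoop() {
	for range kicks {
		fmt.Println("fetching a new batch of tasks from the upstream Server")
		time.Sleep(100 * time.Millisecond) // stand-in for the real HTTP call
	}
}

// kickDownloader returns immediately, whether or not the kick was accepted.
func kickDownloader() {
	select {
	case kicks <- struct{}{}:
	default: // a kick is already pending; nothing to do
	}
}

func main() {
	go downloaderLoop()
	kickDownloader() // e.g. right before replying 204 "no task available"
	time.Sleep(200 * time.Millisecond)
}

Dropping kicks while one is already pending is what keeps the 5-second worker polls from piling up redundant requests to the Server.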
@@ -13,14 +13,14 @@ import (
 	"gopkg.in/mgo.v2/bson"
 )
 
-/* Timestamp of the last time we kicked the task downloader because there weren't any
- * tasks left for workers. */
-var last_upstream_check time.Time
-
 type TaskScheduler struct {
 	config   *Conf
 	upstream *UpstreamConnection
 	session  *mgo.Session
+
+	/* Timestamp of the last time we kicked the task downloader because there weren't any
+	 * tasks left for workers. */
+	lastUpstreamCheck time.Time
 }
 
 func CreateTaskScheduler(config *Conf, upstream *UpstreamConnection, session *mgo.Session) *TaskScheduler {
@@ -28,6 +28,7 @@ func CreateTaskScheduler(config *Conf, upstream *UpstreamConnection, session *mgo.Session) *TaskScheduler {
 		config,
 		upstream,
 		session,
+		time.Time{},
 	}
 }
@@ -117,88 +118,85 @@ func (ts *TaskScheduler) fetchTaskFromQueueOrManager(
 	result := AggregationPipelineResult{}
 	tasks_coll := db.C("flamenco_tasks")
 
-	var err error
-	for attempt := 0; attempt < 2; attempt++ {
 	pipe := tasks_coll.Pipe([]M{
 		// 1: Select only tasks that have a runnable status & acceptable job type.
 		M{"$match": M{
 			"status": M{"$in": []string{"queued", "claimed-by-manager"}},
 			// "job_type": M{"$in": []string{"sleeping", "testing"}},
 		}},
 		// 2: Unwind the parents array, so that we can do a lookup in the next stage.
 		M{"$unwind": M{
 			"path":                       "$parents",
 			"preserveNullAndEmptyArrays": true,
 		}},
 		// 3: Look up the parent document for each unwound task.
 		//    This produces 1-length "parent_doc" arrays.
 		M{"$lookup": M{
 			"from":         "flamenco_tasks",
 			"localField":   "parents",
 			"foreignField": "_id",
 			"as":           "parent_doc",
 		}},
 		// 4: Unwind again, to turn the 1-length "parent_doc" arrays into a subdocument.
 		M{"$unwind": M{
 			"path":                       "$parent_doc",
 			"preserveNullAndEmptyArrays": true,
 		}},
 		// 5: Group by task ID to undo the unwind, and create an array parent_statuses
 		//    with booleans indicating whether the parent status is "completed".
 		M{"$group": M{
 			"_id": "$_id",
 			"parent_statuses": M{"$push": M{
 				"$eq": []interface{}{
 					"completed",
 					M{"$ifNull": []string{"$parent_doc.status", "completed"}}}}},
 			// This allows us to keep all dynamic properties of the original task document:
 			"task": M{"$first": "$$ROOT"},
 		}},
 		// 6: Turn the list of "parent_statuses" booleans into a single boolean.
 		M{"$project": M{
 			"_id":               0,
 			"parents_completed": M{"$allElementsTrue": []string{"$parent_statuses"}},
 			"task":              1,
 		}},
 		// 7: Select only those tasks for which the parents have completed.
 		M{"$match": M{
 			"parents_completed": true,
 		}},
 		// 8: Just keep the task info; "parents_completed" is no longer needed.
 		M{"$project": M{"task": 1}},
 		// 9: Sort by priority, with the highest priority first. If priority is equal, the oldest task goes first.
 		M{"$sort": bson.D{
 			{"task.priority", -1},
 			{"task._id", 1},
 		}},
 		// 10: Only return one task.
 		M{"$limit": 1},
 	})
 
-		err = pipe.One(&result)
-		if err == mgo.ErrNotFound {
-			log.Infof("No tasks for worker %s found on attempt %d.", worker.Identifier(), attempt)
-			dtrt := ts.config.DownloadTaskRecheckThrottle
-			if attempt == 0 && dtrt >= 0 && time.Now().Sub(last_upstream_check) > dtrt {
-				// On first attempt: try fetching new tasks from upstream, then re-query the DB.
-				log.Infof("%s No more tasks available for %s, checking upstream",
-					r.RemoteAddr, r.Username)
-				last_upstream_check = time.Now()
-				ts.upstream.KickDownloader(true)
-				continue
-			}
-			log.Infof("%s Really no more tasks available for %s", r.RemoteAddr, r.Username)
-			w.WriteHeader(204)
-			return nil
-		} else if err != nil {
-			log.Errorf("%s Error fetching task for %s: %s", r.RemoteAddr, r.Username, err)
-			w.WriteHeader(500)
-			return nil
-		}
-		break
-	}
+	err := pipe.One(&result)
+	if err == mgo.ErrNotFound {
+		log.Infof("%s no more tasks available for %s", r.RemoteAddr, worker.Identifier())
+		ts.maybeKickTaskDownloader()
+		w.WriteHeader(204)
+		return nil
+	}
+	if err != nil {
+		log.Errorf("%s Error fetching task for %s: %s", r.RemoteAddr, r.Username, err)
+		w.WriteHeader(500)
+		return nil
+	}
 
 	return result.Task
 }
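Aside from the removal of the retry loop, the aggregation pipeline is unchanged. Its subtlest part is the parent check in stages 2-6: a task is runnable when every parent it references has status "completed", and a parent whose document cannot be found also counts as completed; that is what the $ifNull default provides. Spelled out as a plain Go helper (hypothetical, purely for illustration):

package main

import "fmt"

// parentsCompleted mirrors pipeline stages 2-6: a missing parent document
// defaults to "completed" ($ifNull), and the per-parent booleans are
// AND-ed together ($allElementsTrue).
func parentsCompleted(parentStatuses []*string) bool {
	for _, status := range parentStatuses {
		if status != nil && *status != "completed" {
			return false
		}
	}
	return true // also true for a task without any parents
}

func main() {
	done, active := "completed", "active"
	fmt.Println(parentsCompleted([]*string{&done, nil})) // true: missing parent counts as completed
	fmt.Println(parentsCompleted([]*string{&active}))    // false: a parent is still running
}

The 204 branch above delegates the upstream check to a new helper, added at the end of the file: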
+
+func (ts *TaskScheduler) maybeKickTaskDownloader() {
+	dtrt := ts.config.DownloadTaskRecheckThrottle
+	if dtrt < 0 || time.Now().Sub(ts.lastUpstreamCheck) <= dtrt {
+		return
+	}
+
+	log.Infof("TaskScheduler: kicking task downloader")
+	ts.lastUpstreamCheck = time.Now()
+	ts.upstream.KickDownloader(false)
+}
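Note the flag flipping from KickDownloader(true) in the removed code to KickDownloader(false) here. The old retry loop re-queried the database immediately after kicking, so true evidently makes the call wait for the download to finish, while false merely queues the kick. A channel-based implementation consistent with both call sites could look like this (a sketch under that assumption; the real UpstreamConnection is not part of this commit):

// Hypothetical kickable downloader. Each kick optionally carries a channel
// that the download loop closes once the fetched tasks have been stored.
type upstreamConn struct {
	downloadKick chan chan struct{}
}

func (uc *upstreamConn) downloadLoop() {
	for done := range uc.downloadKick {
		// ... fetch tasks from the upstream Server and store them ...
		if done != nil {
			close(done) // wake a synchronous caller
		}
	}
}

func (uc *upstreamConn) KickDownloader(synchronous bool) {
	if synchronous {
		done := make(chan struct{})
		uc.downloadKick <- done
		<-done // block until this download round has finished
		return
	}
	select {
	case uc.downloadKick <- nil: // fire and forget
	default: // the downloader is already busy; the next poll will kick again
	}
}

With the asynchronous variant, the worst case is one extra empty poll before freshly downloaded tasks become schedulable, which is exactly the trade-off the commit message argues is acceptable.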