Skip to content
Snippets Groups Projects
Commit 0ba147a7 authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Manager: perform dependency graph based task scheduling

The scheduling is actually performed by MongoDB using the aggregation
pipeline.

This commit also introduces QueueTaskUpdateWithExtra(), to push a task
update to Flamenco Server and set some extra fields in our local database
in one go.

Furthermore, in db.go I've added a type "M" as an alias for bson.M, so
queries look a little bit nicer.
parent 86901546
Branches
Tags
No related merge requests found
...@@ -14,6 +14,8 @@ type countresult struct { ...@@ -14,6 +14,8 @@ type countresult struct {
Count int `bson:"count"` Count int `bson:"count"`
} }
// M is a local alias for bson.M so that MongoDB query and aggregation
// pipeline literals are more compact to write.
type M bson.M
/** /**
* Returns a MongoDB session. * Returns a MongoDB session.
* *
......
...@@ -33,6 +33,10 @@ type Task struct { ...@@ -33,6 +33,10 @@ type Task struct {
LastWorkerPing *time.Time `bson:"last_worker_ping,omitempty" json:"-"` LastWorkerPing *time.Time `bson:"last_worker_ping,omitempty" json:"-"`
} }
// AggregationPipelineResult is the shape of a document coming out of the
// task-scheduling aggregation pipeline: the original task document is kept
// under the "task" key (set by the pipeline's $group/$project stages).
type AggregationPipelineResult struct {
Task *Task `bson:"task"`
}
// Dependency graph response from Server. // Dependency graph response from Server.
type ScheduledTasks struct { type ScheduledTasks struct {
Depsgraph []Task `json:"depsgraph"` Depsgraph []Task `json:"depsgraph"`
......
...@@ -72,17 +72,19 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat ...@@ -72,17 +72,19 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat
return return
} }
// Perform variable replacement on the task. // Update the task status to "active", pushing it as a task update to the manager too.
ReplaceVariables(ts.config, task, worker) task.Status = "active"
tupdate := TaskUpdate{TaskId: task.Id, TaskStatus: task.Status}
// update the worker_id field of the task. if err := QueueTaskUpdateWithExtra(&tupdate, db, bson.M{"worker_id": worker.Id}); err != nil {
tasks_coll := db.C("flamenco_tasks") log.Errorf("Unable to queue task update while assigning task %s to worker %s: %s",
if err := tasks_coll.UpdateId(task.Id, bson.M{"$set": bson.M{"worker_id": worker.Id}}); err != nil { task.Id.Hex(), worker.Identifier(), err)
log.Warningf("Unable to set worker_id=%s on task %s: %s", worker.Id.Hex(), task.Id.Hex(), err)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
} }
// Perform variable replacement on the task.
ReplaceVariables(ts.config, task, worker)
// Set it to this worker. // Set it to this worker.
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
encoder := json.NewEncoder(w) encoder := json.NewEncoder(w)
...@@ -107,28 +109,76 @@ func (ts *TaskScheduler) fetchTaskFromQueueOrManager( ...@@ -107,28 +109,76 @@ func (ts *TaskScheduler) fetchTaskFromQueueOrManager(
if len(worker.SupportedJobTypes) == 0 { if len(worker.SupportedJobTypes) == 0 {
log.Warningf("%s: worker %s has no supported job types.", log.Warningf("%s: worker %s has no supported job types.",
r.RemoteAddr, worker.Id.Hex()) r.RemoteAddr, worker.Id.Hex())
w.WriteHeader(http.StatusNotAcceptable)
fmt.Fprintln(w, "You do not support any job types.")
return nil return nil
} }
task := &Task{} result := AggregationPipelineResult{}
tasks_coll := db.C("flamenco_tasks") tasks_coll := db.C("flamenco_tasks")
// TODO Sybren: also include active tasks that are assigned to this worker. var err error
query := bson.M{
"status": bson.M{"$in": []string{"queued", "claimed-by-manager"}},
"job_type": bson.M{"$in": worker.SupportedJobTypes},
}
change := mgo.Change{
Update: bson.M{"$set": bson.M{"status": "active"}},
ReturnNew: true,
}
dtrt := ts.config.DownloadTaskRecheckThrottle
for attempt := 0; attempt < 2; attempt++ { for attempt := 0; attempt < 2; attempt++ {
// TODO: take depsgraph (i.e. parent task status) and task status into account. pipe := tasks_coll.Pipe([]M{
info, err := tasks_coll.Find(query).Sort("-priority").Limit(1).Apply(change, &task) // 1: Select only tasks that have a runnable status & acceptable job type.
M{"$match": M{
"status": M{"$in": []string{"queued", "claimed-by-manager"}},
// "job_type": M{"$in": []string{"sleeping", "testing"}},
}},
// 2: Unwind the parents array, so that we can do a lookup in the next stage.
M{"$unwind": M{
"path": "$parents",
"preserveNullAndEmptyArrays": true,
}},
// 3: Look up the parent document for each unwound task.
// This produces 1-length "parent_doc" arrays.
M{"$lookup": M{
"from": "flamenco_tasks",
"localField": "parents",
"foreignField": "_id",
"as": "parent_doc",
}},
// 4: Unwind again, to turn the 1-length "parent_doc" arrays into a subdocument.
M{"$unwind": M{
"path": "$parent_doc",
"preserveNullAndEmptyArrays": true,
}},
// 5: Group by task ID to undo the unwind, and create an array parent_statuses
// with booleans indicating whether the parent status is "completed".
M{"$group": M{
"_id": "$_id",
"parent_statuses": M{"$push": M{
"$eq": []interface{}{
"completed",
M{"$ifNull": []string{"$parent_doc.status", "completed"}}}}},
// This allows us to keep all dynamic properties of the original task document:
"task": M{"$first": "$$ROOT"},
}},
// 6: Turn the list of "parent_statuses" booleans into a single boolean
M{"$project": M{
"_id": 0,
"parents_completed": M{"$allElementsTrue": []string{"$parent_statuses"}},
"task": 1,
}},
// 7: Select only those tasks for which the parents have completed.
M{"$match": M{
"parents_completed": true,
}},
// 8: just keep the task info; the "parents_completed" field is no longer needed.
M{"$project": M{"task": 1}},
// 9: Sort by priority, with highest prio first. If prio is equal, use newest task.
M{"$sort": bson.D{
{"task.priority", -1},
{"task._id", 1},
}},
// 10: Only return one task.
M{"$limit": 1},
})
err = pipe.One(&result)
if err == mgo.ErrNotFound { if err == mgo.ErrNotFound {
log.Infof("No tasks for worker %s found on attempt %d.", worker.Identifier(), attempt)
dtrt := ts.config.DownloadTaskRecheckThrottle
if attempt == 0 && dtrt >= 0 && time.Now().Sub(last_upstream_check) > dtrt { if attempt == 0 && dtrt >= 0 && time.Now().Sub(last_upstream_check) > dtrt {
// On first attempt: try fetching new tasks from upstream, then re-query the DB. // On first attempt: try fetching new tasks from upstream, then re-query the DB.
log.Infof("%s No more tasks available for %s, checking upstream", log.Infof("%s No more tasks available for %s, checking upstream",
...@@ -142,7 +192,7 @@ func (ts *TaskScheduler) fetchTaskFromQueueOrManager( ...@@ -142,7 +192,7 @@ func (ts *TaskScheduler) fetchTaskFromQueueOrManager(
w.WriteHeader(204) w.WriteHeader(204)
return nil return nil
} else if err != nil { } else if err != nil {
log.Warningf("%s Error fetching task for %s: %s // %s", r.RemoteAddr, r.Username, err, info) log.Errorf("%s Error fetching task for %s: %s", r.RemoteAddr, r.Username, err)
w.WriteHeader(500) w.WriteHeader(500)
return nil return nil
} }
...@@ -150,5 +200,5 @@ func (ts *TaskScheduler) fetchTaskFromQueueOrManager( ...@@ -150,5 +200,5 @@ func (ts *TaskScheduler) fetchTaskFromQueueOrManager(
break break
} }
return task return result.Task
} }
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
check "gopkg.in/check.v1" check "gopkg.in/check.v1"
"gopkg.in/jarcoal/httpmock.v1" "gopkg.in/jarcoal/httpmock.v1"
mgo "gopkg.in/mgo.v2" mgo "gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
) )
type SchedulerTestSuite struct { type SchedulerTestSuite struct {
...@@ -51,6 +52,7 @@ func (s *SchedulerTestSuite) SetUpTest(c *check.C) { ...@@ -51,6 +52,7 @@ func (s *SchedulerTestSuite) SetUpTest(c *check.C) {
s.worker_lnx = Worker{ s.worker_lnx = Worker{
Platform: "linux", Platform: "linux",
SupportedJobTypes: []string{"sleeping"}, SupportedJobTypes: []string{"sleeping"},
Nickname: "worker_lnx",
} }
if err := StoreNewWorker(&s.worker_lnx, s.db); err != nil { if err := StoreNewWorker(&s.worker_lnx, s.db); err != nil {
c.Fatal("Unable to insert test worker_lnx", err) c.Fatal("Unable to insert test worker_lnx", err)
...@@ -58,6 +60,7 @@ func (s *SchedulerTestSuite) SetUpTest(c *check.C) { ...@@ -58,6 +60,7 @@ func (s *SchedulerTestSuite) SetUpTest(c *check.C) {
s.worker_win = Worker{ s.worker_win = Worker{
Platform: "windows", Platform: "windows",
SupportedJobTypes: []string{"testing"}, SupportedJobTypes: []string{"testing"},
Nickname: "worker_win",
} }
if err := StoreNewWorker(&s.worker_win, s.db); err != nil { if err := StoreNewWorker(&s.worker_win, s.db); err != nil {
c.Fatal("Unable to insert test worker_win", err) c.Fatal("Unable to insert test worker_win", err)
...@@ -307,3 +310,86 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamDeleted(t *check.C) { ...@@ -307,3 +310,86 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamDeleted(t *check.C) {
assert.Equal(t, "active", found_task.Status) assert.Equal(t, "active", found_task.Status)
assert.Equal(t, 50, found_task.Priority) assert.Equal(t, 50, found_task.Priority)
} }
// A task whose parent has not completed must not be scheduled on a worker.
func (s *SchedulerTestSuite) TestParentTaskNotCompleted(c *check.C) {
	coll := s.db.C("flamenco_tasks")

	// The parent task is still being worked on by worker_win.
	parent := ConstructTestTaskWithPrio("1aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50)
	parent.Status = "active"
	parent.WorkerId = &s.worker_win.Id
	assert.Nil(c, coll.Insert(parent))

	// The child task depends on the unfinished parent, so it is not runnable.
	child := ConstructTestTaskWithPrio("2aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 100)
	child.Parents = []bson.ObjectId{parent.Id}
	child.Status = "claimed-by-manager"
	assert.Nil(c, coll.Insert(child))

	// Ask the scheduler for work on behalf of worker_lnx.
	respRec, ar := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah")
	fetched := s.sched.fetchTaskFromQueueOrManager(respRec, ar, s.db, &s.worker_lnx)

	// Nothing should be returned: the parent is taken, and the child is
	// blocked on its non-completed parent.
	assert.Nil(c, fetched, "Expected nil, got task %v instead", fetched)
	assert.Equal(c, http.StatusNoContent, respRec.Code)
}
// A task becomes schedulable once its parent task has completed.
func (s *SchedulerTestSuite) TestParentTaskCompleted(c *check.C) {
	coll := s.db.C("flamenco_tasks")

	// The parent task was already completed by worker_win.
	parent := ConstructTestTaskWithPrio("1aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50)
	parent.Status = "completed"
	parent.WorkerId = &s.worker_win.Id
	assert.Nil(c, coll.Insert(parent))

	// The child task is runnable because its parent is done.
	child := ConstructTestTaskWithPrio("2aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 100)
	child.Parents = []bson.ObjectId{parent.Id}
	child.Status = "claimed-by-manager"
	assert.Nil(c, coll.Insert(child))

	// Ask the scheduler for work on behalf of worker_lnx.
	respRec, ar := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah")
	fetched := s.sched.fetchTaskFromQueueOrManager(respRec, ar, s.db, &s.worker_lnx)
	assert.Equal(c, http.StatusOK, respRec.Code)

	// The child task should have been handed out.
	assert.NotNil(c, fetched, "Expected task %s, got nil instead", child.Id.Hex())
	if fetched != nil { // prevent nil pointer dereference
		assert.Equal(c, fetched.Id, child.Id, "Expected task %s, got task %s instead",
			child.Id.Hex(), fetched.Id.Hex())
	}
}
// A task with several parents is only schedulable when ALL of them have
// completed; one unfinished parent is enough to block it.
func (s *SchedulerTestSuite) TestParentTaskOneCompletedOneNot(c *check.C) {
	coll := s.db.C("flamenco_tasks")

	// First parent: still being worked on by worker_win.
	activeParent := ConstructTestTaskWithPrio("1aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50)
	activeParent.Status = "active"
	activeParent.WorkerId = &s.worker_win.Id
	assert.Nil(c, coll.Insert(activeParent))

	// Second parent: already completed.
	doneParent := ConstructTestTaskWithPrio("2aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50)
	doneParent.Status = "completed"
	doneParent.WorkerId = &s.worker_win.Id
	assert.Nil(c, coll.Insert(doneParent))

	// The child is blocked because one of its parents has not completed.
	child := ConstructTestTaskWithPrio("3aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 100)
	child.Parents = []bson.ObjectId{activeParent.Id, doneParent.Id}
	child.Status = "claimed-by-manager"
	assert.Nil(c, coll.Insert(child))

	// Ask the scheduler for work on behalf of worker_lnx.
	respRec, ar := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah")
	fetched := s.sched.fetchTaskFromQueueOrManager(respRec, ar, s.db, &s.worker_lnx)

	// No task may be handed out.
	assert.Nil(c, fetched, "Expected nil, got task %v instead", fetched)
	assert.Equal(c, http.StatusNoContent, respRec.Code)
}
...@@ -91,6 +91,13 @@ func QueueTaskUpdateFromWorker(w http.ResponseWriter, r *auth.AuthenticatedReque ...@@ -91,6 +91,13 @@ func QueueTaskUpdateFromWorker(w http.ResponseWriter, r *auth.AuthenticatedReque
} }
// QueueTaskUpdate queues a task update for pushing to Flamenco Server,
// without applying any extra updates to the local flamenco_tasks collection.
func QueueTaskUpdate(tupdate *TaskUpdate, db *mgo.Database) error {
return QueueTaskUpdateWithExtra(tupdate, db, bson.M{})
}
/* Same as QueueTaskUpdate(), but with extra updates to be performed on the local flamenco_tasks
* collection.
*/
func QueueTaskUpdateWithExtra(tupdate *TaskUpdate, db *mgo.Database, extra_updates bson.M) error {
// For ensuring the ordering of updates. time.Time has nanosecond precision. // For ensuring the ordering of updates. time.Time has nanosecond precision.
tupdate.ReceivedOnManager = time.Now().UTC() tupdate.ReceivedOnManager = time.Now().UTC()
tupdate.Id = bson.NewObjectId() tupdate.Id = bson.NewObjectId()
...@@ -105,7 +112,7 @@ func QueueTaskUpdate(tupdate *TaskUpdate, db *mgo.Database) error { ...@@ -105,7 +112,7 @@ func QueueTaskUpdate(tupdate *TaskUpdate, db *mgo.Database) error {
// This prevents a task being reported active on the worker from overwriting the // This prevents a task being reported active on the worker from overwriting the
// cancel-requested state we received from the Server. // cancel-requested state we received from the Server.
task_coll := db.C("flamenco_tasks") task_coll := db.C("flamenco_tasks")
updates := bson.M{} updates := extra_updates
if tupdate.TaskStatus != "" { if tupdate.TaskStatus != "" {
// Before blindly applying the task status, first check if the transition is valid. // Before blindly applying the task status, first check if the transition is valid.
if TaskStatusTransitionValid(task_coll, tupdate.TaskId, tupdate.TaskStatus) { if TaskStatusTransitionValid(task_coll, tupdate.TaskId, tupdate.TaskStatus) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment