From 0ba147a7c2a49b624fe80b4a74cf90a5c0679836 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= <sybren@stuvel.eu>
Date: Thu, 26 Jan 2017 15:01:52 +0100
Subject: [PATCH] Manager: perform dependency graph based task scheduling

The scheduling is actually performed by MongoDB using the aggregation
pipeline.

This commit also introduces QueueTaskUpdateWithExtra(), to push a task
update to Flamenco Server and set some extra fields in our local database
in one go.

Furthermore, in db.go I've added a type "M" as an alias for bson.M, so
queries look a little bit nicer.
---
 .../src/flamenco-manager/flamenco/db.go       |  2 +
 .../flamenco-manager/flamenco/documents.go    |  4 +
 .../flamenco-manager/flamenco/scheduler.go    | 98 ++++++++++++++-----
 .../flamenco/scheduler_test.go                | 86 ++++++++++++++++
 .../flamenco-manager/flamenco/task_updates.go |  9 +-
 5 files changed, 174 insertions(+), 25 deletions(-)

diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/db.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/db.go
index 6dcea7b4..7a41c176 100644
--- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/db.go
+++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/db.go
@@ -14,6 +14,8 @@ type countresult struct {
 	Count int `bson:"count"`
 }
 
+type M bson.M
+
 /**
  * Returns a MongoDB session.
  *
diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/documents.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/documents.go
index a700b349..ca2e1bbd 100644
--- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/documents.go
+++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/documents.go
@@ -33,6 +33,10 @@ type Task struct {
 	LastWorkerPing *time.Time     `bson:"last_worker_ping,omitempty" json:"-"`
 }
 
+type AggregationPipelineResult struct {
+	Task *Task `bson:"task"`
+}
+
 // Dependency graph response from Server.
 type ScheduledTasks struct {
 	Depsgraph []Task `json:"depsgraph"`
diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go
index be0cbb65..8282fffc 100644
--- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go
+++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go
@@ -72,17 +72,19 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat
 		return
 	}
 
-	// Perform variable replacement on the task.
-	ReplaceVariables(ts.config, task, worker)
-
-	// update the worker_id field of the task.
-	tasks_coll := db.C("flamenco_tasks")
-	if err := tasks_coll.UpdateId(task.Id, bson.M{"$set": bson.M{"worker_id": worker.Id}}); err != nil {
-		log.Warningf("Unable to set worker_id=%s on task %s: %s", worker.Id.Hex(), task.Id.Hex(), err)
+	// Update the task status to "active", pushing it as a task update to the manager too.
+	task.Status = "active"
+	tupdate := TaskUpdate{TaskId: task.Id, TaskStatus: task.Status}
+	if err := QueueTaskUpdateWithExtra(&tupdate, db, bson.M{"worker_id": worker.Id}); err != nil {
+		log.Errorf("Unable to queue task update while assigning task %s to worker %s: %s",
+			task.Id.Hex(), worker.Identifier(), err)
 		w.WriteHeader(http.StatusInternalServerError)
 		return
 	}
 
+	// Perform variable replacement on the task.
+	ReplaceVariables(ts.config, task, worker)
+
 	// Set it to this worker.
 	w.Header().Set("Content-Type", "application/json")
 	encoder := json.NewEncoder(w)
@@ -107,28 +109,76 @@ func (ts *TaskScheduler) fetchTaskFromQueueOrManager(
 	if len(worker.SupportedJobTypes) == 0 {
 		log.Warningf("%s: worker %s has no supported job types.",
 			r.RemoteAddr, worker.Id.Hex())
+		w.WriteHeader(http.StatusNotAcceptable)
+		fmt.Fprintln(w, "You do not support any job types.")
 		return nil
 	}
 
-	task := &Task{}
+	result := AggregationPipelineResult{}
 	tasks_coll := db.C("flamenco_tasks")
 
-	// TODO Sybren: also include active tasks that are assigned to this worker.
-	query := bson.M{
-		"status":   bson.M{"$in": []string{"queued", "claimed-by-manager"}},
-		"job_type": bson.M{"$in": worker.SupportedJobTypes},
-	}
-	change := mgo.Change{
-		Update:    bson.M{"$set": bson.M{"status": "active"}},
-		ReturnNew: true,
-	}
-
-	dtrt := ts.config.DownloadTaskRecheckThrottle
-
+	var err error
 	for attempt := 0; attempt < 2; attempt++ {
-		// TODO: take depsgraph (i.e. parent task status) and task status into account.
-		info, err := tasks_coll.Find(query).Sort("-priority").Limit(1).Apply(change, &task)
+		pipe := tasks_coll.Pipe([]M{
+			// 1: Select only tasks that have a runnable status & acceptable job type.
+			M{"$match": M{
+				"status": M{"$in": []string{"queued", "claimed-by-manager"}},
+				// "job_type": M{"$in": []string{"sleeping", "testing"}},
+			}},
+			// 2: Unwind the parents array, so that we can do a lookup in the next stage.
+			M{"$unwind": M{
+				"path": "$parents",
+				"preserveNullAndEmptyArrays": true,
+			}},
+			// 3: Look up the parent document for each unwound task.
+			// This produces 1-length "parent_doc" arrays.
+			M{"$lookup": M{
+				"from":         "flamenco_tasks",
+				"localField":   "parents",
+				"foreignField": "_id",
+				"as":           "parent_doc",
+			}},
+			// 4: Unwind again, to turn the 1-length "parent_doc" arrays into a subdocument.
+			M{"$unwind": M{
+				"path": "$parent_doc",
+				"preserveNullAndEmptyArrays": true,
+			}},
+			// 5: Group by task ID to undo the unwind, and create an array parent_statuses
+			// with booleans indicating whether the parent status is "completed".
+			M{"$group": M{
+				"_id": "$_id",
+				"parent_statuses": M{"$push": M{
+					"$eq": []interface{}{
+						"completed",
+						M{"$ifNull": []string{"$parent_doc.status", "completed"}}}}},
+				// This allows us to keep all dynamic properties of the original task document:
+				"task": M{"$first": "$$ROOT"},
+			}},
+			// 6: Turn the list of "parent_statuses" booleans into a single boolean
+			M{"$project": M{
+				"_id":               0,
+				"parents_completed": M{"$allElementsTrue": []string{"$parent_statuses"}},
+				"task":              1,
+			}},
+			// 7: Select only those tasks for which the parents have completed.
+			M{"$match": M{
+				"parents_completed": true,
+			}},
+			// 8: just keep the task info, the "parents_runnable" is no longer needed.
+			M{"$project": M{"task": 1}},
+			// 9: Sort by priority, with highest prio first. If prio is equal, use newest task.
+			M{"$sort": bson.D{
+				{"task.priority", -1},
+				{"task._id", 1},
+			}},
+			// 10: Only return one task.
+			M{"$limit": 1},
+		})
+
+		err = pipe.One(&result)
 		if err == mgo.ErrNotFound {
+			log.Infof("No tasks for worker %s found on attempt %d.", worker.Identifier(), attempt)
+			dtrt := ts.config.DownloadTaskRecheckThrottle
 			if attempt == 0 && dtrt >= 0 && time.Now().Sub(last_upstream_check) > dtrt {
 				// On first attempt: try fetching new tasks from upstream, then re-query the DB.
 				log.Infof("%s No more tasks available for %s, checking upstream",
@@ -142,7 +192,7 @@ func (ts *TaskScheduler) fetchTaskFromQueueOrManager(
 			w.WriteHeader(204)
 			return nil
 		} else if err != nil {
-			log.Warningf("%s Error fetching task for %s: %s // %s", r.RemoteAddr, r.Username, err, info)
+			log.Errorf("%s Error fetching task for %s: %s", r.RemoteAddr, r.Username, err)
 			w.WriteHeader(500)
 			return nil
 		}
@@ -150,5 +200,5 @@ func (ts *TaskScheduler) fetchTaskFromQueueOrManager(
 		break
 	}
 
-	return task
+	return result.Task
 }
diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler_test.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler_test.go
index 83d819ec..0552c0e8 100644
--- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler_test.go
+++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler_test.go
@@ -13,6 +13,7 @@ import (
 	check "gopkg.in/check.v1"
 	"gopkg.in/jarcoal/httpmock.v1"
 	mgo "gopkg.in/mgo.v2"
+	"gopkg.in/mgo.v2/bson"
 )
 
 type SchedulerTestSuite struct {
@@ -51,6 +52,7 @@ func (s *SchedulerTestSuite) SetUpTest(c *check.C) {
 	s.worker_lnx = Worker{
 		Platform:          "linux",
 		SupportedJobTypes: []string{"sleeping"},
+		Nickname:          "worker_lnx",
 	}
 	if err := StoreNewWorker(&s.worker_lnx, s.db); err != nil {
 		c.Fatal("Unable to insert test worker_lnx", err)
@@ -58,6 +60,7 @@ func (s *SchedulerTestSuite) SetUpTest(c *check.C) {
 	s.worker_win = Worker{
 		Platform:          "windows",
 		SupportedJobTypes: []string{"testing"},
+		Nickname:          "worker_win",
 	}
 	if err := StoreNewWorker(&s.worker_win, s.db); err != nil {
 		c.Fatal("Unable to insert test worker_win", err)
@@ -307,3 +310,86 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamDeleted(t *check.C) {
 	assert.Equal(t, "active", found_task.Status)
 	assert.Equal(t, 50, found_task.Priority)
 }
+
+func (s *SchedulerTestSuite) TestParentTaskNotCompleted(c *check.C) {
+	tasks_coll := s.db.C("flamenco_tasks")
+
+	// Task 1 is being worked on by worker_win
+	task1 := ConstructTestTaskWithPrio("1aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50)
+	task1.Status = "active"
+	task1.WorkerId = &s.worker_win.Id
+	assert.Nil(c, tasks_coll.Insert(task1))
+
+	// Task 2 is unavailable due to its parent not being completed.
+	task2 := ConstructTestTaskWithPrio("2aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 100)
+	task2.Parents = []bson.ObjectId{task1.Id}
+	task2.Status = "claimed-by-manager"
+	assert.Nil(c, tasks_coll.Insert(task2))
+
+	// Fetch a task from the queue
+	resp_rec, ar := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah")
+	task := s.sched.fetchTaskFromQueueOrManager(resp_rec, ar, s.db, &s.worker_lnx)
+
+	// We should not get any task back, since task1 is already taken, and task2
+	// has a non-completed parent.
+	assert.Nil(c, task, "Expected nil, got task %v instead", task)
+	assert.Equal(c, http.StatusNoContent, resp_rec.Code)
+}
+
+func (s *SchedulerTestSuite) TestParentTaskCompleted(c *check.C) {
+	tasks_coll := s.db.C("flamenco_tasks")
+
+	// Task 1 has been completed by worker_win
+	task1 := ConstructTestTaskWithPrio("1aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50)
+	task1.Status = "completed"
+	task1.WorkerId = &s.worker_win.Id
+	assert.Nil(c, tasks_coll.Insert(task1))
+
+	// Task 2 is available due to its parent being completed.
+	task2 := ConstructTestTaskWithPrio("2aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 100)
+	task2.Parents = []bson.ObjectId{task1.Id}
+	task2.Status = "claimed-by-manager"
+	assert.Nil(c, tasks_coll.Insert(task2))
+
+	// Fetch a task from the queue
+	resp_rec, ar := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah")
+	task := s.sched.fetchTaskFromQueueOrManager(resp_rec, ar, s.db, &s.worker_lnx)
+	assert.Equal(c, http.StatusOK, resp_rec.Code)
+
+	// We should get task 2.
+	assert.NotNil(c, task, "Expected task %s, got nil instead", task2.Id.Hex())
+	if task != nil { // prevent nil pointer dereference
+		assert.Equal(c, task.Id, task2.Id, "Expected task %s, got task %s instead",
+			task2.Id.Hex(), task.Id.Hex())
+	}
+}
+
+func (s *SchedulerTestSuite) TestParentTaskOneCompletedOneNot(c *check.C) {
+	tasks_coll := s.db.C("flamenco_tasks")
+
+	// Task 1 is being worked on by worker_win
+	task1 := ConstructTestTaskWithPrio("1aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50)
+	task1.Status = "active"
+	task1.WorkerId = &s.worker_win.Id
+	assert.Nil(c, tasks_coll.Insert(task1))
+
+	// Task 2 is already completed.
+	task2 := ConstructTestTaskWithPrio("2aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50)
+	task2.Status = "completed"
+	task2.WorkerId = &s.worker_win.Id
+	assert.Nil(c, tasks_coll.Insert(task2))
+
+	// Task 3 is unavailable due to one of its parent not being completed.
+	task3 := ConstructTestTaskWithPrio("3aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 100)
+	task3.Parents = []bson.ObjectId{task1.Id, task2.Id}
+	task3.Status = "claimed-by-manager"
+	assert.Nil(c, tasks_coll.Insert(task3))
+
+	// Fetch a task from the queue
+	resp_rec, ar := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah")
+	task := s.sched.fetchTaskFromQueueOrManager(resp_rec, ar, s.db, &s.worker_lnx)
+
+	// We should not get any task back.
+	assert.Nil(c, task, "Expected nil, got task %v instead", task)
+	assert.Equal(c, http.StatusNoContent, resp_rec.Code)
+}
diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates.go
index a9d88047..b16d4ded 100644
--- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates.go
+++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates.go
@@ -91,6 +91,13 @@ func QueueTaskUpdateFromWorker(w http.ResponseWriter, r *auth.AuthenticatedReque
 }
 
 func QueueTaskUpdate(tupdate *TaskUpdate, db *mgo.Database) error {
+	return QueueTaskUpdateWithExtra(tupdate, db, bson.M{})
+}
+
+/* Same as QueueTaskUpdate(), but with extra updates to be performed on the local flamenco_tasks
+ * collection.
+ */
+func QueueTaskUpdateWithExtra(tupdate *TaskUpdate, db *mgo.Database, extra_updates bson.M) error {
 	// For ensuring the ordering of updates. time.Time has nanosecond precision.
 	tupdate.ReceivedOnManager = time.Now().UTC()
 	tupdate.Id = bson.NewObjectId()
@@ -105,7 +112,7 @@ func QueueTaskUpdate(tupdate *TaskUpdate, db *mgo.Database) error {
 	// This prevents a task being reported active on the worker from overwriting the
 	// cancel-requested state we received from the Server.
 	task_coll := db.C("flamenco_tasks")
-	updates := bson.M{}
+	updates := extra_updates
 	if tupdate.TaskStatus != "" {
 		// Before blindly applying the task status, first check if the transition is valid.
 		if TaskStatusTransitionValid(task_coll, tupdate.TaskId, tupdate.TaskStatus) {
-- 
GitLab