From 617a08fa1483f7d2be30ffaa673daf69f13e735b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= <sybren@stuvel.eu>
Date: Thu, 26 Jan 2017 15:27:05 +0100
Subject: [PATCH] Manager: scheduler no longer synchronously kicks the task
 downloader

If there is no task available for a worker, the worker now gets a quick
response that there is no task available. Simultaneously, the downloader
is kicked to fetch a new set of tasks from upstream Server.

Workers ask quite frequently for new tasks (every 5 seconds), so it is
likely that the downloader is already finished when a new request from this
worker comes in.
---
 .../flamenco-manager/flamenco/scheduler.go    | 168 +++++++++---------
 1 file changed, 83 insertions(+), 85 deletions(-)

diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go
index 8282fffc..97e78717 100644
--- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go
+++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go
@@ -13,14 +13,14 @@ import (
 	"gopkg.in/mgo.v2/bson"
 )
 
-/* Timestamp of the last time we kicked the task downloader because there weren't any
- * tasks left for workers. */
-var last_upstream_check time.Time
-
 type TaskScheduler struct {
 	config   *Conf
 	upstream *UpstreamConnection
 	session  *mgo.Session
+
+	/* Timestamp of the last time we kicked the task downloader because there weren't any
+	 * tasks left for workers. */
+	lastUpstreamCheck time.Time
 }
 
 func CreateTaskScheduler(config *Conf, upstream *UpstreamConnection, session *mgo.Session) *TaskScheduler {
@@ -28,6 +28,7 @@ func CreateTaskScheduler(config *Conf, upstream *UpstreamConnection, session *mg
 		config,
 		upstream,
 		session,
+		time.Time{},
 	}
 }
 
@@ -117,88 +118,85 @@ func (ts *TaskScheduler) fetchTaskFromQueueOrManager(
 	result := AggregationPipelineResult{}
 	tasks_coll := db.C("flamenco_tasks")
 
-	var err error
-	for attempt := 0; attempt < 2; attempt++ {
-		pipe := tasks_coll.Pipe([]M{
-			// 1: Select only tasks that have a runnable status & acceptable job type.
-			M{"$match": M{
-				"status": M{"$in": []string{"queued", "claimed-by-manager"}},
-				// "job_type": M{"$in": []string{"sleeping", "testing"}},
-			}},
-			// 2: Unwind the parents array, so that we can do a lookup in the next stage.
-			M{"$unwind": M{
-				"path": "$parents",
-				"preserveNullAndEmptyArrays": true,
-			}},
-			// 3: Look up the parent document for each unwound task.
-			// This produces 1-length "parent_doc" arrays.
-			M{"$lookup": M{
-				"from":         "flamenco_tasks",
-				"localField":   "parents",
-				"foreignField": "_id",
-				"as":           "parent_doc",
-			}},
-			// 4: Unwind again, to turn the 1-length "parent_doc" arrays into a subdocument.
-			M{"$unwind": M{
-				"path": "$parent_doc",
-				"preserveNullAndEmptyArrays": true,
-			}},
-			// 5: Group by task ID to undo the unwind, and create an array parent_statuses
-			// with booleans indicating whether the parent status is "completed".
-			M{"$group": M{
-				"_id": "$_id",
-				"parent_statuses": M{"$push": M{
-					"$eq": []interface{}{
-						"completed",
-						M{"$ifNull": []string{"$parent_doc.status", "completed"}}}}},
-				// This allows us to keep all dynamic properties of the original task document:
-				"task": M{"$first": "$$ROOT"},
-			}},
-			// 6: Turn the list of "parent_statuses" booleans into a single boolean
-			M{"$project": M{
-				"_id":               0,
-				"parents_completed": M{"$allElementsTrue": []string{"$parent_statuses"}},
-				"task":              1,
-			}},
-			// 7: Select only those tasks for which the parents have completed.
-			M{"$match": M{
-				"parents_completed": true,
-			}},
-			// 8: just keep the task info, the "parents_runnable" is no longer needed.
-			M{"$project": M{"task": 1}},
-			// 9: Sort by priority, with highest prio first. If prio is equal, use newest task.
-			M{"$sort": bson.D{
-				{"task.priority", -1},
-				{"task._id", 1},
-			}},
-			// 10: Only return one task.
-			M{"$limit": 1},
-		})
-
-		err = pipe.One(&result)
-		if err == mgo.ErrNotFound {
-			log.Infof("No tasks for worker %s found on attempt %d.", worker.Identifier(), attempt)
-			dtrt := ts.config.DownloadTaskRecheckThrottle
-			if attempt == 0 && dtrt >= 0 && time.Now().Sub(last_upstream_check) > dtrt {
-				// On first attempt: try fetching new tasks from upstream, then re-query the DB.
-				log.Infof("%s No more tasks available for %s, checking upstream",
-					r.RemoteAddr, r.Username)
-				last_upstream_check = time.Now()
-				ts.upstream.KickDownloader(true)
-				continue
-			}
-
-			log.Infof("%s Really no more tasks available for %s", r.RemoteAddr, r.Username)
-			w.WriteHeader(204)
-			return nil
-		} else if err != nil {
-			log.Errorf("%s Error fetching task for %s: %s", r.RemoteAddr, r.Username, err)
-			w.WriteHeader(500)
-			return nil
-		}
-
-		break
+	pipe := tasks_coll.Pipe([]M{
+		// 1: Select only tasks that have a runnable status & acceptable job type.
+		M{"$match": M{
+			"status": M{"$in": []string{"queued", "claimed-by-manager"}},
+			// "job_type": M{"$in": []string{"sleeping", "testing"}},
+		}},
+		// 2: Unwind the parents array, so that we can do a lookup in the next stage.
+		M{"$unwind": M{
+			"path": "$parents",
+			"preserveNullAndEmptyArrays": true,
+		}},
+		// 3: Look up the parent document for each unwound task.
+		// This produces 1-length "parent_doc" arrays.
+		M{"$lookup": M{
+			"from":         "flamenco_tasks",
+			"localField":   "parents",
+			"foreignField": "_id",
+			"as":           "parent_doc",
+		}},
+		// 4: Unwind again, to turn the 1-length "parent_doc" arrays into a subdocument.
+		M{"$unwind": M{
+			"path": "$parent_doc",
+			"preserveNullAndEmptyArrays": true,
+		}},
+		// 5: Group by task ID to undo the unwind, and create an array parent_statuses
+		// with booleans indicating whether the parent status is "completed".
+		M{"$group": M{
+			"_id": "$_id",
+			"parent_statuses": M{"$push": M{
+				"$eq": []interface{}{
+					"completed",
+					M{"$ifNull": []string{"$parent_doc.status", "completed"}}}}},
+			// This allows us to keep all dynamic properties of the original task document:
+			"task": M{"$first": "$$ROOT"},
+		}},
+		// 6: Turn the list of "parent_statuses" booleans into a single boolean
+		M{"$project": M{
+			"_id":               0,
+			"parents_completed": M{"$allElementsTrue": []string{"$parent_statuses"}},
+			"task":              1,
+		}},
+		// 7: Select only those tasks for which the parents have completed.
+		M{"$match": M{
+			"parents_completed": true,
+		}},
+		// 8: just keep the task info, the "parents_runnable" is no longer needed.
+		M{"$project": M{"task": 1}},
+		// 9: Sort by priority, with highest prio first. If prio is equal, use newest task.
+		M{"$sort": bson.D{
+			{"task.priority", -1},
+			{"task._id", 1},
+		}},
+		// 10: Only return one task.
+		M{"$limit": 1},
+	})
+
+	err := pipe.One(&result)
+	if err == mgo.ErrNotFound {
+		log.Infof("%s no more tasks available for %s", r.RemoteAddr, worker.Identifier())
+		ts.maybeKickTaskDownloader()
+		w.WriteHeader(204)
+		return nil
+	}
+	if err != nil {
+		log.Errorf("%s Error fetching task for %s: %s", r.RemoteAddr, r.Username, err)
+		w.WriteHeader(500)
+		return nil
 	}
 
 	return result.Task
 }
+
+func (ts *TaskScheduler) maybeKickTaskDownloader() {
+	dtrt := ts.config.DownloadTaskRecheckThrottle
+	if dtrt < 0 || time.Now().Sub(ts.lastUpstreamCheck) <= dtrt {
+		return
+	}
+
+	log.Infof("TaskScheduler: kicking task downloader")
+	ts.lastUpstreamCheck = time.Now()
+	ts.upstream.KickDownloader(false)
+}
-- 
GitLab