diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/db.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/db.go index 819a0376439c45883401ee2485ce7fcdbdda5d04..24fffb911e736a72d811119fb37ee0ebfbb34747 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/db.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/db.go @@ -81,3 +81,20 @@ func Count(coll *mgo.Collection) (int, error) { return result.Count, nil } + +func GetSettings(db *mgo.Database) *SettingsInMongo { + settings := &SettingsInMongo{} + err := db.C("settings").Find(bson.M{}).One(settings) + if err != nil && err != mgo.ErrNotFound { + log.Errorf("db.GetSettings: Unable to get settings: ", err) + } + + return settings +} + +func SaveSettings(db *mgo.Database, settings *SettingsInMongo) { + _, err := db.C("settings").Upsert(bson.M{}, settings) + if err != nil && err != mgo.ErrNotFound { + log.Errorf("db.SaveSettings: Unable to save settings: ", err) + } +} diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/documents.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/documents.go index d2d880a434c4b7627f60a7b4f04f1feac4d6f1bc..fd68b99000eb1331dc99440a11c4e85b668350b8 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/documents.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/documents.go @@ -33,6 +33,11 @@ type Task struct { LastWorkerPing *time.Time `bson:"last_worker_ping,omitempty" json:"-"` } +// Dependency graph response from Server. +type ScheduledTasks struct { + Depsgraph []Task `json:"depsgraph"` +} + // Both sent from Worker to Manager, as well as from Manager to Server. type TaskUpdate struct { Id bson.ObjectId `bson:"_id" json:"_id"` @@ -92,3 +97,9 @@ type MayKeepRunningResponse struct { MayKeepRunning bool `json:"may_keep_running"` Reason string `json:"reason,omitempty"` } + +// Settings we want to be able to update from within Flamenco Manager itself, +// so those are stored in MongoDB itself. +type SettingsInMongo struct { + DepsgraphLastModified *time.Time +} diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/http.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/http.go index 13a5866b6ab93b6403b3617c6fd43c49b9812da1..0027177d95715db9002431103f5722ef3344740d 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/http.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/http.go @@ -12,6 +12,10 @@ import ( log "github.com/Sirupsen/logrus" ) +// For timestamp parsing +const IsoFormat = "2006-01-02T15:04:05-0700" +const LastModifiedHeaderFormat = "2006-01-02 15:04:05-07:00" + /** * Decodes JSON and writes a Bad Request status if it fails. */ diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/http_test.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/http_test.go new file mode 100644 index 0000000000000000000000000000000000000000..b16bb18861308012ff71673353789611c1d3c1f6 --- /dev/null +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/http_test.go @@ -0,0 +1,21 @@ +package flamenco + +import ( + "time" + + "github.com/stretchr/testify/assert" + + check "gopkg.in/check.v1" +) + +type HttpTestSuite struct{} + +var _ = check.Suite(&HttpTestSuite{}) + +func (s *HttpTestSuite) TestParseDates(c *check.C) { + parsed_iso, err1 := time.Parse(IsoFormat, "2017-01-23T13:04:05+0200") + parsed_ms, err2 := time.Parse(LastModifiedHeaderFormat, "2017-01-23 13:04:05+02:00") + assert.Nil(c, err1) + assert.Nil(c, err2) + assert.Equal(c, parsed_iso, parsed_ms) +} diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go index 04fca31248e3cef5f627a11c8e9b1c8b9b4a7700..be0cbb65bf20aeedcb7e1d7c0852c1624bc30d14 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go @@ -17,8 +17,6 @@ import ( * tasks left for workers. */ var last_upstream_check time.Time -const IsoFormat = "2006-01-02T15:04:05-0700" - type TaskScheduler struct { config *Conf upstream *UpstreamConnection @@ -128,7 +126,7 @@ func (ts *TaskScheduler) fetchTaskFromQueueOrManager( dtrt := ts.config.DownloadTaskRecheckThrottle for attempt := 0; attempt < 2; attempt++ { - // TODO: possibly sort on something else. + // TODO: take depsgraph (i.e. parent task status) and task status into account. info, err := tasks_coll.Find(query).Sort("-priority").Limit(1).Apply(change, &task) if err == mgo.ErrNotFound { if attempt == 0 && dtrt >= 0 && time.Now().Sub(last_upstream_check) > dtrt { diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/upstream.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/upstream.go index a2f4267f7f8eeb11c1286152e424a29cb19c150b..0fc99ba36a977e627b10ea42dc4baeeaff3e79b4 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/upstream.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/upstream.go @@ -70,6 +70,7 @@ func (self *UpstreamConnection) Close() { func (self *UpstreamConnection) KickDownloader(synchronous bool) { if synchronous { pingback := make(chan bool) + defer close(pingback) self.download_kick <- pingback log.Info("KickDownloader: Waiting for task downloader to finish.") @@ -137,12 +138,9 @@ func (self *UpstreamConnection) download_task_loop() { * Downloads a chunkn of tasks from the upstream Flamenco Server. */ func download_tasks_from_upstream(config *Conf, mongo_sess *mgo.Session) { - // Try to get as many tasks as we have workers. db := mongo_sess.DB("") - worker_count := WorkerCount(db) - url_str := fmt.Sprintf("/flamenco/scheduler/tasks/%s?chunk_size=%d", - config.ManagerId, MaxInt(worker_count, 1)) + url_str := fmt.Sprintf("/api/flamenco/managers/%s/depsgraph", config.ManagerId) rel_url, err := url.Parse(url_str) if err != nil { log.Warningf("Error parsing '%s' as URL; unable to fetch tasks.", url_str) @@ -150,8 +148,6 @@ func download_tasks_from_upstream(config *Conf, mongo_sess *mgo.Session) { } get_url := config.Flamenco.ResolveReference(rel_url) - log.Infof("Getting tasks from upstream Flamenco %s", get_url) - req, err := http.NewRequest("GET", get_url.String(), nil) if err != nil { log.Warningf("Unable to create GET request: %s", err) @@ -159,31 +155,43 @@ func download_tasks_from_upstream(config *Conf, mongo_sess *mgo.Session) { } req.SetBasicAuth(config.ManagerSecret, "") + // Set If-Modified-Since header on our request. + settings := GetSettings(db) + if settings.DepsgraphLastModified != nil { + log.Infof("Getting tasks from upstream Flamenco %s If-Modified-Since %s", get_url, + settings.DepsgraphLastModified) + req.Header.Set("If-Modified-Since", + settings.DepsgraphLastModified.Format(LastModifiedHeaderFormat)) + } else { + log.Infof("Getting tasks from upstream Flamenco %s", get_url) + } + client := &http.Client{} resp, err := client.Do(req) if err != nil { log.Warningf("Unable to GET %s: %s", get_url, err) return } - + if resp.StatusCode == http.StatusNotModified { + log.Debug("Server-side depsgraph was not modified, nothing to do.") + return + } + if resp.StatusCode == 204 { + log.Info("No tasks for us; sleeping.") + return + } if resp.StatusCode >= 300 { body, err := ioutil.ReadAll(resp.Body) if err != nil { - log.Warningf("Error %d GETing %s: %s", resp.StatusCode, get_url, err) + log.Errorf("Error %d GETing %s: %s", resp.StatusCode, get_url, err) return } - - log.Warningf("Error %d GETing %s: %s", resp.StatusCode, get_url, body) - return - } - - if resp.StatusCode == 204 { - log.Info("No tasks for us; sleeping.") + log.Errorf("Error %d GETing %s: %s", resp.StatusCode, get_url, body) return } // Parse the received tasks. - var scheduled_tasks []Task + var scheduled_tasks ScheduledTasks decoder := json.NewDecoder(resp.Body) defer resp.Body.Close() @@ -193,14 +201,15 @@ func download_tasks_from_upstream(config *Conf, mongo_sess *mgo.Session) { } // Insert them into the MongoDB - // TODO Sybren: before inserting, compare to the database and deal with any changed statuses. - if len(scheduled_tasks) > 0 { - log.Infof("Received %d tasks from upstream Flamenco Server.", len(scheduled_tasks)) + depsgraph := scheduled_tasks.Depsgraph + if len(depsgraph) > 0 { + log.Infof("Received %d tasks from upstream Flamenco Server.", len(depsgraph)) } else { - log.Debugf("Received %d tasks from upstream Flamenco Server.", len(scheduled_tasks)) + // This shouldn't happen, as it should actually have been a 204 or 306. + log.Debugf("Received %d tasks from upstream Flamenco Server.", len(depsgraph)) } tasks_coll := db.C("flamenco_tasks") - for _, task := range scheduled_tasks { + for _, task := range depsgraph { change, err := tasks_coll.Upsert(bson.M{"_id": task.Id}, task) if err != nil { log.Errorf("unable to insert new task %s: %s", task.Id.Hex(), err) @@ -208,12 +217,24 @@ func download_tasks_from_upstream(config *Conf, mongo_sess *mgo.Session) { } if change.Updated > 0 { - log.Infof("Upstream server re-queued existing task %s", task.Id.Hex()) + log.Debug("Upstream server re-queued existing task ", task.Id.Hex()) } else if change.Matched > 0 { - log.Infof("Upstream server re-queued existing task %s, but nothing changed", + log.Debugf("Upstream server re-queued existing task %s, but nothing changed", task.Id.Hex()) } } + + // Check if we had a Last-Modified header, since we need to remember that. + last_modified := resp.Header.Get("Last-Modified") + if last_modified != "" { + log.Info("Last modified task was at ", last_modified) + if parsed, err := time.Parse(LastModifiedHeaderFormat, last_modified); err != nil { + log.Errorf("Unable to parse Last-Modified header: ", err) + } else { + settings.DepsgraphLastModified = &parsed + SaveSettings(db, settings) + } + } } func (self *UpstreamConnection) ResolveUrl(relative_url string, a ...interface{}) (*url.URL, error) {