Skip to content
Snippets Groups Projects
Commit f0dd69a4 authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Manager: scheduler now fetches tasks from /api/f/m/{mngr-id}/depsgraph

Still rough & untested.
parent 5e1cf163
Branches
Tags
No related merge requests found
......@@ -81,3 +81,20 @@ func Count(coll *mgo.Collection) (int, error) {
return result.Count, nil
}
func GetSettings(db *mgo.Database) *SettingsInMongo {
settings := &SettingsInMongo{}
err := db.C("settings").Find(bson.M{}).One(settings)
if err != nil && err != mgo.ErrNotFound {
log.Errorf("db.GetSettings: Unable to get settings: ", err)
}
return settings
}
func SaveSettings(db *mgo.Database, settings *SettingsInMongo) {
_, err := db.C("settings").Upsert(bson.M{}, settings)
if err != nil && err != mgo.ErrNotFound {
log.Errorf("db.SaveSettings: Unable to save settings: ", err)
}
}
......@@ -33,6 +33,11 @@ type Task struct {
LastWorkerPing *time.Time `bson:"last_worker_ping,omitempty" json:"-"`
}
// Dependency graph response from Server.
type ScheduledTasks struct {
Depsgraph []Task `json:"depsgraph"`
}
// Both sent from Worker to Manager, as well as from Manager to Server.
type TaskUpdate struct {
Id bson.ObjectId `bson:"_id" json:"_id"`
......@@ -92,3 +97,9 @@ type MayKeepRunningResponse struct {
MayKeepRunning bool `json:"may_keep_running"`
Reason string `json:"reason,omitempty"`
}
// Settings we want to be able to update from within Flamenco Manager itself,
// so those are stored in MongoDB itself.
type SettingsInMongo struct {
DepsgraphLastModified *time.Time
}
......@@ -12,6 +12,10 @@ import (
log "github.com/Sirupsen/logrus"
)
// For timestamp parsing
const IsoFormat = "2006-01-02T15:04:05-0700"
const LastModifiedHeaderFormat = "2006-01-02 15:04:05-07:00"
/**
* Decodes JSON and writes a Bad Request status if it fails.
*/
......
package flamenco
import (
"time"
"github.com/stretchr/testify/assert"
check "gopkg.in/check.v1"
)
type HttpTestSuite struct{}
var _ = check.Suite(&HttpTestSuite{})
func (s *HttpTestSuite) TestParseDates(c *check.C) {
parsed_iso, err1 := time.Parse(IsoFormat, "2017-01-23T13:04:05+0200")
parsed_ms, err2 := time.Parse(LastModifiedHeaderFormat, "2017-01-23 13:04:05+02:00")
assert.Nil(c, err1)
assert.Nil(c, err2)
assert.Equal(c, parsed_iso, parsed_ms)
}
......@@ -17,8 +17,6 @@ import (
* tasks left for workers. */
var last_upstream_check time.Time
const IsoFormat = "2006-01-02T15:04:05-0700"
type TaskScheduler struct {
config *Conf
upstream *UpstreamConnection
......@@ -128,7 +126,7 @@ func (ts *TaskScheduler) fetchTaskFromQueueOrManager(
dtrt := ts.config.DownloadTaskRecheckThrottle
for attempt := 0; attempt < 2; attempt++ {
// TODO: possibly sort on something else.
// TODO: take depsgraph (i.e. parent task status) and task status into account.
info, err := tasks_coll.Find(query).Sort("-priority").Limit(1).Apply(change, &task)
if err == mgo.ErrNotFound {
if attempt == 0 && dtrt >= 0 && time.Now().Sub(last_upstream_check) > dtrt {
......
......@@ -70,6 +70,7 @@ func (self *UpstreamConnection) Close() {
func (self *UpstreamConnection) KickDownloader(synchronous bool) {
if synchronous {
pingback := make(chan bool)
defer close(pingback)
self.download_kick <- pingback
log.Info("KickDownloader: Waiting for task downloader to finish.")
......@@ -137,12 +138,9 @@ func (self *UpstreamConnection) download_task_loop() {
* Downloads a chunkn of tasks from the upstream Flamenco Server.
*/
func download_tasks_from_upstream(config *Conf, mongo_sess *mgo.Session) {
// Try to get as many tasks as we have workers.
db := mongo_sess.DB("")
worker_count := WorkerCount(db)
url_str := fmt.Sprintf("/flamenco/scheduler/tasks/%s?chunk_size=%d",
config.ManagerId, MaxInt(worker_count, 1))
url_str := fmt.Sprintf("/api/flamenco/managers/%s/depsgraph", config.ManagerId)
rel_url, err := url.Parse(url_str)
if err != nil {
log.Warningf("Error parsing '%s' as URL; unable to fetch tasks.", url_str)
......@@ -150,8 +148,6 @@ func download_tasks_from_upstream(config *Conf, mongo_sess *mgo.Session) {
}
get_url := config.Flamenco.ResolveReference(rel_url)
log.Infof("Getting tasks from upstream Flamenco %s", get_url)
req, err := http.NewRequest("GET", get_url.String(), nil)
if err != nil {
log.Warningf("Unable to create GET request: %s", err)
......@@ -159,31 +155,43 @@ func download_tasks_from_upstream(config *Conf, mongo_sess *mgo.Session) {
}
req.SetBasicAuth(config.ManagerSecret, "")
// Set If-Modified-Since header on our request.
settings := GetSettings(db)
if settings.DepsgraphLastModified != nil {
log.Infof("Getting tasks from upstream Flamenco %s If-Modified-Since %s", get_url,
settings.DepsgraphLastModified)
req.Header.Set("If-Modified-Since",
settings.DepsgraphLastModified.Format(LastModifiedHeaderFormat))
} else {
log.Infof("Getting tasks from upstream Flamenco %s", get_url)
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Warningf("Unable to GET %s: %s", get_url, err)
return
}
if resp.StatusCode == http.StatusNotModified {
log.Debug("Server-side depsgraph was not modified, nothing to do.")
return
}
if resp.StatusCode == 204 {
log.Info("No tasks for us; sleeping.")
return
}
if resp.StatusCode >= 300 {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Warningf("Error %d GETing %s: %s", resp.StatusCode, get_url, err)
log.Errorf("Error %d GETing %s: %s", resp.StatusCode, get_url, err)
return
}
log.Warningf("Error %d GETing %s: %s", resp.StatusCode, get_url, body)
return
}
if resp.StatusCode == 204 {
log.Info("No tasks for us; sleeping.")
log.Errorf("Error %d GETing %s: %s", resp.StatusCode, get_url, body)
return
}
// Parse the received tasks.
var scheduled_tasks []Task
var scheduled_tasks ScheduledTasks
decoder := json.NewDecoder(resp.Body)
defer resp.Body.Close()
......@@ -193,14 +201,15 @@ func download_tasks_from_upstream(config *Conf, mongo_sess *mgo.Session) {
}
// Insert them into the MongoDB
// TODO Sybren: before inserting, compare to the database and deal with any changed statuses.
if len(scheduled_tasks) > 0 {
log.Infof("Received %d tasks from upstream Flamenco Server.", len(scheduled_tasks))
depsgraph := scheduled_tasks.Depsgraph
if len(depsgraph) > 0 {
log.Infof("Received %d tasks from upstream Flamenco Server.", len(depsgraph))
} else {
log.Debugf("Received %d tasks from upstream Flamenco Server.", len(scheduled_tasks))
// This shouldn't happen, as it should actually have been a 204 or 306.
log.Debugf("Received %d tasks from upstream Flamenco Server.", len(depsgraph))
}
tasks_coll := db.C("flamenco_tasks")
for _, task := range scheduled_tasks {
for _, task := range depsgraph {
change, err := tasks_coll.Upsert(bson.M{"_id": task.Id}, task)
if err != nil {
log.Errorf("unable to insert new task %s: %s", task.Id.Hex(), err)
......@@ -208,12 +217,24 @@ func download_tasks_from_upstream(config *Conf, mongo_sess *mgo.Session) {
}
if change.Updated > 0 {
log.Infof("Upstream server re-queued existing task %s", task.Id.Hex())
log.Debug("Upstream server re-queued existing task ", task.Id.Hex())
} else if change.Matched > 0 {
log.Infof("Upstream server re-queued existing task %s, but nothing changed",
log.Debugf("Upstream server re-queued existing task %s, but nothing changed",
task.Id.Hex())
}
}
// Check if we had a Last-Modified header, since we need to remember that.
last_modified := resp.Header.Get("Last-Modified")
if last_modified != "" {
log.Info("Last modified task was at ", last_modified)
if parsed, err := time.Parse(LastModifiedHeaderFormat, last_modified); err != nil {
log.Errorf("Unable to parse Last-Modified header: ", err)
} else {
settings.DepsgraphLastModified = &parsed
SaveSettings(db, settings)
}
}
}
func (self *UpstreamConnection) ResolveUrl(relative_url string, a ...interface{}) (*url.URL, error) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment