diff --git a/packages/flamenco-manager-go/README.md b/packages/flamenco-manager-go/README.md index 4111fa51d433a6e1ae3a2c648d3face5beeb76c9..571d0b8fdf30ff5a337da1215438299b2a705a86 100644 --- a/packages/flamenco-manager-go/README.md +++ b/packages/flamenco-manager-go/README.md @@ -29,7 +29,7 @@ Flamenco Manager accepts the following CLI arguments: of this `flamenco-manager-go` directory. 0. Make sure you have MongoDB up and running (on localhost) -1. Install Go 1.7 or newer +1. Install Go 1.8 or newer 2. `export GOPATH=$FM` 3. `cd $FM/src/flamenco-manager` 4. Download all dependencies with `go get` diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/closable.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/closable.go new file mode 100644 index 0000000000000000000000000000000000000000..d9be5d4e8969773af6b417b930b4d89616902dd8 --- /dev/null +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/closable.go @@ -0,0 +1,39 @@ +package flamenco + +import ( + "sync" + + log "github.com/Sirupsen/logrus" +) + +// closable offers a way to cleanly shut down a running goroutine. +type closable struct { + doneChan chan struct{} + doneWg *sync.WaitGroup +} + +// makeClosable constructs a new closable struct +func makeClosable() closable { + return closable{make(chan struct{}), new(sync.WaitGroup)} +} + +// closableAdd(delta) should be combined with 'delta' calls to closableDone() +func (closable *closable) closableAdd(delta int) bool { + log.Debugf("Closable: doneWg.Add(%d) ok", delta) + closable.doneWg.Add(delta) + return true +} + +// closableDone marks one "thing" as "done" +func (closable *closable) closableDone() { + log.Debugf("Closable: doneWg.Done() ok") + closable.doneWg.Done() +} + +// closableCloseAndWait marks the goroutine as "done", +// and waits for all things added with closableAdd() to be "done" too. +func (closable *closable) closableCloseAndWait() { + close(closable.doneChan) + log.Debugf("Closable: waiting for shutdown to finish.") + closable.doneWg.Wait() +} diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/db.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/db.go index 7a41c176b44c79828b537b47e500b4c9cd6d3b12..857748b623a9f3987143e0305e4e124fa351fd99 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/db.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/db.go @@ -14,14 +14,13 @@ type countresult struct { Count int `bson:"count"` } +// M is a shortcut for bson.M to make longer queries easier to read. type M bson.M -/** - * Returns a MongoDB session. - * - * The database name should be configured in the database URL. - * You can use this default database using session.DB(""). - */ +// MongoSession returns a MongoDB session. +// +// The database name should be configured in the database URL. +// You can use this default database using session.DB(""). func MongoSession(config *Conf) *mgo.Session { var err error var session *mgo.Session @@ -32,12 +31,12 @@ func MongoSession(config *Conf) *mgo.Session { } session.SetMode(mgo.Monotonic, true) - ensure_indices(session) + ensureIndices(session) return session } -func ensure_indices(session *mgo.Session) { +func ensureIndices(session *mgo.Session) { db := session.DB("") index := mgo.Index{ @@ -63,11 +62,9 @@ func ensure_indices(session *mgo.Session) { } } -/** - * Counts the number of documents in the given collection. - */ +// Count returns the number of documents in the given collection. 
 func Count(coll *mgo.Collection) (int, error) {
-	aggr_ops := []bson.M{
+	aggrOps := []bson.M{
 		bson.M{
 			"$group": bson.M{
 				"_id": nil,
@@ -75,7 +72,7 @@ func Count(coll *mgo.Collection) (int, error) {
 			},
 		},
 	}
-	pipe := coll.Pipe(aggr_ops)
+	pipe := coll.Pipe(aggrOps)
 	result := countresult{}
 	if err := pipe.One(&result); err != nil {
 		if err == mgo.ErrNotFound {
@@ -88,24 +85,26 @@ func Count(coll *mgo.Collection) (int, error) {
 	return result.Count, nil
 }
 
+// GetSettings returns the settings as saved in our MongoDB.
 func GetSettings(db *mgo.Database) *SettingsInMongo {
 	settings := &SettingsInMongo{}
 	err := db.C("settings").Find(bson.M{}).One(settings)
 	if err != nil && err != mgo.ErrNotFound {
-		log.Errorf("db.GetSettings: Unable to get settings: ", err)
+		log.Panic("db.GetSettings: Unable to get settings: ", err)
 	}
 
 	return settings
 }
 
+// SaveSettings stores the given settings in MongoDB.
 func SaveSettings(db *mgo.Database, settings *SettingsInMongo) {
 	_, err := db.C("settings").Upsert(bson.M{}, settings)
 	if err != nil && err != mgo.ErrNotFound {
-		log.Errorf("db.SaveSettings: Unable to save settings: ", err)
+		log.Panic("db.SaveSettings: Unable to save settings: ", err)
 	}
 }
 
-/* Erases all tasks in the flamenco_tasks collection. */
+// CleanSlate erases all tasks in the flamenco_tasks collection.
 func CleanSlate(db *mgo.Database) {
 	fmt.Println("")
 	fmt.Println("Performing Clean Slate operation, this will erase all tasks from the local DB.")
diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/documents.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/documents.go
index 9b4e244c2bb4ae6b250a9ad4da0e8f0cb1511e28..60e9cf9cf0d90a90fa203aaf741797a700cc24e3 100644
--- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/documents.go
+++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/documents.go
@@ -6,13 +6,15 @@ import (
 	"gopkg.in/mgo.v2/bson"
 )
 
+// Command is an executable part of a Task.
 type Command struct {
 	Name string `bson:"name" json:"name"`
 	Settings bson.M `bson:"settings" json:"settings"`
 }
 
+// Task contains a Flamenco task, with some BSON-only fields for local Manager use.
 type Task struct {
-	Id bson.ObjectId `bson:"_id,omitempty" json:"_id,omitempty"`
+	ID bson.ObjectId `bson:"_id,omitempty" json:"_id,omitempty"`
 	Etag string `bson:"_etag,omitempty" json:"_etag,omitempty"`
 	Job bson.ObjectId `bson:"job,omitempty" json:"job"`
 	Manager bson.ObjectId `bson:"manager,omitempty" json:"manager"`
@@ -30,23 +32,24 @@ type Task struct {
 	Worker string `bson:"worker,omitempty" json:"worker,omitempty"`
 
 	// Internal bookkeeping
-	WorkerId *bson.ObjectId `bson:"worker_id,omitempty" json:"-"`
+	WorkerID *bson.ObjectId `bson:"worker_id,omitempty" json:"-"`
 	LastWorkerPing *time.Time `bson:"last_worker_ping,omitempty" json:"-"`
 }
 
-type AggregationPipelineResult struct {
+// aggregationPipelineResult is for internal MongoDB querying only.
+type aggregationPipelineResult struct {
 	Task *Task `bson:"task"`
 }
 
-// Dependency graph response from Server.
+// ScheduledTasks contains a dependency graph response from Server.
 type ScheduledTasks struct {
 	Depsgraph []Task `json:"depsgraph"`
 }
 
-// Both sent from Worker to Manager, as well as from Manager to Server.
+// TaskUpdate is both sent from Worker to Manager, as well as from Manager to Server.
type TaskUpdate struct { - Id bson.ObjectId `bson:"_id" json:"_id"` - TaskId bson.ObjectId `bson:"task_id" json:"task_id,omitempty"` + ID bson.ObjectId `bson:"_id" json:"_id"` + TaskID bson.ObjectId `bson:"task_id" json:"task_id,omitempty"` TaskStatus string `bson:"task_status,omitempty" json:"task_status,omitempty"` ReceivedOnManager time.Time `bson:"received_on_manager" json:"received_on_manager"` Activity string `bson:"activity,omitempty" json:"activity,omitempty"` @@ -57,13 +60,14 @@ type TaskUpdate struct { Worker string `bson:"worker" json:"worker"` } -// Received from Server. +// TaskUpdateResponse is received from Server. type TaskUpdateResponse struct { ModifiedCount int `json:"modified_count"` HandledUpdateIds []bson.ObjectId `json:"handled_update_ids,omitempty"` CancelTasksIds []bson.ObjectId `json:"cancel_task_ids,omitempty"` } +// WorkerRegistration is sent by the Worker to register itself at this Manager. type WorkerRegistration struct { Secret string `json:"secret"` Platform string `json:"platform"` @@ -71,8 +75,10 @@ type WorkerRegistration struct { Nickname string `json:"nickname"` } +// Worker contains all information about a specific Worker. +// Some fields come from the WorkerRegistration, whereas others are filled by us. type Worker struct { - Id bson.ObjectId `bson:"_id,omitempty" json:"_id,omitempty"` + ID bson.ObjectId `bson:"_id,omitempty" json:"_id,omitempty"` Secret string `bson:"-" json:"-"` HashedSecret []byte `bson:"hashed_secret" json:"-"` Nickname string `bson:"nickname" json:"nickname"` @@ -85,30 +91,30 @@ type Worker struct { SupportedJobTypes []string `bson:"supported_job_types" json:"supported_job_types"` } -/** - * Notification sent to upstream Flamenco Server upon startup. This is a combination - * of settings (see settings.go) and information from the database. - */ +// StartupNotification sent to upstream Flamenco Server upon startup. This is a combination +// of settings (see settings.go) and information from the database. type StartupNotification struct { // Settings - ManagerUrl string `json:"manager_url"` + ManagerURL string `json:"manager_url"` VariablesByVarname map[string]map[string]string `json:"variables"` // From our local database NumberOfWorkers int `json:"nr_of_workers"` } +// MayKeepRunningResponse is sent to workers to indicate whether they can keep running their task. type MayKeepRunningResponse struct { MayKeepRunning bool `json:"may_keep_running"` Reason string `json:"reason,omitempty"` } -// Settings we want to be able to update from within Flamenco Manager itself, -// so those are stored in MongoDB itself. +// SettingsInMongo contains settings we want to be able to update from +// within Flamenco Manager itself, so those are stored in MongoDB. type SettingsInMongo struct { DepsgraphLastModified *string `bson:"depsgraph_last_modified"` } +// StatusReport is sent in response to a query on the / URL. 
type StatusReport struct { NrOfWorkers int `json:"nr_of_workers"` NrOfTasks int `json:"nr_of_tasks"` diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/gocheck_test.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/gocheck_test.go index 83936c5306b04a22c1cad489d9b18bac934d9213..08cd2b88e04eec8ac7eb0ad348c9f568212bb1a3 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/gocheck_test.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/gocheck_test.go @@ -20,7 +20,7 @@ func ConstructTestTask(task_id, job_type string) Task { func ConstructTestTaskWithPrio(task_id, job_type string, priority int) Task { return Task{ - Id: bson.ObjectIdHex(task_id), + ID: bson.ObjectIdHex(task_id), Etag: "1234567", Job: bson.ObjectIdHex("bbbbbbbbbbbbbbbbbbbbbbbb"), Manager: bson.ObjectIdHex("cccccccccccccccccccccccc"), diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go index 235408715b2f32da58838203c81a764c5364929a..7fce5d8232eb4759e8ae046cd70edea755b61509 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go @@ -64,7 +64,7 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat break } - log.Debugf("Task %s was changed, reexamining queue.", task.Id.Hex()) + log.Debugf("Task %s was changed, reexamining queue.", task.ID.Hex()) } if was_changed { log.Errorf("Infinite loop detected, tried 1000 tasks and they all changed...") @@ -74,14 +74,14 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat // Update the task status to "active", pushing it as a task update to the manager too. task.Status = "active" - tupdate := TaskUpdate{TaskId: task.Id, TaskStatus: task.Status} + tupdate := TaskUpdate{TaskID: task.ID, TaskStatus: task.Status} local_updates := bson.M{ - "worker_id": worker.Id, + "worker_id": worker.ID, "last_worker_ping": UtcNow(), } if err := QueueTaskUpdateWithExtra(&tupdate, db, local_updates); err != nil { log.Errorf("Unable to queue task update while assigning task %s to worker %s: %s", - task.Id.Hex(), worker.Identifier(), err) + task.ID.Hex(), worker.Identifier(), err) w.WriteHeader(http.StatusInternalServerError) return } @@ -95,12 +95,12 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat encoder.Encode(task) log.Infof("ScheduleTask: assigned task %s to worker %s", - task.Id.Hex(), worker.Identifier()) + task.ID.Hex(), worker.Identifier()) // Push a task log line stating we've assigned this task to the given worker. // This is done here, instead of by the worker, so that it's logged even if the worker fails. 
msg := fmt.Sprintf("Manager assigned task to worker %s", worker.Identifier()) - LogTaskActivity(worker, task.Id, msg, time.Now().Format(IsoFormat)+": "+msg, db) + LogTaskActivity(worker, task.ID, msg, time.Now().Format(IsoFormat)+": "+msg, db) } /** @@ -116,7 +116,7 @@ func (ts *TaskScheduler) fetchTaskFromQueueOrManager( return nil } - result := AggregationPipelineResult{} + result := aggregationPipelineResult{} tasks_coll := db.C("flamenco_tasks") pipe := tasks_coll.Pipe([]M{ diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler_test.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler_test.go index cc819296483b280fefd042069fd05b703e6c7a7c..c33c9f84c16e7354a3f38cfc27f9dd22095f0674 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler_test.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler_test.go @@ -94,7 +94,7 @@ func (s *SchedulerTestSuite) TestVariableReplacement(t *check.C) { // Perform HTTP request resp_rec := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/task", nil) - ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.Id.Hex()} + ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.ID.Hex()} s.sched.ScheduleTask(resp_rec, ar) // Check the response JSON @@ -106,7 +106,7 @@ func (s *SchedulerTestSuite) TestVariableReplacement(t *check.C) { json_task.Commands[0].Settings["message"]) // Check worker with other job type - ar = &auth.AuthenticatedRequest{Request: *request, Username: s.worker_win.Id.Hex()} + ar = &auth.AuthenticatedRequest{Request: *request, Username: s.worker_win.ID.Hex()} s.sched.ScheduleTask(resp_rec, ar) // Check the response JSON @@ -132,13 +132,13 @@ func (s *SchedulerTestSuite) TestSchedulerOrderByPriority(t *check.C) { // Perform HTTP request to the scheduler. resp_rec := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/task", nil) - ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.Id.Hex()} + ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.ID.Hex()} s.sched.ScheduleTask(resp_rec, ar) // We should have gotten task 2, because it has the highest priority. json_task := Task{} parseJson(t, resp_rec, 200, &json_task) - assert.Equal(t, task2.Id.Hex(), json_task.Id.Hex()) + assert.Equal(t, task2.ID.Hex(), json_task.ID.Hex()) } func (s *SchedulerTestSuite) TestSchedulerOrderByJobPriority(t *check.C) { @@ -157,13 +157,13 @@ func (s *SchedulerTestSuite) TestSchedulerOrderByJobPriority(t *check.C) { // Perform HTTP request to the scheduler. resp_rec := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/task", nil) - ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.Id.Hex()} + ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.ID.Hex()} s.sched.ScheduleTask(resp_rec, ar) // We should have gotten task 1, because its job has the highest priority. json_task := Task{} parseJson(t, resp_rec, 200, &json_task) - assert.Equal(t, task1.Id.Hex(), json_task.Id.Hex()) + assert.Equal(t, task1.ID.Hex(), json_task.ID.Hex()) } /** @@ -204,7 +204,7 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamCanceled(t *check.C) { // Perform HTTP request to the scheduler. 
resp_rec := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/task", nil) - ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.Id.Hex()} + ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.ID.Hex()} s.sched.ScheduleTask(resp_rec, ar) timedout := <-timeout @@ -215,11 +215,11 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamCanceled(t *check.C) { parseJson(t, resp_rec, 200, &json_task) // We should have gotten task 1, because task 2 was canceled. - assert.Equal(t, task1.Id.Hex(), json_task.Id.Hex()) + assert.Equal(t, task1.ID.Hex(), json_task.ID.Hex()) // In our queue, task 2 should have been canceled, since it was canceled on the server. found_task2 := Task{} - err := s.db.C("flamenco_tasks").FindId(task2.Id).One(&found_task2) + err := s.db.C("flamenco_tasks").FindId(task2.ID).One(&found_task2) assert.Equal(t, nil, err) assert.Equal(t, "canceled", found_task2.Status) } @@ -256,7 +256,7 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamPrioChange(t *check.C) { // Perform HTTP request to the scheduler. resp_rec := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/task", nil) - ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.Id.Hex()} + ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.ID.Hex()} s.sched.ScheduleTask(resp_rec, ar) timedout := <-timeout @@ -267,16 +267,16 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamPrioChange(t *check.C) { parseJson(t, resp_rec, 200, &json_task) // We should have gotten task 1, because task 2 was lowered in prio. - assert.Equal(t, task1.Id.Hex(), json_task.Id.Hex()) + assert.Equal(t, task1.ID.Hex(), json_task.ID.Hex()) // In our queue, task 2 should have been lowered in prio, and task1 should be active. found_task := Task{} - err := s.db.C("flamenco_tasks").FindId(task2.Id).One(&found_task) + err := s.db.C("flamenco_tasks").FindId(task2.ID).One(&found_task) assert.Equal(t, nil, err) assert.Equal(t, "queued", found_task.Status) assert.Equal(t, 5, found_task.Priority) - err = s.db.C("flamenco_tasks").FindId(task1.Id).One(&found_task) + err = s.db.C("flamenco_tasks").FindId(task1.ID).One(&found_task) assert.Equal(t, nil, err) assert.Equal(t, "active", found_task.Status) assert.Equal(t, 50, found_task.Priority) @@ -310,7 +310,7 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamDeleted(t *check.C) { // Perform HTTP request to the scheduler. resp_rec := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/task", nil) - ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.Id.Hex()} + ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.ID.Hex()} s.sched.ScheduleTask(resp_rec, ar) timedout := <-timeout @@ -321,16 +321,16 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamDeleted(t *check.C) { parseJson(t, resp_rec, 200, &json_task) // We should have gotten task 1, because task 2 was deleted. - assert.Equal(t, task1.Id.Hex(), json_task.Id.Hex()) + assert.Equal(t, task1.ID.Hex(), json_task.ID.Hex()) // In our queue, task 2 should have been canceled, and task1 should be active. 
found_task := Task{} - err := s.db.C("flamenco_tasks").FindId(task2.Id).One(&found_task) + err := s.db.C("flamenco_tasks").FindId(task2.ID).One(&found_task) assert.Equal(t, nil, err) assert.Equal(t, "canceled", found_task.Status) assert.Equal(t, 100, found_task.Priority) - err = s.db.C("flamenco_tasks").FindId(task1.Id).One(&found_task) + err = s.db.C("flamenco_tasks").FindId(task1.ID).One(&found_task) assert.Equal(t, nil, err) assert.Equal(t, "active", found_task.Status) assert.Equal(t, 50, found_task.Priority) @@ -342,17 +342,17 @@ func (s *SchedulerTestSuite) TestParentTaskNotCompleted(c *check.C) { // Task 1 is being worked on by worker_win task1 := ConstructTestTaskWithPrio("1aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50) task1.Status = "active" - task1.WorkerId = &s.worker_win.Id + task1.WorkerID = &s.worker_win.ID assert.Nil(c, tasks_coll.Insert(task1)) // Task 2 is unavailable due to its parent not being completed. task2 := ConstructTestTaskWithPrio("2aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 100) - task2.Parents = []bson.ObjectId{task1.Id} + task2.Parents = []bson.ObjectId{task1.ID} task2.Status = "claimed-by-manager" assert.Nil(c, tasks_coll.Insert(task2)) // Fetch a task from the queue - resp_rec, _ := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah") + resp_rec, _ := WorkerTestRequest(s.worker_lnx.ID, "TEST", "/whatevah") task := s.sched.fetchTaskFromQueueOrManager(resp_rec, s.db, &s.worker_lnx) // We should not get any task back, since task1 is already taken, and task2 @@ -367,25 +367,25 @@ func (s *SchedulerTestSuite) TestParentTaskCompleted(c *check.C) { // Task 1 has been completed by worker_win task1 := ConstructTestTaskWithPrio("1aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50) task1.Status = "completed" - task1.WorkerId = &s.worker_win.Id + task1.WorkerID = &s.worker_win.ID assert.Nil(c, tasks_coll.Insert(task1)) // Task 2 is available due to its parent being completed. task2 := ConstructTestTaskWithPrio("2aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 100) - task2.Parents = []bson.ObjectId{task1.Id} + task2.Parents = []bson.ObjectId{task1.ID} task2.Status = "claimed-by-manager" assert.Nil(c, tasks_coll.Insert(task2)) // Fetch a task from the queue - resp_rec, _ := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah") + resp_rec, _ := WorkerTestRequest(s.worker_lnx.ID, "TEST", "/whatevah") task := s.sched.fetchTaskFromQueueOrManager(resp_rec, s.db, &s.worker_lnx) assert.Equal(c, http.StatusOK, resp_rec.Code) // We should get task 2. - assert.NotNil(c, task, "Expected task %s, got nil instead", task2.Id.Hex()) + assert.NotNil(c, task, "Expected task %s, got nil instead", task2.ID.Hex()) if task != nil { // prevent nil pointer dereference - assert.Equal(c, task.Id, task2.Id, "Expected task %s, got task %s instead", - task2.Id.Hex(), task.Id.Hex()) + assert.Equal(c, task.ID, task2.ID, "Expected task %s, got task %s instead", + task2.ID.Hex(), task.ID.Hex()) } } @@ -395,23 +395,23 @@ func (s *SchedulerTestSuite) TestParentTaskOneCompletedOneNot(c *check.C) { // Task 1 is being worked on by worker_win task1 := ConstructTestTaskWithPrio("1aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50) task1.Status = "active" - task1.WorkerId = &s.worker_win.Id + task1.WorkerID = &s.worker_win.ID assert.Nil(c, tasks_coll.Insert(task1)) // Task 2 is already completed. 
task2 := ConstructTestTaskWithPrio("2aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50) task2.Status = "completed" - task2.WorkerId = &s.worker_win.Id + task2.WorkerID = &s.worker_win.ID assert.Nil(c, tasks_coll.Insert(task2)) // Task 3 is unavailable due to one of its parent not being completed. task3 := ConstructTestTaskWithPrio("3aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 100) - task3.Parents = []bson.ObjectId{task1.Id, task2.Id} + task3.Parents = []bson.ObjectId{task1.ID, task2.ID} task3.Status = "claimed-by-manager" assert.Nil(c, tasks_coll.Insert(task3)) // Fetch a task from the queue - resp_rec, _ := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah") + resp_rec, _ := WorkerTestRequest(s.worker_lnx.ID, "TEST", "/whatevah") task := s.sched.fetchTaskFromQueueOrManager(resp_rec, s.db, &s.worker_lnx) // We should not get any task back. diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_timeout_check.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_timeout_check.go index c03ddccb0e484f9268e7e926a5718a5504f8777d..5b9941c8e51c6b0b15f76edec73cf8be1008638b 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_timeout_check.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_timeout_check.go @@ -5,7 +5,6 @@ package flamenco import ( "fmt" - "sync" "time" log "github.com/Sirupsen/logrus" @@ -17,17 +16,15 @@ const TASK_TIMEOUT_CHECK_INTERVAL = 5 * time.Second const TASK_TIMEOUT_CHECK_INITIAL_SLEEP = 5 * time.Minute type TaskTimeoutChecker struct { - config *Conf - session *mgo.Session - done_chan chan bool - done_wg *sync.WaitGroup + closable + config *Conf + session *mgo.Session } func CreateTaskTimeoutChecker(config *Conf, session *mgo.Session) *TaskTimeoutChecker { return &TaskTimeoutChecker{ + makeClosable(), config, session, - make(chan bool), - &sync.WaitGroup{}, } } @@ -36,21 +33,19 @@ func (self *TaskTimeoutChecker) Go() { defer session.Close() db := session.DB("") - self.done_wg.Add(1) - defer self.done_wg.Done() - defer log.Infof("TaskTimeoutChecker: shutting down.") + self.closableAdd(1) + defer self.closableDone() + defer log.Info("TaskTimeoutChecker: shutting down.") // Start with a delay, so that workers get a chance to push their updates // after the manager has started up. 
- ok := KillableSleep("TaskTimeoutChecker-initial", TASK_TIMEOUT_CHECK_INITIAL_SLEEP, - self.done_chan, self.done_wg) + ok := KillableSleep("TaskTimeoutChecker-initial", TASK_TIMEOUT_CHECK_INITIAL_SLEEP, &self.closable) if !ok { - log.Warningf("TaskTimeoutChecker: Killable sleep was killed, not even starting checker.") + log.Info("TaskTimeoutChecker: Killable sleep was killed, not even starting checker.") return } - timer := Timer("TaskTimeoutCheck", TASK_TIMEOUT_CHECK_INTERVAL, false, - self.done_chan, self.done_wg) + timer := Timer("TaskTimeoutCheck", TASK_TIMEOUT_CHECK_INTERVAL, false, &self.closable) for _ = range timer { self.Check(db) @@ -59,9 +54,7 @@ func (self *TaskTimeoutChecker) Go() { } func (self *TaskTimeoutChecker) Close() { - close(self.done_chan) - log.Debug("TaskTimeoutChecker: waiting for shutdown to finish.") - self.done_wg.Wait() + self.closableCloseAndWait() log.Debug("TaskTimeoutChecker: shutdown complete.") } @@ -90,24 +83,24 @@ func (self *TaskTimeoutChecker) Check(db *mgo.Database) { } for _, task := range timedout_tasks { - log.Warningf(" - Task %s (%s) timed out", task.Name, task.Id.Hex()) + log.Warningf(" - Task %s (%s) timed out", task.Name, task.ID.Hex()) var ident string if task.Worker != "" { ident = task.Worker - } else if task.WorkerId != nil { - ident = task.WorkerId.Hex() + } else if task.WorkerID != nil { + ident = task.WorkerID.Hex() } else { ident = "-no worker-" } tupdate := TaskUpdate{ - TaskId: task.Id, + TaskID: task.ID, TaskStatus: "failed", Activity: fmt.Sprintf("Task timed out on worker %s", ident), Log: fmt.Sprintf( "%s Task %s (%s) timed out, was active but untouched since %s. "+ "Was handled by worker %s", - UtcNow().Format(IsoFormat), task.Name, task.Id.Hex(), task.LastWorkerPing, ident), + UtcNow().Format(IsoFormat), task.Name, task.ID.Hex(), task.LastWorkerPing, ident), } QueueTaskUpdate(&tupdate, db) } diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates.go index a003ac665b92bf072266ed907fe8cf1785e0a9a2..80b4ce15050c72f5c397f3e654fe26a3605e4fe4 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates.go @@ -6,7 +6,6 @@ package flamenco import ( "fmt" "net/http" - "sync" "time" log "github.com/Sirupsen/logrus" @@ -20,13 +19,10 @@ const QUEUE_MGO_COLLECTION = "task_update_queue" const TASK_QUEUE_INSPECT_PERIOD = 1 * time.Second type TaskUpdatePusher struct { + closable config *Conf upstream *UpstreamConnection session *mgo.Session - - // For allowing shutdown. - done chan bool - done_wg *sync.WaitGroup } /** @@ -54,7 +50,7 @@ func QueueTaskUpdateFromWorker(w http.ResponseWriter, r *auth.AuthenticatedReque if err := DecodeJson(w, r.Body, &tupdate, fmt.Sprintf("%s QueueTaskUpdate:", worker.Identifier())); err != nil { return } - tupdate.TaskId = task_id + tupdate.TaskID = task_id tupdate.Worker = worker.Identifier() // Check that this worker is allowed to update this task. 
@@ -66,20 +62,20 @@ func QueueTaskUpdateFromWorker(w http.ResponseWriter, r *auth.AuthenticatedReque
 		fmt.Fprintf(w, "Task %s is unknown.", task_id.Hex())
 		return
 	}
-	if task.WorkerId != nil && *task.WorkerId != worker.Id {
+	if task.WorkerID != nil && *task.WorkerID != worker.ID {
 		log.Warningf("%s QueueTaskUpdateFromWorker: task %s update rejected from %s (%s), task is assigned to %s",
-			r.RemoteAddr, task_id.Hex(), worker.Id.Hex(), worker.Identifier(), task.WorkerId.Hex())
+			r.RemoteAddr, task_id.Hex(), worker.ID.Hex(), worker.Identifier(), task.WorkerID.Hex())
 		w.WriteHeader(http.StatusConflict)
 		fmt.Fprintf(w, "Task %s is assigned to another worker.", task_id.Hex())
 		return
 	}
 
-	// Only set the task's worker ID if it's not already set to the current worker.
-	var set_worker_id *bson.ObjectId = nil
-	if task.WorkerId == nil {
-		set_worker_id = &worker.Id
+	// Only set the task's worker ID if it isn't set already.
+	var setWorkerID *bson.ObjectId
+	if task.WorkerID == nil {
+		setWorkerID = &worker.ID
 	}
-	WorkerPingedTask(set_worker_id, tupdate.TaskId, db)
+	WorkerPingedTask(setWorkerID, tupdate.TaskID, db)
 
 	if err := QueueTaskUpdate(&tupdate, db); err != nil {
 		log.Warningf("%s: %s", worker.Identifier(), err)
@@ -101,7 +97,7 @@ func QueueTaskUpdate(tupdate *TaskUpdate, db *mgo.Database) error {
 func QueueTaskUpdateWithExtra(tupdate *TaskUpdate, db *mgo.Database, extra_updates bson.M) error {
 	// For ensuring the ordering of updates. time.Time has nanosecond precision.
 	tupdate.ReceivedOnManager = time.Now().UTC()
-	tupdate.Id = bson.NewObjectId()
+	tupdate.ID = bson.NewObjectId()
 
 	// Store the update in the queue for sending to the Flamenco Server later.
 	task_update_queue := db.C(QUEUE_MGO_COLLECTION)
@@ -116,27 +112,27 @@ func QueueTaskUpdateWithExtra(tupdate *TaskUpdate, db *mgo.Database, extra_updat
 	updates := extra_updates
 	if tupdate.TaskStatus != "" {
 		// Before blindly applying the task status, first check if the transition is valid.
- if TaskStatusTransitionValid(task_coll, tupdate.TaskId, tupdate.TaskStatus) { + if TaskStatusTransitionValid(task_coll, tupdate.TaskID, tupdate.TaskStatus) { updates["status"] = tupdate.TaskStatus } else { log.Warningf("QueueTaskUpdate: not locally applying status=%s for %s", - tupdate.TaskStatus, tupdate.TaskId.Hex()) + tupdate.TaskStatus, tupdate.TaskID.Hex()) } } if tupdate.Activity != "" { updates["activity"] = tupdate.Activity } if len(updates) > 0 { - log.Debugf("QueueTaskUpdate: applying update %s to task %s", updates, tupdate.TaskId.Hex()) - if err := task_coll.UpdateId(tupdate.TaskId, bson.M{"$set": updates}); err != nil { + log.Debugf("QueueTaskUpdate: applying update %s to task %s", updates, tupdate.TaskID.Hex()) + if err := task_coll.UpdateId(tupdate.TaskID, bson.M{"$set": updates}); err != nil { if err != mgo.ErrNotFound { return fmt.Errorf("QueueTaskUpdate: error updating local task cache: %s", err) } else { - log.Warningf("QueueTaskUpdate: cannot find task %s to update locally", tupdate.TaskId.Hex()) + log.Warningf("QueueTaskUpdate: cannot find task %s to update locally", tupdate.TaskID.Hex()) } } } else { - log.Debugf("QueueTaskUpdate: nothing to do locally for task %s", tupdate.TaskId.Hex()) + log.Debugf("QueueTaskUpdate: nothing to do locally for task %s", tupdate.TaskID.Hex()) } return nil @@ -179,11 +175,10 @@ func ValidForCancelRequested(new_status string) bool { func CreateTaskUpdatePusher(config *Conf, upstream *UpstreamConnection, session *mgo.Session) *TaskUpdatePusher { return &TaskUpdatePusher{ + makeClosable(), config, upstream, session, - make(chan bool), - new(sync.WaitGroup), } } @@ -191,15 +186,8 @@ func CreateTaskUpdatePusher(config *Conf, upstream *UpstreamConnection, session * Closes the task update pusher by stopping all timers & goroutines. */ func (self *TaskUpdatePusher) Close() { - close(self.done) - - // Dirty hack: sleep for a bit to ensure the closing of the 'done' - // channel can be handled by other goroutines, before handling the - // closing of the other channels. - time.Sleep(1) - log.Info("TaskUpdatePusher: shutting down, waiting for shutdown to complete.") - self.done_wg.Wait() + self.closableCloseAndWait() log.Info("TaskUpdatePusher: shutdown complete.") } @@ -212,12 +200,14 @@ func (self *TaskUpdatePusher) Go() { db := mongo_sess.DB("") queue := db.C(QUEUE_MGO_COLLECTION) - self.done_wg.Add(1) - defer self.done_wg.Done() + if !self.closableAdd(1) { + return + } + defer self.closableDone() // Investigate the queue periodically. timer_chan := Timer("TaskUpdatePusherTimer", - TASK_QUEUE_INSPECT_PERIOD, false, self.done, self.done_wg) + TASK_QUEUE_INSPECT_PERIOD, false, &self.closable) for _ = range timer_chan { // log.Info("TaskUpdatePusher: checking task update queue") @@ -333,7 +323,7 @@ func (self *TaskUpdatePusher) handle_incoming_cancel_requests(cancel_task_ids [] queue_task_cancel := func(task_id bson.ObjectId) { tupdate := TaskUpdate{ - TaskId: task_id, + TaskID: task_id, TaskStatus: "canceled", } if err := QueueTaskUpdate(&tupdate, db); err != nil { @@ -345,13 +335,13 @@ func (self *TaskUpdatePusher) handle_incoming_cancel_requests(cancel_task_ids [] } for _, task_to_cancel := range tasks_to_cancel { - seen_tasks[task_to_cancel.Id] = true + seen_tasks[task_to_cancel.ID] = true if task_to_cancel.Status == "active" { // This needs to be canceled through the worker, and thus go to cancel-requested. 
-			go_to_cancel_requested = append(go_to_cancel_requested, task_to_cancel.Id)
+			go_to_cancel_requested = append(go_to_cancel_requested, task_to_cancel.ID)
 		} else {
-			queue_task_cancel(task_to_cancel.Id)
+			queue_task_cancel(task_to_cancel.ID)
 		}
 	}
 
@@ -389,7 +379,7 @@ func (self *TaskUpdatePusher) handle_incoming_cancel_requests(cancel_task_ids []
 func LogTaskActivity(worker *Worker, task_id bson.ObjectId, activity, log_line string, db *mgo.Database) {
 	tupdate := TaskUpdate{
-		TaskId: task_id,
+		TaskID: task_id,
 		Activity: activity,
 		Log: log_line,
 	}
diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates_test.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates_test.go
index 48b3245d1001c816f6ac7b3d226b9da91f56a51e..6f17811ae58b87a31045da8f17ab7aaf95a38f51 100644
--- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates_test.go
+++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates_test.go
@@ -64,7 +64,7 @@ func (s *TaskUpdatesTestSuite) TestCancelRunningTasks(t *check.C) {
 			log.Info("POST from manager received on server, sending back TaskUpdateResponse.")
 
 			resp := TaskUpdateResponse{
-				CancelTasksIds: []bson.ObjectId{task2.Id},
+				CancelTasksIds: []bson.ObjectId{task2.ID},
 			}
 			return httpmock.NewJsonResponse(200, &resp)
 		},
@@ -90,9 +90,9 @@ func (s *TaskUpdatesTestSuite) TestCancelRunningTasks(t *check.C) {
 
 	// Check that one task was canceled and the other was not.
 	task_db := Task{}
-	assert.Nil(t, tasks_coll.FindId(task1.Id).One(&task_db))
+	assert.Nil(t, tasks_coll.FindId(task1.ID).One(&task_db))
 	assert.Equal(t, "queued", task_db.Status)
-	assert.Nil(t, tasks_coll.FindId(task2.Id).One(&task_db))
+	assert.Nil(t, tasks_coll.FindId(task2.ID).One(&task_db))
 	assert.Equal(t, "canceled", task_db.Status)
 }
 
@@ -114,33 +114,33 @@ func (s *TaskUpdatesTestSuite) TestMultipleWorkersForOneTask(c *check.C) {
 	assert.Nil(c, StoreNewWorker(&worker2, s.db))
 
 	// Task should not be assigned to any worker
-	assert.Nil(c, task1.WorkerId)
+	assert.Nil(c, task1.WorkerID)
 
 	tupdate := TaskUpdate{
-		TaskId: task1.Id,
+		TaskID: task1.ID,
 		Activity: "doing stuff by worker1",
 	}
 	payload_bytes, err := json.Marshal(tupdate)
 	assert.Nil(c, err)
 
-	resp_rec, ar := WorkerTestRequestWithBody(worker1.Id, bytes.NewBuffer(payload_bytes), "POST", "/tasks/1aaaaaaaaaaaaaaaaaaaaaaa/update")
-	QueueTaskUpdateFromWorker(resp_rec, ar, s.db, task1.Id)
+	resp_rec, ar := WorkerTestRequestWithBody(worker1.ID, bytes.NewBuffer(payload_bytes), "POST", "/tasks/1aaaaaaaaaaaaaaaaaaaaaaa/update")
+	QueueTaskUpdateFromWorker(resp_rec, ar, s.db, task1.ID)
 	assert.Equal(c, 204, resp_rec.Code)
 
 	// Because of this update, the task should be assigned to worker 1
-	assert.Nil(c, tasks_coll.FindId(task1.Id).One(&task1))
-	assert.Equal(c, task1.WorkerId, task1.WorkerId)
+	assert.Nil(c, tasks_coll.FindId(task1.ID).One(&task1))
+	assert.Equal(c, worker1.ID, *task1.WorkerID)
 	assert.Equal(c, task1.Activity, "doing stuff by worker1")
 
 	// An update by worker 2 should fail.
tupdate.Activity = "doing stuff by worker2" payload_bytes, err = json.Marshal(tupdate) assert.Nil(c, err) - resp_rec, ar = WorkerTestRequestWithBody(worker2.Id, bytes.NewBuffer(payload_bytes), "POST", "/tasks/1aaaaaaaaaaaaaaaaaaaaaaa/update") - QueueTaskUpdateFromWorker(resp_rec, ar, s.db, task1.Id) + resp_rec, ar = WorkerTestRequestWithBody(worker2.ID, bytes.NewBuffer(payload_bytes), "POST", "/tasks/1aaaaaaaaaaaaaaaaaaaaaaa/update") + QueueTaskUpdateFromWorker(resp_rec, ar, s.db, task1.ID) assert.Equal(c, http.StatusConflict, resp_rec.Code) // The task should still be assigned to worker 1 - assert.Nil(c, tasks_coll.FindId(task1.Id).One(&task1)) - assert.Equal(c, task1.WorkerId, task1.WorkerId) + assert.Nil(c, tasks_coll.FindId(task1.ID).One(&task1)) + assert.Equal(c, task1.WorkerID, task1.WorkerID) assert.Equal(c, task1.Activity, "doing stuff by worker1") } diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/timer.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/timer.go index 467ded9e2496bd884462b239b0c0358ff264bcfe..f07f47c5a04795029d077268c3c59f583e71ba50 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/timer.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/timer.go @@ -1,7 +1,6 @@ package flamenco import ( - "sync" "time" log "github.com/Sirupsen/logrus" @@ -14,13 +13,15 @@ type TimerPing struct{} * * :param sleep_first: if true: sleep first, then ping. If false: ping first, then sleep. */ -func Timer(name string, sleep_duration time.Duration, sleep_first bool, - done_chan <-chan bool, done_wg *sync.WaitGroup) <-chan TimerPing { +func Timer(name string, sleep_duration time.Duration, sleep_first bool, closable *closable) <-chan TimerPing { timer_chan := make(chan TimerPing, 1) // don't let the timer block go func() { - done_wg.Add(1) - defer done_wg.Done() + if !closable.closableAdd(1) { + log.Infof("Timer '%s' goroutine shutting down.", name) + return + } + defer closable.closableDone() defer close(timer_chan) last_timer := time.Time{} @@ -30,7 +31,7 @@ func Timer(name string, sleep_duration time.Duration, sleep_first bool, for { select { - case <-done_chan: + case <-closable.doneChan: log.Infof("Timer '%s' goroutine shutting down.", name) return default: @@ -55,17 +56,18 @@ func Timer(name string, sleep_duration time.Duration, sleep_first bool, * * :returns: "ok", so true when the sleep stopped normally, and false if it was killed. */ -func KillableSleep(name string, sleep_duration time.Duration, - done_chan <-chan bool, done_wg *sync.WaitGroup) bool { +func KillableSleep(name string, sleep_duration time.Duration, closable *closable) bool { - done_wg.Add(1) - defer done_wg.Done() + if !closable.closableAdd(1) { + return false + } + defer closable.closableDone() defer log.Infof("Sleep '%s' goroutine is shut down.", name) sleep_start := time.Now() for { select { - case <-done_chan: + case <-closable.doneChan: log.Infof("Sleep '%s' goroutine shutting down.", name) return false default: @@ -88,7 +90,7 @@ func UtcNow() *time.Time { return &now } -/* Sends a 'true' to the channel after the given timeout. +/* TimeoutAfter: Sends a 'true' to the channel after the given timeout. * Send a 'false' to the channel yourself if you want to notify the receiver that * a timeout didn't happen. 
* diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/upstream.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/upstream.go index de64b5904c4538a9309db5cb7dd8d98fe53908c2..9167dc6674c5034af9f220bef56bef252c2d786d 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/upstream.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/upstream.go @@ -9,7 +9,6 @@ import ( "io/ioutil" "net/http" "net/url" - "sync" "time" log "github.com/Sirupsen/logrus" @@ -24,23 +23,20 @@ const STARTUP_NOTIFICATION_INITIAL_DELAY = 500 * time.Millisecond const STARTUP_NOTIFICATION_RETRY = 30 * time.Second type UpstreamConnection struct { + closable config *Conf session *mgo.Session // Send any boolean here to kick the task downloader into downloading new tasks. download_kick chan chan bool - - done chan bool - done_wg *sync.WaitGroup } func ConnectUpstream(config *Conf, session *mgo.Session) *UpstreamConnection { upconn := UpstreamConnection{ + makeClosable(), config, session, make(chan chan bool), - make(chan bool), - new(sync.WaitGroup), } upconn.download_task_loop() @@ -51,16 +47,9 @@ func ConnectUpstream(config *Conf, session *mgo.Session) *UpstreamConnection { * Closes the upstream connection by stopping all upload/download loops. */ func (self *UpstreamConnection) Close() { - close(self.done) - - // Dirty hack: sleep for a bit to ensure the closing of the 'done' - // channel can be handled by other goroutines, before handling the - // closing of the other channels. - time.Sleep(1) - close(self.download_kick) - log.Debugf("UpstreamConnection: shutting down, waiting for shutdown to complete.") - self.done_wg.Wait() + close(self.download_kick) // TODO: maybe move this between closing of done channel and waiting + self.closableCloseAndWait() log.Info("UpstreamConnection: shutdown complete.") } @@ -71,15 +60,18 @@ func (self *UpstreamConnection) KickDownloader(synchronous bool) { log.Info("KickDownloader: Waiting for task downloader to finish.") // wait for the download to be complete, or the connection to be shut down. 
- self.done_wg.Add(1) - defer self.done_wg.Done() + if !self.closableAdd(1) { + log.Debugf("KickDownloader: Aborting waiting for task downloader; shutting down.") + return + } + defer self.closableDone() for { select { case <-pingback: log.Debugf("KickDownloader: done.") return - case <-self.done: + case <-self.doneChan: log.Debugf("KickDownloader: Aborting waiting for task downloader; shutting down.") return } @@ -94,21 +86,22 @@ func (self *UpstreamConnection) download_task_loop() { timer_chan := Timer("download_task_loop", self.config.DownloadTaskSleep, false, - self.done, - self.done_wg, + &self.closable, ) go func() { mongo_sess := self.session.Copy() defer mongo_sess.Close() - self.done_wg.Add(1) - defer self.done_wg.Done() + if !self.closableAdd(1) { + return + } + defer self.closableDone() defer log.Info("download_task_loop: Task download goroutine shutting down.") for { select { - case <-self.done: + case <-self.doneChan: return case _, ok := <-timer_chan: if !ok { @@ -205,17 +198,17 @@ func download_tasks_from_upstream(config *Conf, mongo_sess *mgo.Session) { } tasks_coll := db.C("flamenco_tasks") for _, task := range depsgraph { - change, err := tasks_coll.Upsert(bson.M{"_id": task.Id}, task) + change, err := tasks_coll.Upsert(bson.M{"_id": task.ID}, task) if err != nil { - log.Errorf("unable to insert new task %s: %s", task.Id.Hex(), err) + log.Errorf("unable to insert new task %s: %s", task.ID.Hex(), err) continue } if change.Updated > 0 { - log.Debug("Upstream server re-queued existing task ", task.Id.Hex()) + log.Debug("Upstream server re-queued existing task ", task.ID.Hex()) } else if change.Matched > 0 { log.Debugf("Upstream server re-queued existing task %s, but nothing changed", - task.Id.Hex()) + task.ID.Hex()) } } @@ -256,7 +249,7 @@ func (self *UpstreamConnection) SendJson(logprefix, method string, url *url.URL, func (self *UpstreamConnection) SendStartupNotification() { notification := StartupNotification{ - ManagerUrl: self.config.OwnUrl, + ManagerURL: self.config.OwnUrl, VariablesByVarname: self.config.VariablesByVarname, NumberOfWorkers: 0, } @@ -282,20 +275,23 @@ func (self *UpstreamConnection) SendStartupNotification() { go func() { // Register as a loop that responds to 'done' being closed. - self.done_wg.Add(1) - defer self.done_wg.Done() + if !self.closableAdd(1) { + log.Warning("SendStartupNotification: shutting down early without sending startup notification.") + return + } + defer self.closableDone() mongo_sess := self.session.Copy() defer mongo_sess.Close() ok := KillableSleep("SendStartupNotification-initial", STARTUP_NOTIFICATION_INITIAL_DELAY, - self.done, self.done_wg) + &self.closable) if !ok { log.Warning("SendStartupNotification: shutting down without sending startup notification.") return } timer_chan := Timer("SendStartupNotification", STARTUP_NOTIFICATION_RETRY, - false, self.done, self.done_wg) + false, &self.closable) for _ = range timer_chan { log.Info("SendStartupNotification: trying to send notification.") @@ -345,7 +341,7 @@ func (self *UpstreamConnection) SendTaskUpdates(updates *[]TaskUpdate) (*TaskUpd * If it was changed or removed, this function return true. 
*/ func (self *UpstreamConnection) RefetchTask(task *Task) bool { - get_url, err := self.ResolveUrl("/api/flamenco/tasks/%s", task.Id.Hex()) + get_url, err := self.ResolveUrl("/api/flamenco/tasks/%s", task.ID.Hex()) log.Infof("Verifying task with Flamenco Server %s", get_url) req, err := http.NewRequest("GET", get_url.String(), nil) @@ -365,20 +361,20 @@ func (self *UpstreamConnection) RefetchTask(task *Task) bool { if resp.StatusCode == http.StatusNotModified { // Nothing changed, we're good to go. - log.Infof("Cached task %s is still the same on the Server", task.Id.Hex()) + log.Infof("Cached task %s is still the same on the Server", task.ID.Hex()) return false } if resp.StatusCode >= 500 { // Internal errors, we'll ignore that. log.Warningf("Error %d trying to re-fetch task %s", - resp.StatusCode, task.Id.Hex()) + resp.StatusCode, task.ID.Hex()) return false } if 300 <= resp.StatusCode && resp.StatusCode < 400 { // Redirects, we'll ignore those too for now. log.Warningf("Redirect %d trying to re-fetch task %s, not following redirect.", - resp.StatusCode, task.Id.Hex()) + resp.StatusCode, task.ID.Hex()) return false } @@ -390,7 +386,7 @@ func (self *UpstreamConnection) RefetchTask(task *Task) bool { // Not found, access denied, that kind of stuff. Locally cancel the task. // TODO: probably better to go to "failed". log.Warningf("Code %d when re-fetching task %s; canceling local copy", - resp.StatusCode, task.Id.Hex()) + resp.StatusCode, task.ID.Hex()) new_task = *task new_task.Status = "canceled" @@ -411,9 +407,9 @@ func (self *UpstreamConnection) RefetchTask(task *Task) bool { // save the task to the queue. log.Infof("Cached task %s was changed on the Server, status=%s, priority=%d.", - task.Id.Hex(), new_task.Status, new_task.Priority) + task.ID.Hex(), new_task.Status, new_task.Priority) tasks_coll := self.session.DB("").C("flamenco_tasks") - tasks_coll.UpdateId(task.Id, + tasks_coll.UpdateId(task.ID, bson.M{"$set": new_task}) return true diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/workers.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/workers.go index 68c4be5e9c5be43519b93745b780df95f0e0a998..2082c1290b8c87fb6a37a3e94c8f35e1646a18cd 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/workers.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/workers.go @@ -67,7 +67,7 @@ func StoreNewWorker(winfo *Worker, db *mgo.Database) error { var err error // Store it in MongoDB after hashing the password and assigning an ID. 
- winfo.Id = bson.NewObjectId() + winfo.ID = bson.NewObjectId() winfo.HashedSecret, err = bcrypt.GenerateFromPassword([]byte(winfo.Secret), bcrypt.DefaultCost) if err != nil { log.Errorf("Unable to hash password:", err) @@ -146,19 +146,19 @@ func WorkerMayRunTask(w http.ResponseWriter, r *auth.AuthenticatedRequest, task := Task{} if err := db.C("flamenco_tasks").FindId(task_id).One(&task); err != nil { log.Warningf("%s WorkerMayRunTask: unable to find task %s for worker %s", - r.RemoteAddr, task_id.Hex(), worker.Id.Hex()) + r.RemoteAddr, task_id.Hex(), worker.ID.Hex()) response.Reason = fmt.Sprintf("unable to find task %s", task_id.Hex()) - } else if task.WorkerId != nil && *task.WorkerId != worker.Id { + } else if task.WorkerID != nil && *task.WorkerID != worker.ID { log.Warningf("%s WorkerMayRunTask: task %s was assigned from worker %s to %s", - r.RemoteAddr, task_id.Hex(), worker.Id.Hex(), task.WorkerId.Hex()) + r.RemoteAddr, task_id.Hex(), worker.ID.Hex(), task.WorkerID.Hex()) response.Reason = fmt.Sprintf("task %s reassigned to another worker", task_id.Hex()) } else if !IsRunnableTaskStatus(task.Status) { log.Warningf("%s WorkerMayRunTask: task %s is in not-runnable status %s, worker %s will stop", - r.RemoteAddr, task_id.Hex(), task.Status, worker.Id.Hex()) + r.RemoteAddr, task_id.Hex(), task.Status, worker.ID.Hex()) response.Reason = fmt.Sprintf("task %s in non-runnable status %s", task_id.Hex(), task.Status) } else { response.MayKeepRunning = true - WorkerPingedTask(&worker.Id, task_id, db) + WorkerPingedTask(&worker.ID, task_id, db) } // Send the response @@ -214,8 +214,8 @@ func WorkerSeen(worker *Worker, remote_addr string, db *mgo.Database) { updates["address"] = remote_addr } - if err := db.C("flamenco_workers").UpdateId(worker.Id, bson.M{"$set": updates}); err != nil { - log.Errorf("WorkerSeen: unable to update worker %s in MongoDB: %s", worker.Id, err) + if err := db.C("flamenco_workers").UpdateId(worker.ID, bson.M{"$set": updates}); err != nil { + log.Errorf("WorkerSeen: unable to update worker %s in MongoDB: %s", worker.ID, err) } } @@ -237,7 +237,7 @@ func WorkerSignOff(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo. // Update the tasks assigned to the worker. var tasks []Task query := bson.M{ - "worker_id": worker.Id, + "worker_id": worker.ID, "status": "active", } sent_header := false @@ -256,15 +256,15 @@ func WorkerSignOff(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo. } for _, task := range tasks { - tupdate.TaskId = task.Id + tupdate.TaskID = task.ID if err := QueueTaskUpdate(&tupdate, db); err != nil { if !sent_header { w.WriteHeader(http.StatusInternalServerError) sent_header = true } - fmt.Fprintf(w, "Error updating task %s: %s\n", task.Id.Hex(), err) + fmt.Fprintf(w, "Error updating task %s: %s\n", task.ID.Hex(), err) log.Errorf("WorkerSignOff: unable to update task %s for worker %s in MongoDB: %s", - task.Id.Hex(), w_ident, err) + task.ID.Hex(), w_ident, err) } } } @@ -274,7 +274,7 @@ func WorkerSignOff(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo. 
updates := bson.M{ "status": worker.Status, } - if err := db.C("flamenco_workers").UpdateId(worker.Id, bson.M{"$set": updates}); err != nil { + if err := db.C("flamenco_workers").UpdateId(worker.ID, bson.M{"$set": updates}); err != nil { if !sent_header { w.WriteHeader(http.StatusInternalServerError) } diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/workers_test.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/workers_test.go index aceae1872c03f1624d039e542cc01a98005e1bf9..d49c84e725d672daebd521783e3f2d16bcb6394d 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/workers_test.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/workers_test.go @@ -46,12 +46,12 @@ func (s *SchedulerTestSuite) TestWorkerMayRun(t *check.C) { } // Make sure the scheduler gives us this task. - resp_rec, ar := WorkerTestRequest(s.worker_lnx.Id, "GET", "/task") + resp_rec, ar := WorkerTestRequest(s.worker_lnx.ID, "GET", "/task") s.sched.ScheduleTask(resp_rec, ar) // Right after obtaining the task, we should be allowed to keep running it. - resp_rec, ar = WorkerTestRequest(s.worker_lnx.Id, "GET", "/may-i-run/%s", task.Id.Hex()) - WorkerMayRunTask(resp_rec, ar, s.db, task.Id) + resp_rec, ar = WorkerTestRequest(s.worker_lnx.ID, "GET", "/may-i-run/%s", task.ID.Hex()) + WorkerMayRunTask(resp_rec, ar, s.db, task.ID) resp := MayKeepRunningResponse{} parseJson(t, resp_rec, 200, &resp) @@ -59,22 +59,22 @@ func (s *SchedulerTestSuite) TestWorkerMayRun(t *check.C) { assert.Equal(t, true, resp.MayKeepRunning) // If we now change the task status to "cancel-requested", the worker should be denied. - assert.Nil(t, s.db.C("flamenco_tasks").UpdateId(task.Id, + assert.Nil(t, s.db.C("flamenco_tasks").UpdateId(task.ID, bson.M{"$set": bson.M{"status": "cancel-requested"}})) - resp_rec, ar = WorkerTestRequest(s.worker_lnx.Id, "GET", "/may-i-run/%s", task.Id.Hex()) - WorkerMayRunTask(resp_rec, ar, s.db, task.Id) + resp_rec, ar = WorkerTestRequest(s.worker_lnx.ID, "GET", "/may-i-run/%s", task.ID.Hex()) + WorkerMayRunTask(resp_rec, ar, s.db, task.ID) resp = MayKeepRunningResponse{} parseJson(t, resp_rec, 200, &resp) assert.Equal(t, false, resp.MayKeepRunning) // Changing status back to "active", but assigning to another worker - assert.Nil(t, s.db.C("flamenco_tasks").UpdateId(task.Id, bson.M{"$set": bson.M{ + assert.Nil(t, s.db.C("flamenco_tasks").UpdateId(task.ID, bson.M{"$set": bson.M{ "status": "active", - "worker_id": s.worker_win.Id, + "worker_id": s.worker_win.ID, }})) - resp_rec, ar = WorkerTestRequest(s.worker_lnx.Id, "GET", "/may-i-run/%s", task.Id.Hex()) - WorkerMayRunTask(resp_rec, ar, s.db, task.Id) + resp_rec, ar = WorkerTestRequest(s.worker_lnx.ID, "GET", "/may-i-run/%s", task.ID.Hex()) + WorkerMayRunTask(resp_rec, ar, s.db, task.ID) resp = MayKeepRunningResponse{} parseJson(t, resp_rec, 200, &resp) diff --git a/packages/flamenco-manager-go/src/flamenco-manager/main.go b/packages/flamenco-manager-go/src/flamenco-manager/main.go index 9e08ac817e270136727304c82145e3ef74f46d77..47a4fdf008d547cbdbbd7270b7e7a2e1ef3c7181 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/main.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "fmt" "net/http" @@ -30,6 +31,8 @@ var upstream *flamenco.UpstreamConnection var task_scheduler *flamenco.TaskScheduler var task_update_pusher *flamenco.TaskUpdatePusher var task_timeout_checker *flamenco.TaskTimeoutChecker +var httpServer 
*http.Server
+var shutdownComplete chan struct{}
 
 func http_status(w http.ResponseWriter, r *http.Request) {
 	flamenco.SendStatusReport(w, r, session, FLAMENCO_VERSION)
@@ -113,18 +116,28 @@ func shutdown(signum os.Signal) {
 	go func() {
 		log.Infof("Signal '%s' received, shutting down.", signum)
+
+		if httpServer != nil {
+			log.Info("Shutting down HTTP server")
+			httpServer.Shutdown(context.Background())
+		} else {
+			log.Warning("HTTP server was not even started yet")
+		}
+
 		task_timeout_checker.Close()
 		task_update_pusher.Close()
 		upstream.Close()
 		session.Close()
-		log.Warning("Shutdown complete, stopping process.")
 		timeout <- false
 	}()
 
 	if <-timeout {
-		log.Warning("Shutdown forced, stopping process.")
+		log.Error("Shutdown forced, stopping process.")
+		os.Exit(-2)
 	}
-	os.Exit(-2)
+
+	log.Warning("Shutdown complete, stopping process.")
+	close(shutdownComplete)
 }
 
 var cliArgs struct {
@@ -223,6 +236,12 @@ func main() {
 	go task_update_pusher.Go()
 	go task_timeout_checker.Go()
 
+	// Create the HTTP server before installing the shutdown signal handler.
+	// This prevents a race condition in which Ctrl+C arrives before
+	// httpServer is assigned, leaving the shutdown handler with a nil server.
+	httpServer = &http.Server{Addr: config.Listen, Handler: router}
+	shutdownComplete = make(chan struct{})
+
 	// Handle Ctrl+C
 	c := make(chan os.Signal, 1)
 	signal.Notify(c, os.Interrupt)
@@ -236,12 +255,10 @@ func main() {
 
 	// Fall back to insecure server if TLS certificate/key is not defined.
 	if !has_tls {
-		log.Fatal(http.ListenAndServe(config.Listen, router))
+		log.Warning(httpServer.ListenAndServe())
 	} else {
-		log.Fatal(http.ListenAndServeTLS(
-			config.Listen,
-			config.TLSCert,
-			config.TLSKey,
-			router))
+		log.Warning(httpServer.ListenAndServeTLS(config.TLSCert, config.TLSKey))
 	}
+
+	<-shutdownComplete
 }
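
A usage sketch of the new closable type, for reviewers unfamiliar with the
pattern. This is not part of the patch: examplePoller and the timings are
invented for illustration, and the logging calls of the real closable are
omitted. A service embeds closable, registers its goroutine, and blocks in
Close() until the goroutine has observed the closed channel -- the same
structure TaskTimeoutChecker and TaskUpdatePusher follow above:

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	// Trimmed copy of the closable type introduced in this patch.
	type closable struct {
		doneChan chan struct{}
		doneWg   *sync.WaitGroup
	}

	func makeClosable() closable {
		return closable{make(chan struct{}), new(sync.WaitGroup)}
	}

	// examplePoller is a hypothetical service built like TaskTimeoutChecker.
	type examplePoller struct {
		closable
	}

	// Go runs the polling loop until Close is called.
	func (p *examplePoller) Go() {
		p.doneWg.Add(1)       // closableAdd(1) in the patch
		defer p.doneWg.Done() // closableDone() in the patch

		for {
			select {
			case <-p.doneChan: // closed by Close()
				fmt.Println("poller: shutting down")
				return
			case <-time.After(100 * time.Millisecond):
				fmt.Println("poller: periodic work")
			}
		}
	}

	// Close mirrors closableCloseAndWait: signal, then wait for Go to return.
	func (p *examplePoller) Close() {
		close(p.doneChan)
		p.doneWg.Wait()
	}

	func main() {
		poller := examplePoller{makeClosable()}
		go poller.Go()
		time.Sleep(350 * time.Millisecond)
		poller.Close() // returns only after the goroutine has exited
	}

This is also why the "dirty hack" sleeps in the old Close() methods can go
away: the WaitGroup wait only returns once every goroutine registered through
closableAdd has finished.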
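
The main.go changes follow the net/http graceful-shutdown idiom introduced in
Go 1.8 (hence the README version bump). Below is a condensed, self-contained
sketch of the same flow; the address and handler are placeholders rather than
Flamenco's real configuration:

	package main

	import (
		"context"
		"log"
		"net/http"
		"os"
		"os/signal"
	)

	func main() {
		// Assign the server before installing the signal handler, so the
		// handler can never observe a nil server (the race fixed above).
		server := &http.Server{Addr: ":8080", Handler: http.DefaultServeMux}
		shutdownComplete := make(chan struct{})

		c := make(chan os.Signal, 1)
		signal.Notify(c, os.Interrupt)
		go func() {
			<-c
			// Shutdown unblocks ListenAndServe, which then returns
			// http.ErrServerClosed rather than a real error.
			if err := server.Shutdown(context.Background()); err != nil {
				log.Printf("error during shutdown: %v", err)
			}
			close(shutdownComplete)
		}()

		log.Print(server.ListenAndServe())
		<-shutdownComplete // don't exit before Shutdown has finished
	}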