From a5b0efc7d6abcd1e3dd5b73db31984a2c84b6800 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= <sybren@stuvel.eu> Date: Fri, 17 Feb 2017 13:00:24 +0100 Subject: [PATCH] Manager: xxxId -> xxxID and xxxUrl -> xxxURL and unexported 1 struct --- .../flamenco-manager/flamenco/documents.go | 14 ++--- .../flamenco-manager/flamenco/gocheck_test.go | 2 +- .../flamenco-manager/flamenco/scheduler.go | 14 ++--- .../flamenco/scheduler_test.go | 60 +++++++++---------- .../flamenco/task_timeout_check.go | 10 ++-- .../flamenco-manager/flamenco/task_updates.go | 40 ++++++------- .../flamenco/task_updates_test.go | 26 ++++---- .../src/flamenco-manager/flamenco/upstream.go | 24 ++++---- .../src/flamenco-manager/flamenco/workers.go | 26 ++++---- .../flamenco-manager/flamenco/workers_test.go | 20 +++---- 10 files changed, 118 insertions(+), 118 deletions(-) diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/documents.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/documents.go index 9b4e244c..1b20f8e2 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/documents.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/documents.go @@ -12,7 +12,7 @@ type Command struct { } type Task struct { - Id bson.ObjectId `bson:"_id,omitempty" json:"_id,omitempty"` + ID bson.ObjectId `bson:"_id,omitempty" json:"_id,omitempty"` Etag string `bson:"_etag,omitempty" json:"_etag,omitempty"` Job bson.ObjectId `bson:"job,omitempty" json:"job"` Manager bson.ObjectId `bson:"manager,omitempty" json:"manager"` @@ -30,11 +30,11 @@ type Task struct { Worker string `bson:"worker,omitempty" json:"worker,omitempty"` // Internal bookkeeping - WorkerId *bson.ObjectId `bson:"worker_id,omitempty" json:"-"` + WorkerID *bson.ObjectId `bson:"worker_id,omitempty" json:"-"` LastWorkerPing *time.Time `bson:"last_worker_ping,omitempty" json:"-"` } -type AggregationPipelineResult struct { +type aggregationPipelineResult struct { Task *Task `bson:"task"` } @@ -45,8 +45,8 @@ type ScheduledTasks struct { // Both sent from Worker to Manager, as well as from Manager to Server. type TaskUpdate struct { - Id bson.ObjectId `bson:"_id" json:"_id"` - TaskId bson.ObjectId `bson:"task_id" json:"task_id,omitempty"` + ID bson.ObjectId `bson:"_id" json:"_id"` + TaskID bson.ObjectId `bson:"task_id" json:"task_id,omitempty"` TaskStatus string `bson:"task_status,omitempty" json:"task_status,omitempty"` ReceivedOnManager time.Time `bson:"received_on_manager" json:"received_on_manager"` Activity string `bson:"activity,omitempty" json:"activity,omitempty"` @@ -72,7 +72,7 @@ type WorkerRegistration struct { } type Worker struct { - Id bson.ObjectId `bson:"_id,omitempty" json:"_id,omitempty"` + ID bson.ObjectId `bson:"_id,omitempty" json:"_id,omitempty"` Secret string `bson:"-" json:"-"` HashedSecret []byte `bson:"hashed_secret" json:"-"` Nickname string `bson:"nickname" json:"nickname"` @@ -91,7 +91,7 @@ type Worker struct { */ type StartupNotification struct { // Settings - ManagerUrl string `json:"manager_url"` + ManagerURL string `json:"manager_url"` VariablesByVarname map[string]map[string]string `json:"variables"` // From our local database diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/gocheck_test.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/gocheck_test.go index 83936c53..08cd2b88 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/gocheck_test.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/gocheck_test.go @@ -20,7 +20,7 @@ func ConstructTestTask(task_id, job_type string) Task { func ConstructTestTaskWithPrio(task_id, job_type string, priority int) Task { return Task{ - Id: bson.ObjectIdHex(task_id), + ID: bson.ObjectIdHex(task_id), Etag: "1234567", Job: bson.ObjectIdHex("bbbbbbbbbbbbbbbbbbbbbbbb"), Manager: bson.ObjectIdHex("cccccccccccccccccccccccc"), diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go index 23540871..7fce5d82 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go @@ -64,7 +64,7 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat break } - log.Debugf("Task %s was changed, reexamining queue.", task.Id.Hex()) + log.Debugf("Task %s was changed, reexamining queue.", task.ID.Hex()) } if was_changed { log.Errorf("Infinite loop detected, tried 1000 tasks and they all changed...") @@ -74,14 +74,14 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat // Update the task status to "active", pushing it as a task update to the manager too. task.Status = "active" - tupdate := TaskUpdate{TaskId: task.Id, TaskStatus: task.Status} + tupdate := TaskUpdate{TaskID: task.ID, TaskStatus: task.Status} local_updates := bson.M{ - "worker_id": worker.Id, + "worker_id": worker.ID, "last_worker_ping": UtcNow(), } if err := QueueTaskUpdateWithExtra(&tupdate, db, local_updates); err != nil { log.Errorf("Unable to queue task update while assigning task %s to worker %s: %s", - task.Id.Hex(), worker.Identifier(), err) + task.ID.Hex(), worker.Identifier(), err) w.WriteHeader(http.StatusInternalServerError) return } @@ -95,12 +95,12 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat encoder.Encode(task) log.Infof("ScheduleTask: assigned task %s to worker %s", - task.Id.Hex(), worker.Identifier()) + task.ID.Hex(), worker.Identifier()) // Push a task log line stating we've assigned this task to the given worker. // This is done here, instead of by the worker, so that it's logged even if the worker fails. msg := fmt.Sprintf("Manager assigned task to worker %s", worker.Identifier()) - LogTaskActivity(worker, task.Id, msg, time.Now().Format(IsoFormat)+": "+msg, db) + LogTaskActivity(worker, task.ID, msg, time.Now().Format(IsoFormat)+": "+msg, db) } /** @@ -116,7 +116,7 @@ func (ts *TaskScheduler) fetchTaskFromQueueOrManager( return nil } - result := AggregationPipelineResult{} + result := aggregationPipelineResult{} tasks_coll := db.C("flamenco_tasks") pipe := tasks_coll.Pipe([]M{ diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler_test.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler_test.go index cc819296..c33c9f84 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler_test.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler_test.go @@ -94,7 +94,7 @@ func (s *SchedulerTestSuite) TestVariableReplacement(t *check.C) { // Perform HTTP request resp_rec := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/task", nil) - ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.Id.Hex()} + ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.ID.Hex()} s.sched.ScheduleTask(resp_rec, ar) // Check the response JSON @@ -106,7 +106,7 @@ func (s *SchedulerTestSuite) TestVariableReplacement(t *check.C) { json_task.Commands[0].Settings["message"]) // Check worker with other job type - ar = &auth.AuthenticatedRequest{Request: *request, Username: s.worker_win.Id.Hex()} + ar = &auth.AuthenticatedRequest{Request: *request, Username: s.worker_win.ID.Hex()} s.sched.ScheduleTask(resp_rec, ar) // Check the response JSON @@ -132,13 +132,13 @@ func (s *SchedulerTestSuite) TestSchedulerOrderByPriority(t *check.C) { // Perform HTTP request to the scheduler. resp_rec := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/task", nil) - ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.Id.Hex()} + ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.ID.Hex()} s.sched.ScheduleTask(resp_rec, ar) // We should have gotten task 2, because it has the highest priority. json_task := Task{} parseJson(t, resp_rec, 200, &json_task) - assert.Equal(t, task2.Id.Hex(), json_task.Id.Hex()) + assert.Equal(t, task2.ID.Hex(), json_task.ID.Hex()) } func (s *SchedulerTestSuite) TestSchedulerOrderByJobPriority(t *check.C) { @@ -157,13 +157,13 @@ func (s *SchedulerTestSuite) TestSchedulerOrderByJobPriority(t *check.C) { // Perform HTTP request to the scheduler. resp_rec := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/task", nil) - ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.Id.Hex()} + ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.ID.Hex()} s.sched.ScheduleTask(resp_rec, ar) // We should have gotten task 1, because its job has the highest priority. json_task := Task{} parseJson(t, resp_rec, 200, &json_task) - assert.Equal(t, task1.Id.Hex(), json_task.Id.Hex()) + assert.Equal(t, task1.ID.Hex(), json_task.ID.Hex()) } /** @@ -204,7 +204,7 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamCanceled(t *check.C) { // Perform HTTP request to the scheduler. resp_rec := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/task", nil) - ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.Id.Hex()} + ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.ID.Hex()} s.sched.ScheduleTask(resp_rec, ar) timedout := <-timeout @@ -215,11 +215,11 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamCanceled(t *check.C) { parseJson(t, resp_rec, 200, &json_task) // We should have gotten task 1, because task 2 was canceled. - assert.Equal(t, task1.Id.Hex(), json_task.Id.Hex()) + assert.Equal(t, task1.ID.Hex(), json_task.ID.Hex()) // In our queue, task 2 should have been canceled, since it was canceled on the server. found_task2 := Task{} - err := s.db.C("flamenco_tasks").FindId(task2.Id).One(&found_task2) + err := s.db.C("flamenco_tasks").FindId(task2.ID).One(&found_task2) assert.Equal(t, nil, err) assert.Equal(t, "canceled", found_task2.Status) } @@ -256,7 +256,7 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamPrioChange(t *check.C) { // Perform HTTP request to the scheduler. resp_rec := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/task", nil) - ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.Id.Hex()} + ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.ID.Hex()} s.sched.ScheduleTask(resp_rec, ar) timedout := <-timeout @@ -267,16 +267,16 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamPrioChange(t *check.C) { parseJson(t, resp_rec, 200, &json_task) // We should have gotten task 1, because task 2 was lowered in prio. - assert.Equal(t, task1.Id.Hex(), json_task.Id.Hex()) + assert.Equal(t, task1.ID.Hex(), json_task.ID.Hex()) // In our queue, task 2 should have been lowered in prio, and task1 should be active. found_task := Task{} - err := s.db.C("flamenco_tasks").FindId(task2.Id).One(&found_task) + err := s.db.C("flamenco_tasks").FindId(task2.ID).One(&found_task) assert.Equal(t, nil, err) assert.Equal(t, "queued", found_task.Status) assert.Equal(t, 5, found_task.Priority) - err = s.db.C("flamenco_tasks").FindId(task1.Id).One(&found_task) + err = s.db.C("flamenco_tasks").FindId(task1.ID).One(&found_task) assert.Equal(t, nil, err) assert.Equal(t, "active", found_task.Status) assert.Equal(t, 50, found_task.Priority) @@ -310,7 +310,7 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamDeleted(t *check.C) { // Perform HTTP request to the scheduler. resp_rec := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/task", nil) - ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.Id.Hex()} + ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.ID.Hex()} s.sched.ScheduleTask(resp_rec, ar) timedout := <-timeout @@ -321,16 +321,16 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamDeleted(t *check.C) { parseJson(t, resp_rec, 200, &json_task) // We should have gotten task 1, because task 2 was deleted. - assert.Equal(t, task1.Id.Hex(), json_task.Id.Hex()) + assert.Equal(t, task1.ID.Hex(), json_task.ID.Hex()) // In our queue, task 2 should have been canceled, and task1 should be active. found_task := Task{} - err := s.db.C("flamenco_tasks").FindId(task2.Id).One(&found_task) + err := s.db.C("flamenco_tasks").FindId(task2.ID).One(&found_task) assert.Equal(t, nil, err) assert.Equal(t, "canceled", found_task.Status) assert.Equal(t, 100, found_task.Priority) - err = s.db.C("flamenco_tasks").FindId(task1.Id).One(&found_task) + err = s.db.C("flamenco_tasks").FindId(task1.ID).One(&found_task) assert.Equal(t, nil, err) assert.Equal(t, "active", found_task.Status) assert.Equal(t, 50, found_task.Priority) @@ -342,17 +342,17 @@ func (s *SchedulerTestSuite) TestParentTaskNotCompleted(c *check.C) { // Task 1 is being worked on by worker_win task1 := ConstructTestTaskWithPrio("1aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50) task1.Status = "active" - task1.WorkerId = &s.worker_win.Id + task1.WorkerID = &s.worker_win.ID assert.Nil(c, tasks_coll.Insert(task1)) // Task 2 is unavailable due to its parent not being completed. task2 := ConstructTestTaskWithPrio("2aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 100) - task2.Parents = []bson.ObjectId{task1.Id} + task2.Parents = []bson.ObjectId{task1.ID} task2.Status = "claimed-by-manager" assert.Nil(c, tasks_coll.Insert(task2)) // Fetch a task from the queue - resp_rec, _ := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah") + resp_rec, _ := WorkerTestRequest(s.worker_lnx.ID, "TEST", "/whatevah") task := s.sched.fetchTaskFromQueueOrManager(resp_rec, s.db, &s.worker_lnx) // We should not get any task back, since task1 is already taken, and task2 @@ -367,25 +367,25 @@ func (s *SchedulerTestSuite) TestParentTaskCompleted(c *check.C) { // Task 1 has been completed by worker_win task1 := ConstructTestTaskWithPrio("1aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50) task1.Status = "completed" - task1.WorkerId = &s.worker_win.Id + task1.WorkerID = &s.worker_win.ID assert.Nil(c, tasks_coll.Insert(task1)) // Task 2 is available due to its parent being completed. task2 := ConstructTestTaskWithPrio("2aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 100) - task2.Parents = []bson.ObjectId{task1.Id} + task2.Parents = []bson.ObjectId{task1.ID} task2.Status = "claimed-by-manager" assert.Nil(c, tasks_coll.Insert(task2)) // Fetch a task from the queue - resp_rec, _ := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah") + resp_rec, _ := WorkerTestRequest(s.worker_lnx.ID, "TEST", "/whatevah") task := s.sched.fetchTaskFromQueueOrManager(resp_rec, s.db, &s.worker_lnx) assert.Equal(c, http.StatusOK, resp_rec.Code) // We should get task 2. - assert.NotNil(c, task, "Expected task %s, got nil instead", task2.Id.Hex()) + assert.NotNil(c, task, "Expected task %s, got nil instead", task2.ID.Hex()) if task != nil { // prevent nil pointer dereference - assert.Equal(c, task.Id, task2.Id, "Expected task %s, got task %s instead", - task2.Id.Hex(), task.Id.Hex()) + assert.Equal(c, task.ID, task2.ID, "Expected task %s, got task %s instead", + task2.ID.Hex(), task.ID.Hex()) } } @@ -395,23 +395,23 @@ func (s *SchedulerTestSuite) TestParentTaskOneCompletedOneNot(c *check.C) { // Task 1 is being worked on by worker_win task1 := ConstructTestTaskWithPrio("1aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50) task1.Status = "active" - task1.WorkerId = &s.worker_win.Id + task1.WorkerID = &s.worker_win.ID assert.Nil(c, tasks_coll.Insert(task1)) // Task 2 is already completed. task2 := ConstructTestTaskWithPrio("2aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50) task2.Status = "completed" - task2.WorkerId = &s.worker_win.Id + task2.WorkerID = &s.worker_win.ID assert.Nil(c, tasks_coll.Insert(task2)) // Task 3 is unavailable due to one of its parent not being completed. task3 := ConstructTestTaskWithPrio("3aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 100) - task3.Parents = []bson.ObjectId{task1.Id, task2.Id} + task3.Parents = []bson.ObjectId{task1.ID, task2.ID} task3.Status = "claimed-by-manager" assert.Nil(c, tasks_coll.Insert(task3)) // Fetch a task from the queue - resp_rec, _ := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah") + resp_rec, _ := WorkerTestRequest(s.worker_lnx.ID, "TEST", "/whatevah") task := s.sched.fetchTaskFromQueueOrManager(resp_rec, s.db, &s.worker_lnx) // We should not get any task back. diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_timeout_check.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_timeout_check.go index 0e6006e9..5b9941c8 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_timeout_check.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_timeout_check.go @@ -83,24 +83,24 @@ func (self *TaskTimeoutChecker) Check(db *mgo.Database) { } for _, task := range timedout_tasks { - log.Warningf(" - Task %s (%s) timed out", task.Name, task.Id.Hex()) + log.Warningf(" - Task %s (%s) timed out", task.Name, task.ID.Hex()) var ident string if task.Worker != "" { ident = task.Worker - } else if task.WorkerId != nil { - ident = task.WorkerId.Hex() + } else if task.WorkerID != nil { + ident = task.WorkerID.Hex() } else { ident = "-no worker-" } tupdate := TaskUpdate{ - TaskId: task.Id, + TaskID: task.ID, TaskStatus: "failed", Activity: fmt.Sprintf("Task timed out on worker %s", ident), Log: fmt.Sprintf( "%s Task %s (%s) timed out, was active but untouched since %s. "+ "Was handled by worker %s", - UtcNow().Format(IsoFormat), task.Name, task.Id.Hex(), task.LastWorkerPing, ident), + UtcNow().Format(IsoFormat), task.Name, task.ID.Hex(), task.LastWorkerPing, ident), } QueueTaskUpdate(&tupdate, db) } diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates.go index e4618d2b..80b4ce15 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates.go @@ -50,7 +50,7 @@ func QueueTaskUpdateFromWorker(w http.ResponseWriter, r *auth.AuthenticatedReque if err := DecodeJson(w, r.Body, &tupdate, fmt.Sprintf("%s QueueTaskUpdate:", worker.Identifier())); err != nil { return } - tupdate.TaskId = task_id + tupdate.TaskID = task_id tupdate.Worker = worker.Identifier() // Check that this worker is allowed to update this task. @@ -62,20 +62,20 @@ func QueueTaskUpdateFromWorker(w http.ResponseWriter, r *auth.AuthenticatedReque fmt.Fprintf(w, "Task %s is unknown.", task_id.Hex()) return } - if task.WorkerId != nil && *task.WorkerId != worker.Id { + if task.WorkerID != nil && *task.WorkerID != worker.ID { log.Warningf("%s QueueTaskUpdateFromWorker: task %s update rejected from %s (%s), task is assigned to %s", - r.RemoteAddr, task_id.Hex(), worker.Id.Hex(), worker.Identifier(), task.WorkerId.Hex()) + r.RemoteAddr, task_id.Hex(), worker.ID.Hex(), worker.Identifier(), task.WorkerID.Hex()) w.WriteHeader(http.StatusConflict) fmt.Fprintf(w, "Task %s is assigned to another worker.", task_id.Hex()) return } - // Only set the task's worker ID if it's not already set to the current worker. - var set_worker_id *bson.ObjectId = nil - if task.WorkerId == nil { - set_worker_id = &worker.Id + // Only set the task's worker.ID if it's not already set to the current worker. + var setWorkerID *bson.ObjectId + if task.WorkerID == nil { + setWorkerID = &worker.ID } - WorkerPingedTask(set_worker_id, tupdate.TaskId, db) + WorkerPingedTask(setWorkerID, tupdate.TaskID, db) if err := QueueTaskUpdate(&tupdate, db); err != nil { log.Warningf("%s: %s", worker.Identifier(), err) @@ -97,7 +97,7 @@ func QueueTaskUpdate(tupdate *TaskUpdate, db *mgo.Database) error { func QueueTaskUpdateWithExtra(tupdate *TaskUpdate, db *mgo.Database, extra_updates bson.M) error { // For ensuring the ordering of updates. time.Time has nanosecond precision. tupdate.ReceivedOnManager = time.Now().UTC() - tupdate.Id = bson.NewObjectId() + tupdate.ID = bson.NewObjectId() // Store the update in the queue for sending to the Flamenco Server later. task_update_queue := db.C(QUEUE_MGO_COLLECTION) @@ -112,27 +112,27 @@ func QueueTaskUpdateWithExtra(tupdate *TaskUpdate, db *mgo.Database, extra_updat updates := extra_updates if tupdate.TaskStatus != "" { // Before blindly applying the task status, first check if the transition is valid. - if TaskStatusTransitionValid(task_coll, tupdate.TaskId, tupdate.TaskStatus) { + if TaskStatusTransitionValid(task_coll, tupdate.TaskID, tupdate.TaskStatus) { updates["status"] = tupdate.TaskStatus } else { log.Warningf("QueueTaskUpdate: not locally applying status=%s for %s", - tupdate.TaskStatus, tupdate.TaskId.Hex()) + tupdate.TaskStatus, tupdate.TaskID.Hex()) } } if tupdate.Activity != "" { updates["activity"] = tupdate.Activity } if len(updates) > 0 { - log.Debugf("QueueTaskUpdate: applying update %s to task %s", updates, tupdate.TaskId.Hex()) - if err := task_coll.UpdateId(tupdate.TaskId, bson.M{"$set": updates}); err != nil { + log.Debugf("QueueTaskUpdate: applying update %s to task %s", updates, tupdate.TaskID.Hex()) + if err := task_coll.UpdateId(tupdate.TaskID, bson.M{"$set": updates}); err != nil { if err != mgo.ErrNotFound { return fmt.Errorf("QueueTaskUpdate: error updating local task cache: %s", err) } else { - log.Warningf("QueueTaskUpdate: cannot find task %s to update locally", tupdate.TaskId.Hex()) + log.Warningf("QueueTaskUpdate: cannot find task %s to update locally", tupdate.TaskID.Hex()) } } } else { - log.Debugf("QueueTaskUpdate: nothing to do locally for task %s", tupdate.TaskId.Hex()) + log.Debugf("QueueTaskUpdate: nothing to do locally for task %s", tupdate.TaskID.Hex()) } return nil @@ -323,7 +323,7 @@ func (self *TaskUpdatePusher) handle_incoming_cancel_requests(cancel_task_ids [] queue_task_cancel := func(task_id bson.ObjectId) { tupdate := TaskUpdate{ - TaskId: task_id, + TaskID: task_id, TaskStatus: "canceled", } if err := QueueTaskUpdate(&tupdate, db); err != nil { @@ -335,13 +335,13 @@ func (self *TaskUpdatePusher) handle_incoming_cancel_requests(cancel_task_ids [] } for _, task_to_cancel := range tasks_to_cancel { - seen_tasks[task_to_cancel.Id] = true + seen_tasks[task_to_cancel.ID] = true if task_to_cancel.Status == "active" { // This needs to be canceled through the worker, and thus go to cancel-requested. - go_to_cancel_requested = append(go_to_cancel_requested, task_to_cancel.Id) + go_to_cancel_requested = append(go_to_cancel_requested, task_to_cancel.ID) } else { - queue_task_cancel(task_to_cancel.Id) + queue_task_cancel(task_to_cancel.ID) } } @@ -379,7 +379,7 @@ func (self *TaskUpdatePusher) handle_incoming_cancel_requests(cancel_task_ids [] func LogTaskActivity(worker *Worker, task_id bson.ObjectId, activity, log_line string, db *mgo.Database) { tupdate := TaskUpdate{ - TaskId: task_id, + TaskID: task_id, Activity: activity, Log: log_line, } diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates_test.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates_test.go index 48b3245d..6f17811a 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates_test.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_updates_test.go @@ -64,7 +64,7 @@ func (s *TaskUpdatesTestSuite) TestCancelRunningTasks(t *check.C) { log.Info("POST from manager received on server, sending back TaskUpdateResponse.") resp := TaskUpdateResponse{ - CancelTasksIds: []bson.ObjectId{task2.Id}, + CancelTasksIds: []bson.ObjectId{task2.ID}, } return httpmock.NewJsonResponse(200, &resp) }, @@ -90,9 +90,9 @@ func (s *TaskUpdatesTestSuite) TestCancelRunningTasks(t *check.C) { // Check that one task was canceled and the other was not. task_db := Task{} - assert.Nil(t, tasks_coll.FindId(task1.Id).One(&task_db)) + assert.Nil(t, tasks_coll.FindId(task1.ID).One(&task_db)) assert.Equal(t, "queued", task_db.Status) - assert.Nil(t, tasks_coll.FindId(task2.Id).One(&task_db)) + assert.Nil(t, tasks_coll.FindId(task2.ID).One(&task_db)) assert.Equal(t, "canceled", task_db.Status) } @@ -114,33 +114,33 @@ func (s *TaskUpdatesTestSuite) TestMultipleWorkersForOneTask(c *check.C) { assert.Nil(c, StoreNewWorker(&worker2, s.db)) // Task should not be assigned to any worker - assert.Nil(c, task1.WorkerId) + assert.Nil(c, task1.WorkerID) tupdate := TaskUpdate{ - TaskId: task1.Id, + TaskID: task1.ID, Activity: "doing stuff by worker1", } payload_bytes, err := json.Marshal(tupdate) assert.Nil(c, err) - resp_rec, ar := WorkerTestRequestWithBody(worker1.Id, bytes.NewBuffer(payload_bytes), "POST", "/tasks/1aaaaaaaaaaaaaaaaaaaaaaa/update") - QueueTaskUpdateFromWorker(resp_rec, ar, s.db, task1.Id) + resp_rec, ar := WorkerTestRequestWithBody(worker1.ID, bytes.NewBuffer(payload_bytes), "POST", "/tasks/1aaaaaaaaaaaaaaaaaaaaaaa/update") + QueueTaskUpdateFromWorker(resp_rec, ar, s.db, task1.ID) assert.Equal(c, 204, resp_rec.Code) // Because of this update, the task should be assigned to worker 1 - assert.Nil(c, tasks_coll.FindId(task1.Id).One(&task1)) - assert.Equal(c, task1.WorkerId, task1.WorkerId) + assert.Nil(c, tasks_coll.FindId(task1.ID).One(&task1)) + assert.Equal(c, task1.WorkerID, task1.WorkerID) assert.Equal(c, task1.Activity, "doing stuff by worker1") // An update by worker 2 should fail. tupdate.Activity = "doing stuff by worker2" payload_bytes, err = json.Marshal(tupdate) assert.Nil(c, err) - resp_rec, ar = WorkerTestRequestWithBody(worker2.Id, bytes.NewBuffer(payload_bytes), "POST", "/tasks/1aaaaaaaaaaaaaaaaaaaaaaa/update") - QueueTaskUpdateFromWorker(resp_rec, ar, s.db, task1.Id) + resp_rec, ar = WorkerTestRequestWithBody(worker2.ID, bytes.NewBuffer(payload_bytes), "POST", "/tasks/1aaaaaaaaaaaaaaaaaaaaaaa/update") + QueueTaskUpdateFromWorker(resp_rec, ar, s.db, task1.ID) assert.Equal(c, http.StatusConflict, resp_rec.Code) // The task should still be assigned to worker 1 - assert.Nil(c, tasks_coll.FindId(task1.Id).One(&task1)) - assert.Equal(c, task1.WorkerId, task1.WorkerId) + assert.Nil(c, tasks_coll.FindId(task1.ID).One(&task1)) + assert.Equal(c, task1.WorkerID, task1.WorkerID) assert.Equal(c, task1.Activity, "doing stuff by worker1") } diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/upstream.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/upstream.go index 6b9c67d5..9167dc66 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/upstream.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/upstream.go @@ -198,17 +198,17 @@ func download_tasks_from_upstream(config *Conf, mongo_sess *mgo.Session) { } tasks_coll := db.C("flamenco_tasks") for _, task := range depsgraph { - change, err := tasks_coll.Upsert(bson.M{"_id": task.Id}, task) + change, err := tasks_coll.Upsert(bson.M{"_id": task.ID}, task) if err != nil { - log.Errorf("unable to insert new task %s: %s", task.Id.Hex(), err) + log.Errorf("unable to insert new task %s: %s", task.ID.Hex(), err) continue } if change.Updated > 0 { - log.Debug("Upstream server re-queued existing task ", task.Id.Hex()) + log.Debug("Upstream server re-queued existing task ", task.ID.Hex()) } else if change.Matched > 0 { log.Debugf("Upstream server re-queued existing task %s, but nothing changed", - task.Id.Hex()) + task.ID.Hex()) } } @@ -249,7 +249,7 @@ func (self *UpstreamConnection) SendJson(logprefix, method string, url *url.URL, func (self *UpstreamConnection) SendStartupNotification() { notification := StartupNotification{ - ManagerUrl: self.config.OwnUrl, + ManagerURL: self.config.OwnUrl, VariablesByVarname: self.config.VariablesByVarname, NumberOfWorkers: 0, } @@ -341,7 +341,7 @@ func (self *UpstreamConnection) SendTaskUpdates(updates *[]TaskUpdate) (*TaskUpd * If it was changed or removed, this function return true. */ func (self *UpstreamConnection) RefetchTask(task *Task) bool { - get_url, err := self.ResolveUrl("/api/flamenco/tasks/%s", task.Id.Hex()) + get_url, err := self.ResolveUrl("/api/flamenco/tasks/%s", task.ID.Hex()) log.Infof("Verifying task with Flamenco Server %s", get_url) req, err := http.NewRequest("GET", get_url.String(), nil) @@ -361,20 +361,20 @@ func (self *UpstreamConnection) RefetchTask(task *Task) bool { if resp.StatusCode == http.StatusNotModified { // Nothing changed, we're good to go. - log.Infof("Cached task %s is still the same on the Server", task.Id.Hex()) + log.Infof("Cached task %s is still the same on the Server", task.ID.Hex()) return false } if resp.StatusCode >= 500 { // Internal errors, we'll ignore that. log.Warningf("Error %d trying to re-fetch task %s", - resp.StatusCode, task.Id.Hex()) + resp.StatusCode, task.ID.Hex()) return false } if 300 <= resp.StatusCode && resp.StatusCode < 400 { // Redirects, we'll ignore those too for now. log.Warningf("Redirect %d trying to re-fetch task %s, not following redirect.", - resp.StatusCode, task.Id.Hex()) + resp.StatusCode, task.ID.Hex()) return false } @@ -386,7 +386,7 @@ func (self *UpstreamConnection) RefetchTask(task *Task) bool { // Not found, access denied, that kind of stuff. Locally cancel the task. // TODO: probably better to go to "failed". log.Warningf("Code %d when re-fetching task %s; canceling local copy", - resp.StatusCode, task.Id.Hex()) + resp.StatusCode, task.ID.Hex()) new_task = *task new_task.Status = "canceled" @@ -407,9 +407,9 @@ func (self *UpstreamConnection) RefetchTask(task *Task) bool { // save the task to the queue. log.Infof("Cached task %s was changed on the Server, status=%s, priority=%d.", - task.Id.Hex(), new_task.Status, new_task.Priority) + task.ID.Hex(), new_task.Status, new_task.Priority) tasks_coll := self.session.DB("").C("flamenco_tasks") - tasks_coll.UpdateId(task.Id, + tasks_coll.UpdateId(task.ID, bson.M{"$set": new_task}) return true diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/workers.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/workers.go index 68c4be5e..2082c129 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/workers.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/workers.go @@ -67,7 +67,7 @@ func StoreNewWorker(winfo *Worker, db *mgo.Database) error { var err error // Store it in MongoDB after hashing the password and assigning an ID. - winfo.Id = bson.NewObjectId() + winfo.ID = bson.NewObjectId() winfo.HashedSecret, err = bcrypt.GenerateFromPassword([]byte(winfo.Secret), bcrypt.DefaultCost) if err != nil { log.Errorf("Unable to hash password:", err) @@ -146,19 +146,19 @@ func WorkerMayRunTask(w http.ResponseWriter, r *auth.AuthenticatedRequest, task := Task{} if err := db.C("flamenco_tasks").FindId(task_id).One(&task); err != nil { log.Warningf("%s WorkerMayRunTask: unable to find task %s for worker %s", - r.RemoteAddr, task_id.Hex(), worker.Id.Hex()) + r.RemoteAddr, task_id.Hex(), worker.ID.Hex()) response.Reason = fmt.Sprintf("unable to find task %s", task_id.Hex()) - } else if task.WorkerId != nil && *task.WorkerId != worker.Id { + } else if task.WorkerID != nil && *task.WorkerID != worker.ID { log.Warningf("%s WorkerMayRunTask: task %s was assigned from worker %s to %s", - r.RemoteAddr, task_id.Hex(), worker.Id.Hex(), task.WorkerId.Hex()) + r.RemoteAddr, task_id.Hex(), worker.ID.Hex(), task.WorkerID.Hex()) response.Reason = fmt.Sprintf("task %s reassigned to another worker", task_id.Hex()) } else if !IsRunnableTaskStatus(task.Status) { log.Warningf("%s WorkerMayRunTask: task %s is in not-runnable status %s, worker %s will stop", - r.RemoteAddr, task_id.Hex(), task.Status, worker.Id.Hex()) + r.RemoteAddr, task_id.Hex(), task.Status, worker.ID.Hex()) response.Reason = fmt.Sprintf("task %s in non-runnable status %s", task_id.Hex(), task.Status) } else { response.MayKeepRunning = true - WorkerPingedTask(&worker.Id, task_id, db) + WorkerPingedTask(&worker.ID, task_id, db) } // Send the response @@ -214,8 +214,8 @@ func WorkerSeen(worker *Worker, remote_addr string, db *mgo.Database) { updates["address"] = remote_addr } - if err := db.C("flamenco_workers").UpdateId(worker.Id, bson.M{"$set": updates}); err != nil { - log.Errorf("WorkerSeen: unable to update worker %s in MongoDB: %s", worker.Id, err) + if err := db.C("flamenco_workers").UpdateId(worker.ID, bson.M{"$set": updates}); err != nil { + log.Errorf("WorkerSeen: unable to update worker %s in MongoDB: %s", worker.ID, err) } } @@ -237,7 +237,7 @@ func WorkerSignOff(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo. // Update the tasks assigned to the worker. var tasks []Task query := bson.M{ - "worker_id": worker.Id, + "worker_id": worker.ID, "status": "active", } sent_header := false @@ -256,15 +256,15 @@ func WorkerSignOff(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo. } for _, task := range tasks { - tupdate.TaskId = task.Id + tupdate.TaskID = task.ID if err := QueueTaskUpdate(&tupdate, db); err != nil { if !sent_header { w.WriteHeader(http.StatusInternalServerError) sent_header = true } - fmt.Fprintf(w, "Error updating task %s: %s\n", task.Id.Hex(), err) + fmt.Fprintf(w, "Error updating task %s: %s\n", task.ID.Hex(), err) log.Errorf("WorkerSignOff: unable to update task %s for worker %s in MongoDB: %s", - task.Id.Hex(), w_ident, err) + task.ID.Hex(), w_ident, err) } } } @@ -274,7 +274,7 @@ func WorkerSignOff(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo. updates := bson.M{ "status": worker.Status, } - if err := db.C("flamenco_workers").UpdateId(worker.Id, bson.M{"$set": updates}); err != nil { + if err := db.C("flamenco_workers").UpdateId(worker.ID, bson.M{"$set": updates}); err != nil { if !sent_header { w.WriteHeader(http.StatusInternalServerError) } diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/workers_test.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/workers_test.go index aceae187..d49c84e7 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/workers_test.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/workers_test.go @@ -46,12 +46,12 @@ func (s *SchedulerTestSuite) TestWorkerMayRun(t *check.C) { } // Make sure the scheduler gives us this task. - resp_rec, ar := WorkerTestRequest(s.worker_lnx.Id, "GET", "/task") + resp_rec, ar := WorkerTestRequest(s.worker_lnx.ID, "GET", "/task") s.sched.ScheduleTask(resp_rec, ar) // Right after obtaining the task, we should be allowed to keep running it. - resp_rec, ar = WorkerTestRequest(s.worker_lnx.Id, "GET", "/may-i-run/%s", task.Id.Hex()) - WorkerMayRunTask(resp_rec, ar, s.db, task.Id) + resp_rec, ar = WorkerTestRequest(s.worker_lnx.ID, "GET", "/may-i-run/%s", task.ID.Hex()) + WorkerMayRunTask(resp_rec, ar, s.db, task.ID) resp := MayKeepRunningResponse{} parseJson(t, resp_rec, 200, &resp) @@ -59,22 +59,22 @@ func (s *SchedulerTestSuite) TestWorkerMayRun(t *check.C) { assert.Equal(t, true, resp.MayKeepRunning) // If we now change the task status to "cancel-requested", the worker should be denied. - assert.Nil(t, s.db.C("flamenco_tasks").UpdateId(task.Id, + assert.Nil(t, s.db.C("flamenco_tasks").UpdateId(task.ID, bson.M{"$set": bson.M{"status": "cancel-requested"}})) - resp_rec, ar = WorkerTestRequest(s.worker_lnx.Id, "GET", "/may-i-run/%s", task.Id.Hex()) - WorkerMayRunTask(resp_rec, ar, s.db, task.Id) + resp_rec, ar = WorkerTestRequest(s.worker_lnx.ID, "GET", "/may-i-run/%s", task.ID.Hex()) + WorkerMayRunTask(resp_rec, ar, s.db, task.ID) resp = MayKeepRunningResponse{} parseJson(t, resp_rec, 200, &resp) assert.Equal(t, false, resp.MayKeepRunning) // Changing status back to "active", but assigning to another worker - assert.Nil(t, s.db.C("flamenco_tasks").UpdateId(task.Id, bson.M{"$set": bson.M{ + assert.Nil(t, s.db.C("flamenco_tasks").UpdateId(task.ID, bson.M{"$set": bson.M{ "status": "active", - "worker_id": s.worker_win.Id, + "worker_id": s.worker_win.ID, }})) - resp_rec, ar = WorkerTestRequest(s.worker_lnx.Id, "GET", "/may-i-run/%s", task.Id.Hex()) - WorkerMayRunTask(resp_rec, ar, s.db, task.Id) + resp_rec, ar = WorkerTestRequest(s.worker_lnx.ID, "GET", "/may-i-run/%s", task.ID.Hex()) + WorkerMayRunTask(resp_rec, ar, s.db, task.ID) resp = MayKeepRunningResponse{} parseJson(t, resp_rec, 200, &resp) -- GitLab