diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go index 92f4c77935c604bf70529a1104811d8dcb10ac0a..b75e9bdbecc6ab196f421a7ef69706ab0ca050b1 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler.go @@ -41,7 +41,7 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat projection := bson.M{"platform": 1, "supported_job_types": 1, "address": 1, "nickname": 1} worker, err := FindWorker(r.Username, projection, db) if err != nil { - log.Warningf("%s ScheduleTask: Unable to find worker: %s", r.RemoteAddr, err) + log.Warningf("ScheduleTask: Unable to find worker, requested from %s: %s", r.RemoteAddr, err) w.WriteHeader(http.StatusForbidden) fmt.Fprintf(w, "Unable to find worker: %s", err) return @@ -53,7 +53,7 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat var was_changed bool for attempt := 0; attempt < 1000; attempt++ { // Fetch the first available task of a supported job type. - task = ts.fetchTaskFromQueueOrManager(w, r, db, worker) + task = ts.fetchTaskFromQueueOrManager(w, db, worker) if task == nil { // A response has already been written to 'w'. return @@ -75,7 +75,11 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat // Update the task status to "active", pushing it as a task update to the manager too. task.Status = "active" tupdate := TaskUpdate{TaskId: task.Id, TaskStatus: task.Status} - if err := QueueTaskUpdateWithExtra(&tupdate, db, bson.M{"worker_id": worker.Id}); err != nil { + local_updates := bson.M{ + "worker_id": worker.Id, + "last_worker_ping": UtcNow(), + } + if err := QueueTaskUpdateWithExtra(&tupdate, db, local_updates); err != nil { log.Errorf("Unable to queue task update while assigning task %s to worker %s: %s", task.Id.Hex(), worker.Identifier(), err) w.WriteHeader(http.StatusInternalServerError) @@ -103,12 +107,10 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat * Fetches a task from either the queue, or if it is empty, from the manager. */ func (ts *TaskScheduler) fetchTaskFromQueueOrManager( - w http.ResponseWriter, r *auth.AuthenticatedRequest, - db *mgo.Database, worker *Worker) *Task { + w http.ResponseWriter, db *mgo.Database, worker *Worker) *Task { if len(worker.SupportedJobTypes) == 0 { - log.Warningf("%s: worker %s has no supported job types.", - r.RemoteAddr, worker.Id.Hex()) + log.Warningf("TaskScheduler: worker %s has no supported job types.", worker.Identifier()) w.WriteHeader(http.StatusNotAcceptable) fmt.Fprintln(w, "You do not support any job types.") return nil @@ -175,13 +177,13 @@ func (ts *TaskScheduler) fetchTaskFromQueueOrManager( err := pipe.One(&result) if err == mgo.ErrNotFound { - log.Infof("%s no more tasks available for %s", r.RemoteAddr, worker.Identifier()) + log.Infof("TaskScheduler: no more tasks available for %s", worker.Identifier()) ts.maybeKickTaskDownloader() w.WriteHeader(204) return nil } if err != nil { - log.Errorf("%s Error fetching task for %s: %s", r.RemoteAddr, r.Username, err) + log.Errorf("TaskScheduler: Error fetching task for %s: %s", worker.Identifier(), err) w.WriteHeader(500) return nil } diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler_test.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler_test.go index 0552c0e8514b91299db548b6f597d462f8c86c7e..ba5773f5cdc0724a44a6d8c83750e76772fdc9b7 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler_test.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/scheduler_test.go @@ -327,8 +327,8 @@ func (s *SchedulerTestSuite) TestParentTaskNotCompleted(c *check.C) { assert.Nil(c, tasks_coll.Insert(task2)) // Fetch a task from the queue - resp_rec, ar := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah") - task := s.sched.fetchTaskFromQueueOrManager(resp_rec, ar, s.db, &s.worker_lnx) + resp_rec, _ := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah") + task := s.sched.fetchTaskFromQueueOrManager(resp_rec, s.db, &s.worker_lnx) // We should not get any task back, since task1 is already taken, and task2 // has a non-completed parent. @@ -352,8 +352,8 @@ func (s *SchedulerTestSuite) TestParentTaskCompleted(c *check.C) { assert.Nil(c, tasks_coll.Insert(task2)) // Fetch a task from the queue - resp_rec, ar := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah") - task := s.sched.fetchTaskFromQueueOrManager(resp_rec, ar, s.db, &s.worker_lnx) + resp_rec, _ := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah") + task := s.sched.fetchTaskFromQueueOrManager(resp_rec, s.db, &s.worker_lnx) assert.Equal(c, http.StatusOK, resp_rec.Code) // We should get task 2. @@ -386,8 +386,8 @@ func (s *SchedulerTestSuite) TestParentTaskOneCompletedOneNot(c *check.C) { assert.Nil(c, tasks_coll.Insert(task3)) // Fetch a task from the queue - resp_rec, ar := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah") - task := s.sched.fetchTaskFromQueueOrManager(resp_rec, ar, s.db, &s.worker_lnx) + resp_rec, _ := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah") + task := s.sched.fetchTaskFromQueueOrManager(resp_rec, s.db, &s.worker_lnx) // We should not get any task back. assert.Nil(c, task, "Expected nil, got task %v instead", task) diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_timeout_check.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_timeout_check.go index 753729deff87487514ed1979b5e6e5cb413853cd..3c21cd4d9e48f0bc0bf107a880af63e174de2190 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_timeout_check.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_timeout_check.go @@ -83,6 +83,7 @@ func (self *TaskTimeoutChecker) check(db *mgo.Database) { "last_worker_ping": 1, "worker_id": 1, "worker": 1, + "name": 1, } if err := db.C("flamenco_tasks").Find(query).Select(projection).All(&timedout_tasks); err != nil { log.Warningf("Error finding timed-out tasks: %s", err) @@ -106,7 +107,7 @@ func (self *TaskTimeoutChecker) check(db *mgo.Database) { Log: fmt.Sprintf( "%s Task %s (%s) timed out, was active but untouched since %s. "+ "Was handled by worker %s", - UtcNow(), task.Name, task.Id.Hex(), task.LastWorkerPing, ident), + UtcNow().Format(IsoFormat), task.Name, task.Id.Hex(), task.LastWorkerPing, ident), } QueueTaskUpdate(&tupdate, db) }