Skip to content
Snippets Groups Projects
Commit d3859951 authored by Sybren A. Stüvel
Browse files

Manager: improved logging

The log messages in fetchTaskFromQueueOrManager now identify the worker
instead of the remote address. As a result, the function no longer needs
the *auth.AuthenticatedRequest pointer.
parent 79ca835c
Branches
Tags
No related merge requests found
......@@ -41,7 +41,7 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat
projection := bson.M{"platform": 1, "supported_job_types": 1, "address": 1, "nickname": 1}
worker, err := FindWorker(r.Username, projection, db)
if err != nil {
log.Warningf("%s ScheduleTask: Unable to find worker: %s", r.RemoteAddr, err)
log.Warningf("ScheduleTask: Unable to find worker, requested from %s: %s", r.RemoteAddr, err)
w.WriteHeader(http.StatusForbidden)
fmt.Fprintf(w, "Unable to find worker: %s", err)
return
......@@ -53,7 +53,7 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat
var was_changed bool
for attempt := 0; attempt < 1000; attempt++ {
// Fetch the first available task of a supported job type.
task = ts.fetchTaskFromQueueOrManager(w, r, db, worker)
task = ts.fetchTaskFromQueueOrManager(w, db, worker)
if task == nil {
// A response has already been written to 'w'.
return
......@@ -75,7 +75,11 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat
// Update the task status to "active", pushing it as a task update to the manager too.
task.Status = "active"
tupdate := TaskUpdate{TaskId: task.Id, TaskStatus: task.Status}
if err := QueueTaskUpdateWithExtra(&tupdate, db, bson.M{"worker_id": worker.Id}); err != nil {
local_updates := bson.M{
"worker_id": worker.Id,
"last_worker_ping": UtcNow(),
}
if err := QueueTaskUpdateWithExtra(&tupdate, db, local_updates); err != nil {
log.Errorf("Unable to queue task update while assigning task %s to worker %s: %s",
task.Id.Hex(), worker.Identifier(), err)
w.WriteHeader(http.StatusInternalServerError)
......@@ -103,12 +107,10 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat
* Fetches a task from either the queue, or if it is empty, from the manager.
*/
func (ts *TaskScheduler) fetchTaskFromQueueOrManager(
w http.ResponseWriter, r *auth.AuthenticatedRequest,
db *mgo.Database, worker *Worker) *Task {
w http.ResponseWriter, db *mgo.Database, worker *Worker) *Task {
if len(worker.SupportedJobTypes) == 0 {
log.Warningf("%s: worker %s has no supported job types.",
r.RemoteAddr, worker.Id.Hex())
log.Warningf("TaskScheduler: worker %s has no supported job types.", worker.Identifier())
w.WriteHeader(http.StatusNotAcceptable)
fmt.Fprintln(w, "You do not support any job types.")
return nil
......@@ -175,13 +177,13 @@ func (ts *TaskScheduler) fetchTaskFromQueueOrManager(
err := pipe.One(&result)
if err == mgo.ErrNotFound {
log.Infof("%s no more tasks available for %s", r.RemoteAddr, worker.Identifier())
log.Infof("TaskScheduler: no more tasks available for %s", worker.Identifier())
ts.maybeKickTaskDownloader()
w.WriteHeader(204)
return nil
}
if err != nil {
log.Errorf("%s Error fetching task for %s: %s", r.RemoteAddr, r.Username, err)
log.Errorf("TaskScheduler: Error fetching task for %s: %s", worker.Identifier(), err)
w.WriteHeader(500)
return nil
}
......
......@@ -327,8 +327,8 @@ func (s *SchedulerTestSuite) TestParentTaskNotCompleted(c *check.C) {
assert.Nil(c, tasks_coll.Insert(task2))
// Fetch a task from the queue
resp_rec, ar := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah")
task := s.sched.fetchTaskFromQueueOrManager(resp_rec, ar, s.db, &s.worker_lnx)
resp_rec, _ := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah")
task := s.sched.fetchTaskFromQueueOrManager(resp_rec, s.db, &s.worker_lnx)
// We should not get any task back, since task1 is already taken, and task2
// has a non-completed parent.
......@@ -352,8 +352,8 @@ func (s *SchedulerTestSuite) TestParentTaskCompleted(c *check.C) {
assert.Nil(c, tasks_coll.Insert(task2))
// Fetch a task from the queue
resp_rec, ar := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah")
task := s.sched.fetchTaskFromQueueOrManager(resp_rec, ar, s.db, &s.worker_lnx)
resp_rec, _ := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah")
task := s.sched.fetchTaskFromQueueOrManager(resp_rec, s.db, &s.worker_lnx)
assert.Equal(c, http.StatusOK, resp_rec.Code)
// We should get task 2.
......@@ -386,8 +386,8 @@ func (s *SchedulerTestSuite) TestParentTaskOneCompletedOneNot(c *check.C) {
assert.Nil(c, tasks_coll.Insert(task3))
// Fetch a task from the queue
resp_rec, ar := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah")
task := s.sched.fetchTaskFromQueueOrManager(resp_rec, ar, s.db, &s.worker_lnx)
resp_rec, _ := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah")
task := s.sched.fetchTaskFromQueueOrManager(resp_rec, s.db, &s.worker_lnx)
// We should not get any task back.
assert.Nil(c, task, "Expected nil, got task %v instead", task)
......
......@@ -83,6 +83,7 @@ func (self *TaskTimeoutChecker) check(db *mgo.Database) {
"last_worker_ping": 1,
"worker_id": 1,
"worker": 1,
"name": 1,
}
if err := db.C("flamenco_tasks").Find(query).Select(projection).All(&timedout_tasks); err != nil {
log.Warningf("Error finding timed-out tasks: %s", err)
......@@ -106,7 +107,7 @@ func (self *TaskTimeoutChecker) check(db *mgo.Database) {
Log: fmt.Sprintf(
"%s Task %s (%s) timed out, was active but untouched since %s. "+
"Was handled by worker %s",
UtcNow(), task.Name, task.Id.Hex(), task.LastWorkerPing, ident),
UtcNow().Format(IsoFormat), task.Name, task.Id.Hex(), task.LastWorkerPing, ident),
}
QueueTaskUpdate(&tupdate, db)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment