Skip to content
Snippets Groups Projects
Commit 08fb679b authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Manager: fixed issue with TaskTimeoutChecker not killing un-pinged tasks

parent 617a08fa
Branches
Tags
No related merge requests found
...@@ -10,7 +10,6 @@ import ( ...@@ -10,7 +10,6 @@ import (
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
mgo "gopkg.in/mgo.v2" mgo "gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
) )
// Interval for checking all active tasks for timeouts. // Interval for checking all active tasks for timeouts.
...@@ -46,6 +45,7 @@ func (self *TaskTimeoutChecker) Go() { ...@@ -46,6 +45,7 @@ func (self *TaskTimeoutChecker) Go() {
ok := KillableSleep("TaskTimeoutChecker-initial", TASK_TIMEOUT_CHECK_INITIAL_SLEEP, ok := KillableSleep("TaskTimeoutChecker-initial", TASK_TIMEOUT_CHECK_INITIAL_SLEEP,
self.done_chan, self.done_wg) self.done_chan, self.done_wg)
if !ok { if !ok {
log.Warningf("TaskTimeoutChecker: Killable sleep was killed, not even starting checker.")
return return
} }
...@@ -67,14 +67,18 @@ func (self *TaskTimeoutChecker) Close() { ...@@ -67,14 +67,18 @@ func (self *TaskTimeoutChecker) Close() {
func (self *TaskTimeoutChecker) check(db *mgo.Database) { func (self *TaskTimeoutChecker) check(db *mgo.Database) {
timeout_threshold := UtcNow().Add(-self.config.ActiveTaskTimeoutInterval) timeout_threshold := UtcNow().Add(-self.config.ActiveTaskTimeoutInterval)
// log.Infof("Failing all active tasks that have not been touched since %s", timeout_threshold) log.Debugf("Failing all active tasks that have not been touched since %s", timeout_threshold)
var timedout_tasks []Task var timedout_tasks []Task
query := bson.M{ // find all active tasks that either have never been pinged, or were pinged long ago.
query := M{
"status": "active", "status": "active",
"last_worker_ping": bson.M{"$lte": timeout_threshold}, "$or": []M{
M{"last_worker_ping": M{"$lte": timeout_threshold}},
M{"last_worker_ping": M{"$exists": false}},
},
} }
projection := bson.M{ projection := M{
"_id": 1, "_id": 1,
"last_worker_ping": 1, "last_worker_ping": 1,
"worker_id": 1, "worker_id": 1,
...@@ -86,16 +90,23 @@ func (self *TaskTimeoutChecker) check(db *mgo.Database) { ...@@ -86,16 +90,23 @@ func (self *TaskTimeoutChecker) check(db *mgo.Database) {
for _, task := range timedout_tasks { for _, task := range timedout_tasks {
log.Warningf(" - Task %s (%s) timed out", task.Name, task.Id.Hex()) log.Warningf(" - Task %s (%s) timed out", task.Name, task.Id.Hex())
var ident string
if task.Worker != "" {
ident = task.Worker
} else if task.WorkerId != nil {
ident = task.WorkerId.Hex()
} else {
ident = "-no worker-"
}
tupdate := TaskUpdate{ tupdate := TaskUpdate{
TaskId: task.Id, TaskId: task.Id,
TaskStatus: "failed", TaskStatus: "failed",
Activity: fmt.Sprintf("Task timed out on worker %s (%s)", Activity: fmt.Sprintf("Task timed out on worker %s", ident),
task.Worker, task.WorkerId.Hex()),
Log: fmt.Sprintf( Log: fmt.Sprintf(
"%s Task %s (%s) timed out, was active but untouched since %s. "+ "%s Task %s (%s) timed out, was active but untouched since %s. "+
"Was handled by worker %s (%s)", "Was handled by worker %s",
UtcNow(), task.Name, task.Id.Hex(), task.LastWorkerPing, UtcNow(), task.Name, task.Id.Hex(), task.LastWorkerPing, ident),
task.Worker, task.WorkerId.Hex()),
} }
QueueTaskUpdate(&tupdate, db) QueueTaskUpdate(&tupdate, db)
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment