diff --git a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_timeout_check.go b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_timeout_check.go index ab96c9dd9847a484c28d6fb98f5b76508463c323..753729deff87487514ed1979b5e6e5cb413853cd 100644 --- a/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_timeout_check.go +++ b/packages/flamenco-manager-go/src/flamenco-manager/flamenco/task_timeout_check.go @@ -10,7 +10,6 @@ import ( log "github.com/Sirupsen/logrus" mgo "gopkg.in/mgo.v2" - "gopkg.in/mgo.v2/bson" ) // Interval for checking all active tasks for timeouts. @@ -46,6 +45,7 @@ func (self *TaskTimeoutChecker) Go() { ok := KillableSleep("TaskTimeoutChecker-initial", TASK_TIMEOUT_CHECK_INITIAL_SLEEP, self.done_chan, self.done_wg) if !ok { + log.Warningf("TaskTimeoutChecker: Killable sleep was killed, not even starting checker.") return } @@ -67,14 +67,18 @@ func (self *TaskTimeoutChecker) Close() { func (self *TaskTimeoutChecker) check(db *mgo.Database) { timeout_threshold := UtcNow().Add(-self.config.ActiveTaskTimeoutInterval) - // log.Infof("Failing all active tasks that have not been touched since %s", timeout_threshold) + log.Debugf("Failing all active tasks that have not been touched since %s", timeout_threshold) var timedout_tasks []Task - query := bson.M{ - "status": "active", - "last_worker_ping": bson.M{"$lte": timeout_threshold}, + // find all active tasks that either have never been pinged, or were pinged long ago. + query := M{ + "status": "active", + "$or": []M{ + M{"last_worker_ping": M{"$lte": timeout_threshold}}, + M{"last_worker_ping": M{"$exists": false}}, + }, } - projection := bson.M{ + projection := M{ "_id": 1, "last_worker_ping": 1, "worker_id": 1, @@ -86,16 +90,23 @@ func (self *TaskTimeoutChecker) check(db *mgo.Database) { for _, task := range timedout_tasks { log.Warningf(" - Task %s (%s) timed out", task.Name, task.Id.Hex()) + var ident string + if task.Worker != "" { + ident = task.Worker + } else if task.WorkerId != nil { + ident = task.WorkerId.Hex() + } else { + ident = "-no worker-" + } + tupdate := TaskUpdate{ TaskId: task.Id, TaskStatus: "failed", - Activity: fmt.Sprintf("Task timed out on worker %s (%s)", - task.Worker, task.WorkerId.Hex()), + Activity: fmt.Sprintf("Task timed out on worker %s", ident), Log: fmt.Sprintf( "%s Task %s (%s) timed out, was active but untouched since %s. "+ - "Was handled by worker %s (%s)", - UtcNow(), task.Name, task.Id.Hex(), task.LastWorkerPing, - task.Worker, task.WorkerId.Hex()), + "Was handled by worker %s", + UtcNow(), task.Name, task.Id.Hex(), task.LastWorkerPing, ident), } QueueTaskUpdate(&tupdate, db) }