Skip to content
Snippets Groups Projects
Commit 0c176d45 authored by Sybren A. Stüvel
Browse files

Manager: allow workers to sign off, which re-schedules their tasks

Any active task assigned to the worker will be reset to status
'claimed-by-manager', so that it can be re-executed by another worker.
parent de230d2a
No related branches found
No related tags found
No related merge requests found
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"log" "log"
"net/http" "net/http"
"time"
auth "github.com/abbot/go-http-auth" auth "github.com/abbot/go-http-auth"
...@@ -204,3 +205,66 @@ func WorkerSeen(worker *Worker, remote_addr string, db *mgo.Database) { ...@@ -204,3 +205,66 @@ func WorkerSeen(worker *Worker, remote_addr string, db *mgo.Database) {
log.Printf("WorkerSeen: ERROR: unable to update worker %s in MongoDB: %s", worker.Id, err) log.Printf("WorkerSeen: ERROR: unable to update worker %s in MongoDB: %s", worker.Id, err)
} }
} }
/**
 * Handles a sign-off request from a worker.
 *
 * The worker is looked up by the user name of the authenticated request. For
 * every task in "flamenco_tasks" that has this worker's ID and status "active",
 * a task update is queued that sets the status to "claimed-by-manager" and the
 * worker to "-". Afterwards the worker's own status is set to "down" in
 * "flamenco_workers"; this is attempted even when handling the tasks failed.
 *
 * Responds with 403 Forbidden when the worker cannot be found, and with
 * 500 Internal Server Error when any of the database operations fails. No
 * explicit status is written when everything succeeds.
 */
func WorkerSignOff(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database) {
	// Identify the worker that is signing off.
	projection := bson.M{"_id": 1, "address": 1, "nickname": 1}
	worker, err := FindWorker(r.Username, projection, db)
	if err != nil {
		log.Printf("%s WorkerSignOff: Unable to find worker: %s\n", r.RemoteAddr, err)
		w.WriteHeader(http.StatusForbidden)
		return
	}

	ident := worker.Identifier()
	log.Printf("%s Worker %s signing off\n", r.RemoteAddr, ident)

	// Several steps below can fail independently, but the error status should
	// be written to the response only once.
	headerSent := false
	failOnce := func() {
		if headerSent {
			return
		}
		w.WriteHeader(http.StatusInternalServerError)
		headerSent = true
	}

	// Collect the tasks this worker is still marked as running.
	var activeTasks []Task
	findErr := db.C("flamenco_tasks").Find(bson.M{
		"worker_id": worker.Id,
		"status":    "active",
	}).All(&activeTasks)

	if findErr == nil {
		// One update template is shared by all tasks; only the task ID differs.
		requeue := TaskUpdate{
			TaskStatus: "claimed-by-manager",
			Worker:     "-", // no longer assigned to any worker
			Activity:   fmt.Sprintf("Re-queued task after worker %s signed off", ident),
			Log: fmt.Sprintf("%s: Manager re-queued task after worker %s signed off",
				time.Now(), ident),
		}

		for i := range activeTasks {
			taskID := activeTasks[i].Id
			requeue.TaskId = taskID

			queueErr := QueueTaskUpdate(&requeue, db)
			if queueErr == nil {
				continue
			}

			// Keep going with the remaining tasks; report each failure to the
			// client as well as to the log.
			failOnce()
			fmt.Fprintf(w, "Error updating task %s: %s\n", taskID.Hex(), queueErr)
			log.Printf("WorkerSignOff: ERROR: unable to update task %s for worker %s in MongoDB: %s",
				taskID.Hex(), ident, queueErr)
		}
	} else {
		log.Printf("WorkerSignOff: ERROR: unable to find active tasks of worker %s in MongoDB: %s",
			ident, findErr)
		failOnce()
	}

	// Record in the database that the worker itself is down now.
	worker.Status = "down"
	setStatus := bson.M{"$set": bson.M{"status": worker.Status}}
	if updErr := db.C("flamenco_workers").UpdateId(worker.Id, setStatus); updErr != nil {
		failOnce()
		log.Printf("WorkerSignOff: ERROR: unable to update worker %s in MongoDB: %s", ident, updErr)
	}
}
...@@ -84,6 +84,13 @@ func http_worker_may_run_task(w http.ResponseWriter, r *auth.AuthenticatedReques ...@@ -84,6 +84,13 @@ func http_worker_may_run_task(w http.ResponseWriter, r *auth.AuthenticatedReques
flamenco.WorkerMayRunTask(w, r, mongo_sess.DB(""), bson.ObjectIdHex(task_id)) flamenco.WorkerMayRunTask(w, r, mongo_sess.DB(""), bson.ObjectIdHex(task_id))
} }
// http_worker_sign_off is the HTTP handler for worker sign-off requests. It
// works on its own copy of the MongoDB session, which is closed when the
// handler returns, and delegates the actual work to flamenco.WorkerSignOff.
func http_worker_sign_off(w http.ResponseWriter, r *auth.AuthenticatedRequest) {
	sess := session.Copy()
	defer sess.Close()

	db := sess.DB("")
	flamenco.WorkerSignOff(w, r, db)
}
func worker_secret(user, realm string) string { func worker_secret(user, realm string) string {
mongo_sess := session.Copy() mongo_sess := session.Copy()
defer mongo_sess.Close() defer mongo_sess.Close()
...@@ -142,6 +149,7 @@ func main() { ...@@ -142,6 +149,7 @@ func main() {
router.HandleFunc("/task", worker_authenticator.Wrap(http_schedule_task)).Methods("POST") router.HandleFunc("/task", worker_authenticator.Wrap(http_schedule_task)).Methods("POST")
router.HandleFunc("/tasks/{task-id}/update", worker_authenticator.Wrap(http_task_update)).Methods("POST") router.HandleFunc("/tasks/{task-id}/update", worker_authenticator.Wrap(http_task_update)).Methods("POST")
router.HandleFunc("/may-i-run/{task-id}", worker_authenticator.Wrap(http_worker_may_run_task)).Methods("GET") router.HandleFunc("/may-i-run/{task-id}", worker_authenticator.Wrap(http_worker_may_run_task)).Methods("GET")
router.HandleFunc("/sign-off", worker_authenticator.Wrap(http_worker_sign_off)).Methods("POST")
router.HandleFunc("/kick", http_kick) router.HandleFunc("/kick", http_kick)
upstream.SendStartupNotification() upstream.SendStartupNotification()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment