Skip to content
Snippets Groups Projects
Commit b3dccda9 authored by Sybren A. Stüvel's avatar Sybren A. Stüvel
Browse files

Manager: added receiver for task updates

parent 4af7239e
Branches
Tags
No related merge requests found
......@@ -7,6 +7,7 @@ This is the Flamenco Manager implementation in Go.
`$FM` denotes the directory containing a checkout of Flamenco Manager, that is, the
absolute path of this `flamenco-manager-go` directory.
0. Install Go 1.7 or newer
1. `export GOPATH=$FM`
2. `cd $FM/src/flamenco-manager`
3. Download all dependencies with `go get`
......
......@@ -42,6 +42,17 @@ func ensure_indices(session *mgo.Session) {
if err := db.C("flamenco_tasks").EnsureIndex(index); err != nil {
panic(err)
}
index = mgo.Index{
Key: []string{"task_id", "received_on_manager"},
Unique: false,
DropDups: false,
Background: false,
Sparse: false,
}
if err := db.C("task_update_queue").EnsureIndex(index); err != nil {
panic(err)
}
}
/**
......
......@@ -29,6 +29,17 @@ type Task struct {
Worker string `bson:"worker,omitempty" json:"worker,omitempty"`
}
type TaskUpdate struct {
TaskId bson.ObjectId `bson:"task_id" json:"-"`
TaskStatus string `bson:"task_status,omitempty" json:"task_status,omitempty"`
ReceivedOnManager time.Time `bson:"received_on_manager" json:"-"`
Activity string `bson:"activity,omitempty" json:"activity,omitempty"`
TaskProgressPercentage int `bson:"task_progress_percentage,omitempty" json:"task_progress_percentage,omitempty"`
CurrentCommandIdx int `bson:"current_command_idx,omitempty" json:"current_command_idx,omitempty"`
CommandProgressPercentage int `bson:"command_progress_percentage,omitempty" json:"command_progress_percentage,omitempty"`
Log string `bson:"log,omitempty" json:"log,omitempty"`
}
type WorkerRegistration struct {
Secret string `json:"secret"`
Platform string `bson:"platform" json:"platform"`
......
package flamenco
import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
)
/**
* Decodes JSON and writes a Bad Request status if it fails.
*/
func DecodeJson(w http.ResponseWriter, r io.Reader, document interface{},
logprefix string) error {
dec := json.NewDecoder(r)
if err := dec.Decode(document); err != nil {
log.Printf("%s Unable to decode JSON: %s", logprefix, err)
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "Unable to decode JSON: %s\n", err)
return err
}
return nil
}
package flamenco
import (
"fmt"
"log"
"net/http"
"time"
auth "github.com/abbot/go-http-auth"
mgo "gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)
func HandleTaskUpdate(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database,
task_id bson.ObjectId) {
log.Printf("%s Received task update for task %s\n", r.RemoteAddr, task_id.Hex())
// Parse the task JSON
tupdate := TaskUpdate{}
defer r.Body.Close()
if err := DecodeJson(w, r.Body, &tupdate, fmt.Sprintf("%s HandleTaskUpdate:", r.RemoteAddr)); err != nil {
return
}
// For ensuring the ordering of updates. time.Time has nanosecond precision.
tupdate.ReceivedOnManager = time.Now().UTC()
tupdate.TaskId = task_id
// Store the update in the queue for sending to the Flamenco Server later.
task_update_queue := db.C("task_update_queue")
if err := task_update_queue.Insert(&tupdate); err != nil {
log.Printf("%s HandleTaskUpdate: error inserting task update in queue: %s",
r.RemoteAddr, err)
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Unable to store update: %s\n", err)
return
}
w.WriteHeader(204)
}
......@@ -19,11 +19,7 @@ func RegisterWorker(w http.ResponseWriter, r *http.Request, db *mgo.Database) {
// Parse the given worker information.
winfo := WorkerRegistration{}
decoder := json.NewDecoder(r.Body)
defer r.Body.Close()
if err = decoder.Decode(&winfo); err != nil {
log.Println(r.RemoteAddr, "Unable to decode worker JSON:", err)
if err = DecodeJson(w, r.Body, &winfo, fmt.Sprintf("%s RegisterWorker:", r.RemoteAddr)); err != nil {
return
}
......
package main
import (
"fmt"
"log"
"net/http"
"strings"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
auth "github.com/abbot/go-http-auth"
......@@ -39,6 +41,22 @@ func http_kick(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(204)
}
func http_task_update(w http.ResponseWriter, r *auth.AuthenticatedRequest) {
mongo_sess := session.Copy()
defer mongo_sess.Close()
vars := mux.Vars(&r.Request)
task_id := vars["task-id"]
if !bson.IsObjectIdHex(task_id) {
w.WriteHeader(http.StatusNotFound)
fmt.Fprintf(w, "Invalid ObjectID passed as task ID: %s\n", task_id)
return
}
flamenco.HandleTaskUpdate(w, r, mongo_sess.DB(""), bson.ObjectIdHex(task_id))
}
func worker_secret(user, realm string) string {
mongo_sess := session.Copy()
defer mongo_sess.Close()
......@@ -61,6 +79,7 @@ func main() {
router.HandleFunc("/", http_status).Methods("GET")
router.HandleFunc("/register-worker", http_register_worker).Methods("POST")
router.HandleFunc("/task", worker_authenticator.Wrap(http_schedule_task)).Methods("POST")
router.HandleFunc("/tasks/{task-id}/update", worker_authenticator.Wrap(http_task_update)).Methods("POST")
router.HandleFunc("/kick", http_kick)
log.Println("Listening at :", config.Listen)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment