Commit 51bbd2d7 authored by Sybren A. Stüvel

Merge branch 'sybren-cleanups-go1.8' into master

parents 4ad2e539 a3c61aea
Showing with 270 additions and 228 deletions
......@@ -29,7 +29,7 @@ Flamenco Manager accepts the following CLI arguments:
of this `flamenco-manager-go` directory.
0. Make sure you have MongoDB up and running (on localhost)
1. Install Go 1.7 or newer
1. Install Go 1.8 or newer
2. `export GOPATH=$FM`
3. `cd $FM/src/flamenco-manager`
4. Download all dependencies with `go get`
......
package flamenco
import (
"sync"
log "github.com/Sirupsen/logrus"
)
// closable offers a way to cleanly shut down a running goroutine.
type closable struct {
doneChan chan struct{}
doneWg *sync.WaitGroup
}
// makeClosable constructs a new closable struct
func makeClosable() closable {
return closable{make(chan struct{}), new(sync.WaitGroup)}
}
// closableAdd(delta) should be combined with 'delta' calls to closableDone()
func (closable *closable) closableAdd(delta int) bool {
log.Debugf("Closable: doneWg.Add(%d) ok", delta)
closable.doneWg.Add(delta)
return true
}
// closableDone marks one "thing" as "done"
func (closable *closable) closableDone() {
log.Debugf("Closable: doneWg.Done() ok")
closable.doneWg.Done()
}
// closableCloseAndWait marks the goroutine as "done",
// and waits for all things added with closableAdd() to be "done" too.
func (closable *closable) closableCloseAndWait() {
close(closable.doneChan)
log.Debugf("Closable: waiting for shutdown to finish.")
closable.doneWg.Wait()
}
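The closable helper above replaces the per-type done channel + WaitGroup pairs used elsewhere in this commit. A minimal sketch of the intended usage, assuming it lives in the same flamenco package; the exampleLoop type is purely illustrative and not part of this commit:

```go
package flamenco

import "time"

// exampleLoop is a hypothetical periodic worker, used only to illustrate the
// closable life cycle.
type exampleLoop struct {
	closable
}

func newExampleLoop() *exampleLoop {
	return &exampleLoop{makeClosable()}
}

// Go runs until Close() is called.
func (e *exampleLoop) Go() {
	if !e.closableAdd(1) { // register this goroutine with the WaitGroup
		return
	}
	defer e.closableDone() // deregister it when the loop exits

	for {
		select {
		case <-e.doneChan: // closed by closableCloseAndWait()
			return
		case <-time.After(time.Second):
			// periodic work would go here
		}
	}
}

// Close signals the goroutine to stop and waits for it to finish.
func (e *exampleLoop) Close() {
	e.closableCloseAndWait()
}
```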
......@@ -14,14 +14,13 @@ type countresult struct {
Count int `bson:"count"`
}
// M is a shortcut for bson.M to make longer queries easier to read.
type M bson.M
/**
* Returns a MongoDB session.
*
* The database name should be configured in the database URL.
* You can use this default database using session.DB("").
*/
// MongoSession returns a MongoDB session.
//
// The database name should be configured in the database URL.
// You can use this default database using session.DB("").
func MongoSession(config *Conf) *mgo.Session {
var err error
var session *mgo.Session
......@@ -32,12 +31,12 @@ func MongoSession(config *Conf) *mgo.Session {
}
session.SetMode(mgo.Monotonic, true)
ensure_indices(session)
ensureIndices(session)
return session
}
func ensure_indices(session *mgo.Session) {
func ensureIndices(session *mgo.Session) {
db := session.DB("")
index := mgo.Index{
......@@ -63,11 +62,9 @@ func ensure_indices(session *mgo.Session) {
}
}
/**
* Counts the number of documents in the given collection.
*/
// Count returns the number of documents in the given collection.
func Count(coll *mgo.Collection) (int, error) {
aggr_ops := []bson.M{
aggrOps := []bson.M{
bson.M{
"$group": bson.M{
"_id": nil,
......@@ -75,7 +72,7 @@ func Count(coll *mgo.Collection) (int, error) {
},
},
}
pipe := coll.Pipe(aggr_ops)
pipe := coll.Pipe(aggrOps)
result := countresult{}
if err := pipe.One(&result); err != nil {
if err == mgo.ErrNotFound {
......@@ -88,24 +85,26 @@ func Count(coll *mgo.Collection) (int, error) {
return result.Count, nil
}
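As a quick orientation, a hypothetical snippet tying MongoSession and Count together; the collection name matches the one used throughout this commit, and the Conf value is assumed to carry the database URL:

```go
package flamenco

import log "github.com/Sirupsen/logrus"

// countTasks is an illustrative helper, not part of this commit.
func countTasks(config *Conf) {
	session := MongoSession(config) // the database name comes from the configured URL
	defer session.Close()

	count, err := Count(session.DB("").C("flamenco_tasks"))
	if err != nil {
		log.Errorf("countTasks: unable to count tasks: %s", err)
		return
	}
	log.Infof("countTasks: %d tasks in the local queue", count)
}
```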
// GetSettings returns the settings as saved in our MongoDB.
func GetSettings(db *mgo.Database) *SettingsInMongo {
settings := &SettingsInMongo{}
err := db.C("settings").Find(bson.M{}).One(settings)
if err != nil && err != mgo.ErrNotFound {
log.Errorf("db.GetSettings: Unable to get settings: ", err)
log.Panic("db.GetSettings: Unable to get settings: ", err)
}
return settings
}
// SaveSettings stores the given settings in MongoDB.
func SaveSettings(db *mgo.Database, settings *SettingsInMongo) {
_, err := db.C("settings").Upsert(bson.M{}, settings)
if err != nil && err != mgo.ErrNotFound {
log.Errorf("db.SaveSettings: Unable to save settings: ", err)
log.Panic("db.SaveSettings: Unable to save settings: ", err)
}
}
/* Erases all tasks in the flamenco_tasks collection. */
// CleanSlate erases all tasks in the flamenco_tasks collection.
func CleanSlate(db *mgo.Database) {
fmt.Println("")
fmt.Println("Performing Clean Slate operation, this will erase all tasks from the local DB.")
......
......@@ -6,13 +6,15 @@ import (
"gopkg.in/mgo.v2/bson"
)
// Command is an executable part of a Task
type Command struct {
Name string `bson:"name" json:"name"`
Settings bson.M `bson:"settings" json:"settings"`
}
// Task contains a Flamenco task, with some BSON-only fields for local Manager use.
type Task struct {
Id bson.ObjectId `bson:"_id,omitempty" json:"_id,omitempty"`
ID bson.ObjectId `bson:"_id,omitempty" json:"_id,omitempty"`
Etag string `bson:"_etag,omitempty" json:"_etag,omitempty"`
Job bson.ObjectId `bson:"job,omitempty" json:"job"`
Manager bson.ObjectId `bson:"manager,omitempty" json:"manager"`
......@@ -30,23 +32,24 @@ type Task struct {
Worker string `bson:"worker,omitempty" json:"worker,omitempty"`
// Internal bookkeeping
WorkerId *bson.ObjectId `bson:"worker_id,omitempty" json:"-"`
WorkerID *bson.ObjectId `bson:"worker_id,omitempty" json:"-"`
LastWorkerPing *time.Time `bson:"last_worker_ping,omitempty" json:"-"`
}
type AggregationPipelineResult struct {
type aggregationPipelineResult struct {
// For internal MongoDB querying only
Task *Task `bson:"task"`
}
// Dependency graph response from Server.
// ScheduledTasks contains a dependency graph response from Server.
type ScheduledTasks struct {
Depsgraph []Task `json:"depsgraph"`
}
// Both sent from Worker to Manager, as well as from Manager to Server.
// TaskUpdate is sent both from Worker to Manager and from Manager to Server.
type TaskUpdate struct {
Id bson.ObjectId `bson:"_id" json:"_id"`
TaskId bson.ObjectId `bson:"task_id" json:"task_id,omitempty"`
ID bson.ObjectId `bson:"_id" json:"_id"`
TaskID bson.ObjectId `bson:"task_id" json:"task_id,omitempty"`
TaskStatus string `bson:"task_status,omitempty" json:"task_status,omitempty"`
ReceivedOnManager time.Time `bson:"received_on_manager" json:"received_on_manager"`
Activity string `bson:"activity,omitempty" json:"activity,omitempty"`
......@@ -57,13 +60,14 @@ type TaskUpdate struct {
Worker string `bson:"worker" json:"worker"`
}
// Received from Server.
// TaskUpdateResponse is received from Server.
type TaskUpdateResponse struct {
ModifiedCount int `json:"modified_count"`
HandledUpdateIds []bson.ObjectId `json:"handled_update_ids,omitempty"`
CancelTasksIds []bson.ObjectId `json:"cancel_task_ids,omitempty"`
}
// WorkerRegistration is sent by the Worker to register itself at this Manager.
type WorkerRegistration struct {
Secret string `json:"secret"`
Platform string `json:"platform"`
......@@ -71,8 +75,10 @@ type WorkerRegistration struct {
Nickname string `json:"nickname"`
}
// Worker contains all information about a specific Worker.
// Some fields come from the WorkerRegistration, whereas others are filled by us.
type Worker struct {
Id bson.ObjectId `bson:"_id,omitempty" json:"_id,omitempty"`
ID bson.ObjectId `bson:"_id,omitempty" json:"_id,omitempty"`
Secret string `bson:"-" json:"-"`
HashedSecret []byte `bson:"hashed_secret" json:"-"`
Nickname string `bson:"nickname" json:"nickname"`
......@@ -85,30 +91,30 @@ type Worker struct {
SupportedJobTypes []string `bson:"supported_job_types" json:"supported_job_types"`
}
/**
* Notification sent to upstream Flamenco Server upon startup. This is a combination
* of settings (see settings.go) and information from the database.
*/
// StartupNotification is sent to upstream Flamenco Server upon startup. This is a combination
// of settings (see settings.go) and information from the database.
type StartupNotification struct {
// Settings
ManagerUrl string `json:"manager_url"`
ManagerURL string `json:"manager_url"`
VariablesByVarname map[string]map[string]string `json:"variables"`
// From our local database
NumberOfWorkers int `json:"nr_of_workers"`
}
// MayKeepRunningResponse is sent to workers to indicate whether they can keep running their task.
type MayKeepRunningResponse struct {
MayKeepRunning bool `json:"may_keep_running"`
Reason string `json:"reason,omitempty"`
}
// Settings we want to be able to update from within Flamenco Manager itself,
// so those are stored in MongoDB itself.
// SettingsInMongo contains settings we want to be able to update from
// within Flamenco Manager itself, so those are stored in MongoDB.
type SettingsInMongo struct {
DepsgraphLastModified *string `bson:"depsgraph_last_modified"`
}
// StatusReport is sent in response to a query on the / URL.
type StatusReport struct {
NrOfWorkers int `json:"nr_of_workers"`
NrOfTasks int `json:"nr_of_tasks"`
......
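The renames above (Id → ID, TaskId → TaskID, and so on) only touch the Go field names; the bson/json struct tags are unchanged, so the wire format stays compatible. A hypothetical snippet illustrating that:

```go
package flamenco

import (
	"encoding/json"
	"fmt"

	"gopkg.in/mgo.v2/bson"
)

// exampleTaskUpdate is illustrative only; it is not part of this commit.
func exampleTaskUpdate() {
	tupdate := TaskUpdate{
		ID:         bson.NewObjectId(),
		TaskID:     bson.ObjectIdHex("1aaaaaaaaaaaaaaaaaaaaaaa"),
		TaskStatus: "completed",
		Activity:   "example activity line",
	}

	payload, _ := json.Marshal(tupdate)
	// The JSON keys are still _id, task_id, task_status, activity, ...
	fmt.Println(string(payload))
}
```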
......@@ -20,7 +20,7 @@ func ConstructTestTask(task_id, job_type string) Task {
func ConstructTestTaskWithPrio(task_id, job_type string, priority int) Task {
return Task{
Id: bson.ObjectIdHex(task_id),
ID: bson.ObjectIdHex(task_id),
Etag: "1234567",
Job: bson.ObjectIdHex("bbbbbbbbbbbbbbbbbbbbbbbb"),
Manager: bson.ObjectIdHex("cccccccccccccccccccccccc"),
......
......@@ -64,7 +64,7 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat
break
}
log.Debugf("Task %s was changed, reexamining queue.", task.Id.Hex())
log.Debugf("Task %s was changed, reexamining queue.", task.ID.Hex())
}
if was_changed {
log.Errorf("Infinite loop detected, tried 1000 tasks and they all changed...")
......@@ -74,14 +74,14 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat
// Update the task status to "active", pushing it as a task update to the manager too.
task.Status = "active"
tupdate := TaskUpdate{TaskId: task.Id, TaskStatus: task.Status}
tupdate := TaskUpdate{TaskID: task.ID, TaskStatus: task.Status}
local_updates := bson.M{
"worker_id": worker.Id,
"worker_id": worker.ID,
"last_worker_ping": UtcNow(),
}
if err := QueueTaskUpdateWithExtra(&tupdate, db, local_updates); err != nil {
log.Errorf("Unable to queue task update while assigning task %s to worker %s: %s",
task.Id.Hex(), worker.Identifier(), err)
task.ID.Hex(), worker.Identifier(), err)
w.WriteHeader(http.StatusInternalServerError)
return
}
......@@ -95,12 +95,12 @@ func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.Authenticat
encoder.Encode(task)
log.Infof("ScheduleTask: assigned task %s to worker %s",
task.Id.Hex(), worker.Identifier())
task.ID.Hex(), worker.Identifier())
// Push a task log line stating we've assigned this task to the given worker.
// This is done here, instead of by the worker, so that it's logged even if the worker fails.
msg := fmt.Sprintf("Manager assigned task to worker %s", worker.Identifier())
LogTaskActivity(worker, task.Id, msg, time.Now().Format(IsoFormat)+": "+msg, db)
LogTaskActivity(worker, task.ID, msg, time.Now().Format(IsoFormat)+": "+msg, db)
}
/**
......@@ -116,7 +116,7 @@ func (ts *TaskScheduler) fetchTaskFromQueueOrManager(
return nil
}
result := AggregationPipelineResult{}
result := aggregationPipelineResult{}
tasks_coll := db.C("flamenco_tasks")
pipe := tasks_coll.Pipe([]M{
......
......@@ -94,7 +94,7 @@ func (s *SchedulerTestSuite) TestVariableReplacement(t *check.C) {
// Perform HTTP request
resp_rec := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/task", nil)
ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.Id.Hex()}
ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.ID.Hex()}
s.sched.ScheduleTask(resp_rec, ar)
// Check the response JSON
......@@ -106,7 +106,7 @@ func (s *SchedulerTestSuite) TestVariableReplacement(t *check.C) {
json_task.Commands[0].Settings["message"])
// Check worker with other job type
ar = &auth.AuthenticatedRequest{Request: *request, Username: s.worker_win.Id.Hex()}
ar = &auth.AuthenticatedRequest{Request: *request, Username: s.worker_win.ID.Hex()}
s.sched.ScheduleTask(resp_rec, ar)
// Check the response JSON
......@@ -132,13 +132,13 @@ func (s *SchedulerTestSuite) TestSchedulerOrderByPriority(t *check.C) {
// Perform HTTP request to the scheduler.
resp_rec := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/task", nil)
ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.Id.Hex()}
ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.ID.Hex()}
s.sched.ScheduleTask(resp_rec, ar)
// We should have gotten task 2, because it has the highest priority.
json_task := Task{}
parseJson(t, resp_rec, 200, &json_task)
assert.Equal(t, task2.Id.Hex(), json_task.Id.Hex())
assert.Equal(t, task2.ID.Hex(), json_task.ID.Hex())
}
func (s *SchedulerTestSuite) TestSchedulerOrderByJobPriority(t *check.C) {
......@@ -157,13 +157,13 @@ func (s *SchedulerTestSuite) TestSchedulerOrderByJobPriority(t *check.C) {
// Perform HTTP request to the scheduler.
resp_rec := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/task", nil)
ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.Id.Hex()}
ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.ID.Hex()}
s.sched.ScheduleTask(resp_rec, ar)
// We should have gotten task 1, because its job has the highest priority.
json_task := Task{}
parseJson(t, resp_rec, 200, &json_task)
assert.Equal(t, task1.Id.Hex(), json_task.Id.Hex())
assert.Equal(t, task1.ID.Hex(), json_task.ID.Hex())
}
/**
......@@ -204,7 +204,7 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamCanceled(t *check.C) {
// Perform HTTP request to the scheduler.
resp_rec := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/task", nil)
ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.Id.Hex()}
ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.ID.Hex()}
s.sched.ScheduleTask(resp_rec, ar)
timedout := <-timeout
......@@ -215,11 +215,11 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamCanceled(t *check.C) {
parseJson(t, resp_rec, 200, &json_task)
// We should have gotten task 1, because task 2 was canceled.
assert.Equal(t, task1.Id.Hex(), json_task.Id.Hex())
assert.Equal(t, task1.ID.Hex(), json_task.ID.Hex())
// In our queue, task 2 should have been canceled, since it was canceled on the server.
found_task2 := Task{}
err := s.db.C("flamenco_tasks").FindId(task2.Id).One(&found_task2)
err := s.db.C("flamenco_tasks").FindId(task2.ID).One(&found_task2)
assert.Equal(t, nil, err)
assert.Equal(t, "canceled", found_task2.Status)
}
......@@ -256,7 +256,7 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamPrioChange(t *check.C) {
// Perform HTTP request to the scheduler.
resp_rec := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/task", nil)
ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.Id.Hex()}
ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.ID.Hex()}
s.sched.ScheduleTask(resp_rec, ar)
timedout := <-timeout
......@@ -267,16 +267,16 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamPrioChange(t *check.C) {
parseJson(t, resp_rec, 200, &json_task)
// We should have gotten task 1, because task 2 was lowered in prio.
assert.Equal(t, task1.Id.Hex(), json_task.Id.Hex())
assert.Equal(t, task1.ID.Hex(), json_task.ID.Hex())
// In our queue, task 2 should have been lowered in prio, and task1 should be active.
found_task := Task{}
err := s.db.C("flamenco_tasks").FindId(task2.Id).One(&found_task)
err := s.db.C("flamenco_tasks").FindId(task2.ID).One(&found_task)
assert.Equal(t, nil, err)
assert.Equal(t, "queued", found_task.Status)
assert.Equal(t, 5, found_task.Priority)
err = s.db.C("flamenco_tasks").FindId(task1.Id).One(&found_task)
err = s.db.C("flamenco_tasks").FindId(task1.ID).One(&found_task)
assert.Equal(t, nil, err)
assert.Equal(t, "active", found_task.Status)
assert.Equal(t, 50, found_task.Priority)
......@@ -310,7 +310,7 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamDeleted(t *check.C) {
// Perform HTTP request to the scheduler.
resp_rec := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/task", nil)
ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.Id.Hex()}
ar := &auth.AuthenticatedRequest{Request: *request, Username: s.worker_lnx.ID.Hex()}
s.sched.ScheduleTask(resp_rec, ar)
timedout := <-timeout
......@@ -321,16 +321,16 @@ func (s *SchedulerTestSuite) TestSchedulerVerifyUpstreamDeleted(t *check.C) {
parseJson(t, resp_rec, 200, &json_task)
// We should have gotten task 1, because task 2 was deleted.
assert.Equal(t, task1.Id.Hex(), json_task.Id.Hex())
assert.Equal(t, task1.ID.Hex(), json_task.ID.Hex())
// In our queue, task 2 should have been canceled, and task1 should be active.
found_task := Task{}
err := s.db.C("flamenco_tasks").FindId(task2.Id).One(&found_task)
err := s.db.C("flamenco_tasks").FindId(task2.ID).One(&found_task)
assert.Equal(t, nil, err)
assert.Equal(t, "canceled", found_task.Status)
assert.Equal(t, 100, found_task.Priority)
err = s.db.C("flamenco_tasks").FindId(task1.Id).One(&found_task)
err = s.db.C("flamenco_tasks").FindId(task1.ID).One(&found_task)
assert.Equal(t, nil, err)
assert.Equal(t, "active", found_task.Status)
assert.Equal(t, 50, found_task.Priority)
......@@ -342,17 +342,17 @@ func (s *SchedulerTestSuite) TestParentTaskNotCompleted(c *check.C) {
// Task 1 is being worked on by worker_win
task1 := ConstructTestTaskWithPrio("1aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50)
task1.Status = "active"
task1.WorkerId = &s.worker_win.Id
task1.WorkerID = &s.worker_win.ID
assert.Nil(c, tasks_coll.Insert(task1))
// Task 2 is unavailable due to its parent not being completed.
task2 := ConstructTestTaskWithPrio("2aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 100)
task2.Parents = []bson.ObjectId{task1.Id}
task2.Parents = []bson.ObjectId{task1.ID}
task2.Status = "claimed-by-manager"
assert.Nil(c, tasks_coll.Insert(task2))
// Fetch a task from the queue
resp_rec, _ := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah")
resp_rec, _ := WorkerTestRequest(s.worker_lnx.ID, "TEST", "/whatevah")
task := s.sched.fetchTaskFromQueueOrManager(resp_rec, s.db, &s.worker_lnx)
// We should not get any task back, since task1 is already taken, and task2
......@@ -367,25 +367,25 @@ func (s *SchedulerTestSuite) TestParentTaskCompleted(c *check.C) {
// Task 1 has been completed by worker_win
task1 := ConstructTestTaskWithPrio("1aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50)
task1.Status = "completed"
task1.WorkerId = &s.worker_win.Id
task1.WorkerID = &s.worker_win.ID
assert.Nil(c, tasks_coll.Insert(task1))
// Task 2 is available due to its parent being completed.
task2 := ConstructTestTaskWithPrio("2aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 100)
task2.Parents = []bson.ObjectId{task1.Id}
task2.Parents = []bson.ObjectId{task1.ID}
task2.Status = "claimed-by-manager"
assert.Nil(c, tasks_coll.Insert(task2))
// Fetch a task from the queue
resp_rec, _ := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah")
resp_rec, _ := WorkerTestRequest(s.worker_lnx.ID, "TEST", "/whatevah")
task := s.sched.fetchTaskFromQueueOrManager(resp_rec, s.db, &s.worker_lnx)
assert.Equal(c, http.StatusOK, resp_rec.Code)
// We should get task 2.
assert.NotNil(c, task, "Expected task %s, got nil instead", task2.Id.Hex())
assert.NotNil(c, task, "Expected task %s, got nil instead", task2.ID.Hex())
if task != nil { // prevent nil pointer dereference
assert.Equal(c, task.Id, task2.Id, "Expected task %s, got task %s instead",
task2.Id.Hex(), task.Id.Hex())
assert.Equal(c, task.ID, task2.ID, "Expected task %s, got task %s instead",
task2.ID.Hex(), task.ID.Hex())
}
}
......@@ -395,23 +395,23 @@ func (s *SchedulerTestSuite) TestParentTaskOneCompletedOneNot(c *check.C) {
// Task 1 is being worked on by worker_win
task1 := ConstructTestTaskWithPrio("1aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50)
task1.Status = "active"
task1.WorkerId = &s.worker_win.Id
task1.WorkerID = &s.worker_win.ID
assert.Nil(c, tasks_coll.Insert(task1))
// Task 2 is already completed.
task2 := ConstructTestTaskWithPrio("2aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 50)
task2.Status = "completed"
task2.WorkerId = &s.worker_win.Id
task2.WorkerID = &s.worker_win.ID
assert.Nil(c, tasks_coll.Insert(task2))
// Task 3 is unavailable due to one of its parent not being completed.
task3 := ConstructTestTaskWithPrio("3aaaaaaaaaaaaaaaaaaaaaaa", "sleeping", 100)
task3.Parents = []bson.ObjectId{task1.Id, task2.Id}
task3.Parents = []bson.ObjectId{task1.ID, task2.ID}
task3.Status = "claimed-by-manager"
assert.Nil(c, tasks_coll.Insert(task3))
// Fetch a task from the queue
resp_rec, _ := WorkerTestRequest(s.worker_lnx.Id, "TEST", "/whatevah")
resp_rec, _ := WorkerTestRequest(s.worker_lnx.ID, "TEST", "/whatevah")
task := s.sched.fetchTaskFromQueueOrManager(resp_rec, s.db, &s.worker_lnx)
// We should not get any task back.
......
......@@ -5,7 +5,6 @@ package flamenco
import (
"fmt"
"sync"
"time"
log "github.com/Sirupsen/logrus"
......@@ -17,17 +16,15 @@ const TASK_TIMEOUT_CHECK_INTERVAL = 5 * time.Second
const TASK_TIMEOUT_CHECK_INITIAL_SLEEP = 5 * time.Minute
type TaskTimeoutChecker struct {
closable
config *Conf
session *mgo.Session
done_chan chan bool
done_wg *sync.WaitGroup
}
func CreateTaskTimeoutChecker(config *Conf, session *mgo.Session) *TaskTimeoutChecker {
return &TaskTimeoutChecker{
makeClosable(),
config, session,
make(chan bool),
&sync.WaitGroup{},
}
}
......@@ -36,21 +33,19 @@ func (self *TaskTimeoutChecker) Go() {
defer session.Close()
db := session.DB("")
self.done_wg.Add(1)
defer self.done_wg.Done()
defer log.Infof("TaskTimeoutChecker: shutting down.")
self.closableAdd(1)
defer self.closableDone()
defer log.Info("TaskTimeoutChecker: shutting down.")
// Start with a delay, so that workers get a chance to push their updates
// after the manager has started up.
ok := KillableSleep("TaskTimeoutChecker-initial", TASK_TIMEOUT_CHECK_INITIAL_SLEEP,
self.done_chan, self.done_wg)
ok := KillableSleep("TaskTimeoutChecker-initial", TASK_TIMEOUT_CHECK_INITIAL_SLEEP, &self.closable)
if !ok {
log.Warningf("TaskTimeoutChecker: Killable sleep was killed, not even starting checker.")
log.Info("TaskTimeoutChecker: Killable sleep was killed, not even starting checker.")
return
}
timer := Timer("TaskTimeoutCheck", TASK_TIMEOUT_CHECK_INTERVAL, false,
self.done_chan, self.done_wg)
timer := Timer("TaskTimeoutCheck", TASK_TIMEOUT_CHECK_INTERVAL, false, &self.closable)
for _ = range timer {
self.Check(db)
......@@ -59,9 +54,7 @@ func (self *TaskTimeoutChecker) Go() {
}
func (self *TaskTimeoutChecker) Close() {
close(self.done_chan)
log.Debug("TaskTimeoutChecker: waiting for shutdown to finish.")
self.done_wg.Wait()
self.closableCloseAndWait()
log.Debug("TaskTimeoutChecker: shutdown complete.")
}
......@@ -90,24 +83,24 @@ func (self *TaskTimeoutChecker) Check(db *mgo.Database) {
}
for _, task := range timedout_tasks {
log.Warningf(" - Task %s (%s) timed out", task.Name, task.Id.Hex())
log.Warningf(" - Task %s (%s) timed out", task.Name, task.ID.Hex())
var ident string
if task.Worker != "" {
ident = task.Worker
} else if task.WorkerId != nil {
ident = task.WorkerId.Hex()
} else if task.WorkerID != nil {
ident = task.WorkerID.Hex()
} else {
ident = "-no worker-"
}
tupdate := TaskUpdate{
TaskId: task.Id,
TaskID: task.ID,
TaskStatus: "failed",
Activity: fmt.Sprintf("Task timed out on worker %s", ident),
Log: fmt.Sprintf(
"%s Task %s (%s) timed out, was active but untouched since %s. "+
"Was handled by worker %s",
UtcNow().Format(IsoFormat), task.Name, task.Id.Hex(), task.LastWorkerPing, ident),
UtcNow().Format(IsoFormat), task.Name, task.ID.Hex(), task.LastWorkerPing, ident),
}
QueueTaskUpdate(&tupdate, db)
}
......
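For context, a hypothetical wiring sketch of this checker's life cycle; the Close() above delegates to the embedded closable:

```go
package flamenco

import mgo "gopkg.in/mgo.v2"

// runTimeoutChecker is an illustrative helper, not part of this commit.
func runTimeoutChecker(config *Conf, session *mgo.Session) *TaskTimeoutChecker {
	checker := CreateTaskTimeoutChecker(config, session)
	// Go() sleeps TASK_TIMEOUT_CHECK_INITIAL_SLEEP, then runs Check() every
	// TASK_TIMEOUT_CHECK_INTERVAL until the checker is closed.
	go checker.Go()
	return checker // the caller is expected to call checker.Close() on shutdown
}
```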
......@@ -6,7 +6,6 @@ package flamenco
import (
"fmt"
"net/http"
"sync"
"time"
log "github.com/Sirupsen/logrus"
......@@ -20,13 +19,10 @@ const QUEUE_MGO_COLLECTION = "task_update_queue"
const TASK_QUEUE_INSPECT_PERIOD = 1 * time.Second
type TaskUpdatePusher struct {
closable
config *Conf
upstream *UpstreamConnection
session *mgo.Session
// For allowing shutdown.
done chan bool
done_wg *sync.WaitGroup
}
/**
......@@ -54,7 +50,7 @@ func QueueTaskUpdateFromWorker(w http.ResponseWriter, r *auth.AuthenticatedReque
if err := DecodeJson(w, r.Body, &tupdate, fmt.Sprintf("%s QueueTaskUpdate:", worker.Identifier())); err != nil {
return
}
tupdate.TaskId = task_id
tupdate.TaskID = task_id
tupdate.Worker = worker.Identifier()
// Check that this worker is allowed to update this task.
......@@ -66,20 +62,20 @@ func QueueTaskUpdateFromWorker(w http.ResponseWriter, r *auth.AuthenticatedReque
fmt.Fprintf(w, "Task %s is unknown.", task_id.Hex())
return
}
if task.WorkerId != nil && *task.WorkerId != worker.Id {
if task.WorkerID != nil && *task.WorkerID != worker.ID {
log.Warningf("%s QueueTaskUpdateFromWorker: task %s update rejected from %s (%s), task is assigned to %s",
r.RemoteAddr, task_id.Hex(), worker.Id.Hex(), worker.Identifier(), task.WorkerId.Hex())
r.RemoteAddr, task_id.Hex(), worker.ID.Hex(), worker.Identifier(), task.WorkerID.Hex())
w.WriteHeader(http.StatusConflict)
fmt.Fprintf(w, "Task %s is assigned to another worker.", task_id.Hex())
return
}
// Only set the task's worker ID if it's not already set to the current worker.
var set_worker_id *bson.ObjectId = nil
if task.WorkerId == nil {
set_worker_id = &worker.Id
// Only set the task's worker ID if it's not already set to the current worker.
var setWorkerID *bson.ObjectId
if task.WorkerID == nil {
setWorkerID = &worker.ID
}
WorkerPingedTask(set_worker_id, tupdate.TaskId, db)
WorkerPingedTask(setWorkerID, tupdate.TaskID, db)
if err := QueueTaskUpdate(&tupdate, db); err != nil {
log.Warningf("%s: %s", worker.Identifier(), err)
......@@ -101,7 +97,7 @@ func QueueTaskUpdate(tupdate *TaskUpdate, db *mgo.Database) error {
func QueueTaskUpdateWithExtra(tupdate *TaskUpdate, db *mgo.Database, extra_updates bson.M) error {
// For ensuring the ordering of updates. time.Time has nanosecond precision.
tupdate.ReceivedOnManager = time.Now().UTC()
tupdate.Id = bson.NewObjectId()
tupdate.ID = bson.NewObjectId()
// Store the update in the queue for sending to the Flamenco Server later.
task_update_queue := db.C(QUEUE_MGO_COLLECTION)
......@@ -116,27 +112,27 @@ func QueueTaskUpdateWithExtra(tupdate *TaskUpdate, db *mgo.Database, extra_updat
updates := extra_updates
if tupdate.TaskStatus != "" {
// Before blindly applying the task status, first check if the transition is valid.
if TaskStatusTransitionValid(task_coll, tupdate.TaskId, tupdate.TaskStatus) {
if TaskStatusTransitionValid(task_coll, tupdate.TaskID, tupdate.TaskStatus) {
updates["status"] = tupdate.TaskStatus
} else {
log.Warningf("QueueTaskUpdate: not locally applying status=%s for %s",
tupdate.TaskStatus, tupdate.TaskId.Hex())
tupdate.TaskStatus, tupdate.TaskID.Hex())
}
}
if tupdate.Activity != "" {
updates["activity"] = tupdate.Activity
}
if len(updates) > 0 {
log.Debugf("QueueTaskUpdate: applying update %s to task %s", updates, tupdate.TaskId.Hex())
if err := task_coll.UpdateId(tupdate.TaskId, bson.M{"$set": updates}); err != nil {
log.Debugf("QueueTaskUpdate: applying update %s to task %s", updates, tupdate.TaskID.Hex())
if err := task_coll.UpdateId(tupdate.TaskID, bson.M{"$set": updates}); err != nil {
if err != mgo.ErrNotFound {
return fmt.Errorf("QueueTaskUpdate: error updating local task cache: %s", err)
} else {
log.Warningf("QueueTaskUpdate: cannot find task %s to update locally", tupdate.TaskId.Hex())
log.Warningf("QueueTaskUpdate: cannot find task %s to update locally", tupdate.TaskID.Hex())
}
}
} else {
log.Debugf("QueueTaskUpdate: nothing to do locally for task %s", tupdate.TaskId.Hex())
log.Debugf("QueueTaskUpdate: nothing to do locally for task %s", tupdate.TaskID.Hex())
}
return nil
......@@ -179,11 +175,10 @@ func ValidForCancelRequested(new_status string) bool {
func CreateTaskUpdatePusher(config *Conf, upstream *UpstreamConnection, session *mgo.Session) *TaskUpdatePusher {
return &TaskUpdatePusher{
makeClosable(),
config,
upstream,
session,
make(chan bool),
new(sync.WaitGroup),
}
}
......@@ -191,15 +186,8 @@ func CreateTaskUpdatePusher(config *Conf, upstream *UpstreamConnection, session
* Closes the task update pusher by stopping all timers & goroutines.
*/
func (self *TaskUpdatePusher) Close() {
close(self.done)
// Dirty hack: sleep for a bit to ensure the closing of the 'done'
// channel can be handled by other goroutines, before handling the
// closing of the other channels.
time.Sleep(1)
log.Info("TaskUpdatePusher: shutting down, waiting for shutdown to complete.")
self.done_wg.Wait()
self.closableCloseAndWait()
log.Info("TaskUpdatePusher: shutdown complete.")
}
......@@ -212,12 +200,14 @@ func (self *TaskUpdatePusher) Go() {
db := mongo_sess.DB("")
queue := db.C(QUEUE_MGO_COLLECTION)
self.done_wg.Add(1)
defer self.done_wg.Done()
if !self.closableAdd(1) {
return
}
defer self.closableDone()
// Investigate the queue periodically.
timer_chan := Timer("TaskUpdatePusherTimer",
TASK_QUEUE_INSPECT_PERIOD, false, self.done, self.done_wg)
TASK_QUEUE_INSPECT_PERIOD, false, &self.closable)
for _ = range timer_chan {
// log.Info("TaskUpdatePusher: checking task update queue")
......@@ -333,7 +323,7 @@ func (self *TaskUpdatePusher) handle_incoming_cancel_requests(cancel_task_ids []
queue_task_cancel := func(task_id bson.ObjectId) {
tupdate := TaskUpdate{
TaskId: task_id,
TaskID: task_id,
TaskStatus: "canceled",
}
if err := QueueTaskUpdate(&tupdate, db); err != nil {
......@@ -345,13 +335,13 @@ func (self *TaskUpdatePusher) handle_incoming_cancel_requests(cancel_task_ids []
}
for _, task_to_cancel := range tasks_to_cancel {
seen_tasks[task_to_cancel.Id] = true
seen_tasks[task_to_cancel.ID] = true
if task_to_cancel.Status == "active" {
// This needs to be canceled through the worker, and thus go to cancel-requested.
go_to_cancel_requested = append(go_to_cancel_requested, task_to_cancel.Id)
go_to_cancel_requested = append(go_to_cancel_requested, task_to_cancel.ID)
} else {
queue_task_cancel(task_to_cancel.Id)
queue_task_cancel(task_to_cancel.ID)
}
}
......@@ -389,7 +379,7 @@ func (self *TaskUpdatePusher) handle_incoming_cancel_requests(cancel_task_ids []
func LogTaskActivity(worker *Worker, task_id bson.ObjectId, activity, log_line string, db *mgo.Database) {
tupdate := TaskUpdate{
TaskId: task_id,
TaskID: task_id,
Activity: activity,
Log: log_line,
}
......
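To recap the pattern above: an update is queued for the Server and, when the status transition is valid, also applied to the local flamenco_tasks cache. A hypothetical helper mirroring the queue_task_cancel closure:

```go
package flamenco

import (
	log "github.com/Sirupsen/logrus"
	mgo "gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// cancelTaskLocally is illustrative only; it is not part of this commit.
func cancelTaskLocally(taskID bson.ObjectId, db *mgo.Database) {
	tupdate := TaskUpdate{
		TaskID:     taskID,
		TaskStatus: "canceled",
	}
	if err := QueueTaskUpdate(&tupdate, db); err != nil {
		log.Errorf("cancelTaskLocally: unable to queue update for %s: %s",
			taskID.Hex(), err)
	}
}
```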
......@@ -64,7 +64,7 @@ func (s *TaskUpdatesTestSuite) TestCancelRunningTasks(t *check.C) {
log.Info("POST from manager received on server, sending back TaskUpdateResponse.")
resp := TaskUpdateResponse{
CancelTasksIds: []bson.ObjectId{task2.Id},
CancelTasksIds: []bson.ObjectId{task2.ID},
}
return httpmock.NewJsonResponse(200, &resp)
},
......@@ -90,9 +90,9 @@ func (s *TaskUpdatesTestSuite) TestCancelRunningTasks(t *check.C) {
// Check that one task was canceled and the other was not.
task_db := Task{}
assert.Nil(t, tasks_coll.FindId(task1.Id).One(&task_db))
assert.Nil(t, tasks_coll.FindId(task1.ID).One(&task_db))
assert.Equal(t, "queued", task_db.Status)
assert.Nil(t, tasks_coll.FindId(task2.Id).One(&task_db))
assert.Nil(t, tasks_coll.FindId(task2.ID).One(&task_db))
assert.Equal(t, "canceled", task_db.Status)
}
......@@ -114,33 +114,33 @@ func (s *TaskUpdatesTestSuite) TestMultipleWorkersForOneTask(c *check.C) {
assert.Nil(c, StoreNewWorker(&worker2, s.db))
// Task should not be assigned to any worker
assert.Nil(c, task1.WorkerId)
assert.Nil(c, task1.WorkerID)
tupdate := TaskUpdate{
TaskId: task1.Id,
TaskID: task1.ID,
Activity: "doing stuff by worker1",
}
payload_bytes, err := json.Marshal(tupdate)
assert.Nil(c, err)
resp_rec, ar := WorkerTestRequestWithBody(worker1.Id, bytes.NewBuffer(payload_bytes), "POST", "/tasks/1aaaaaaaaaaaaaaaaaaaaaaa/update")
QueueTaskUpdateFromWorker(resp_rec, ar, s.db, task1.Id)
resp_rec, ar := WorkerTestRequestWithBody(worker1.ID, bytes.NewBuffer(payload_bytes), "POST", "/tasks/1aaaaaaaaaaaaaaaaaaaaaaa/update")
QueueTaskUpdateFromWorker(resp_rec, ar, s.db, task1.ID)
assert.Equal(c, 204, resp_rec.Code)
// Because of this update, the task should be assigned to worker 1
assert.Nil(c, tasks_coll.FindId(task1.Id).One(&task1))
assert.Equal(c, task1.WorkerId, task1.WorkerId)
assert.Nil(c, tasks_coll.FindId(task1.ID).One(&task1))
assert.Equal(c, task1.WorkerID, task1.WorkerID)
assert.Equal(c, task1.Activity, "doing stuff by worker1")
// An update by worker 2 should fail.
tupdate.Activity = "doing stuff by worker2"
payload_bytes, err = json.Marshal(tupdate)
assert.Nil(c, err)
resp_rec, ar = WorkerTestRequestWithBody(worker2.Id, bytes.NewBuffer(payload_bytes), "POST", "/tasks/1aaaaaaaaaaaaaaaaaaaaaaa/update")
QueueTaskUpdateFromWorker(resp_rec, ar, s.db, task1.Id)
resp_rec, ar = WorkerTestRequestWithBody(worker2.ID, bytes.NewBuffer(payload_bytes), "POST", "/tasks/1aaaaaaaaaaaaaaaaaaaaaaa/update")
QueueTaskUpdateFromWorker(resp_rec, ar, s.db, task1.ID)
assert.Equal(c, http.StatusConflict, resp_rec.Code)
// The task should still be assigned to worker 1
assert.Nil(c, tasks_coll.FindId(task1.Id).One(&task1))
assert.Equal(c, task1.WorkerId, task1.WorkerId)
assert.Nil(c, tasks_coll.FindId(task1.ID).One(&task1))
assert.Equal(c, task1.WorkerID, task1.WorkerID)
assert.Equal(c, task1.Activity, "doing stuff by worker1")
}
package flamenco
import (
"sync"
"time"
log "github.com/Sirupsen/logrus"
......@@ -14,13 +13,15 @@ type TimerPing struct{}
*
* :param sleep_first: if true: sleep first, then ping. If false: ping first, then sleep.
*/
func Timer(name string, sleep_duration time.Duration, sleep_first bool,
done_chan <-chan bool, done_wg *sync.WaitGroup) <-chan TimerPing {
func Timer(name string, sleep_duration time.Duration, sleep_first bool, closable *closable) <-chan TimerPing {
timer_chan := make(chan TimerPing, 1) // don't let the timer block
go func() {
done_wg.Add(1)
defer done_wg.Done()
if !closable.closableAdd(1) {
log.Infof("Timer '%s' goroutine shutting down.", name)
return
}
defer closable.closableDone()
defer close(timer_chan)
last_timer := time.Time{}
......@@ -30,7 +31,7 @@ func Timer(name string, sleep_duration time.Duration, sleep_first bool,
for {
select {
case <-done_chan:
case <-closable.doneChan:
log.Infof("Timer '%s' goroutine shutting down.", name)
return
default:
......@@ -55,17 +56,18 @@ func Timer(name string, sleep_duration time.Duration, sleep_first bool,
*
* :returns: "ok", so true when the sleep stopped normally, and false if it was killed.
*/
func KillableSleep(name string, sleep_duration time.Duration,
done_chan <-chan bool, done_wg *sync.WaitGroup) bool {
func KillableSleep(name string, sleep_duration time.Duration, closable *closable) bool {
done_wg.Add(1)
defer done_wg.Done()
if !closable.closableAdd(1) {
return false
}
defer closable.closableDone()
defer log.Infof("Sleep '%s' goroutine is shut down.", name)
sleep_start := time.Now()
for {
select {
case <-done_chan:
case <-closable.doneChan:
log.Infof("Sleep '%s' goroutine shutting down.", name)
return false
default:
......@@ -88,7 +90,7 @@ func UtcNow() *time.Time {
return &now
}
/* Sends a 'true' to the channel after the given timeout.
/* TimeoutAfter sends a 'true' to the channel after the given timeout.
* Send a 'false' to the channel yourself if you want to notify the receiver that
* a timeout didn't happen.
*
......
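Both helpers now take a *closable instead of the old done channel + WaitGroup pair. A hypothetical loop using the new signatures; the names and durations are illustrative:

```go
package flamenco

import (
	"time"

	log "github.com/Sirupsen/logrus"
)

// examplePeriodicLoop is illustrative only; it is not part of this commit.
func examplePeriodicLoop(c *closable) {
	// Initial delay that aborts early when the closable is closed.
	if !KillableSleep("example-initial", 30*time.Second, c) {
		log.Info("examplePeriodicLoop: killed during initial sleep.")
		return
	}

	// The Timer channel is closed on shutdown, which ends this loop.
	for range Timer("example", 5*time.Second, false, c) {
		// periodic work would go here
	}
	log.Info("examplePeriodicLoop: shutting down.")
}
```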
......@@ -9,7 +9,6 @@ import (
"io/ioutil"
"net/http"
"net/url"
"sync"
"time"
log "github.com/Sirupsen/logrus"
......@@ -24,23 +23,20 @@ const STARTUP_NOTIFICATION_INITIAL_DELAY = 500 * time.Millisecond
const STARTUP_NOTIFICATION_RETRY = 30 * time.Second
type UpstreamConnection struct {
closable
config *Conf
session *mgo.Session
// Send any boolean here to kick the task downloader into downloading new tasks.
download_kick chan chan bool
done chan bool
done_wg *sync.WaitGroup
}
func ConnectUpstream(config *Conf, session *mgo.Session) *UpstreamConnection {
upconn := UpstreamConnection{
makeClosable(),
config,
session,
make(chan chan bool),
make(chan bool),
new(sync.WaitGroup),
}
upconn.download_task_loop()
......@@ -51,16 +47,9 @@ func ConnectUpstream(config *Conf, session *mgo.Session) *UpstreamConnection {
* Closes the upstream connection by stopping all upload/download loops.
*/
func (self *UpstreamConnection) Close() {
close(self.done)
// Dirty hack: sleep for a bit to ensure the closing of the 'done'
// channel can be handled by other goroutines, before handling the
// closing of the other channels.
time.Sleep(1)
close(self.download_kick)
log.Debugf("UpstreamConnection: shutting down, waiting for shutdown to complete.")
self.done_wg.Wait()
close(self.download_kick) // TODO: maybe move this between closing of done channel and waiting
self.closableCloseAndWait()
log.Info("UpstreamConnection: shutdown complete.")
}
......@@ -71,15 +60,18 @@ func (self *UpstreamConnection) KickDownloader(synchronous bool) {
log.Info("KickDownloader: Waiting for task downloader to finish.")
// wait for the download to be complete, or the connection to be shut down.
self.done_wg.Add(1)
defer self.done_wg.Done()
if !self.closableAdd(1) {
log.Debugf("KickDownloader: Aborting waiting for task downloader; shutting down.")
return
}
defer self.closableDone()
for {
select {
case <-pingback:
log.Debugf("KickDownloader: done.")
return
case <-self.done:
case <-self.doneChan:
log.Debugf("KickDownloader: Aborting waiting for task downloader; shutting down.")
return
}
......@@ -94,21 +86,22 @@ func (self *UpstreamConnection) download_task_loop() {
timer_chan := Timer("download_task_loop",
self.config.DownloadTaskSleep,
false,
self.done,
self.done_wg,
&self.closable,
)
go func() {
mongo_sess := self.session.Copy()
defer mongo_sess.Close()
self.done_wg.Add(1)
defer self.done_wg.Done()
if !self.closableAdd(1) {
return
}
defer self.closableDone()
defer log.Info("download_task_loop: Task download goroutine shutting down.")
for {
select {
case <-self.done:
case <-self.doneChan:
return
case _, ok := <-timer_chan:
if !ok {
......@@ -205,17 +198,17 @@ func download_tasks_from_upstream(config *Conf, mongo_sess *mgo.Session) {
}
tasks_coll := db.C("flamenco_tasks")
for _, task := range depsgraph {
change, err := tasks_coll.Upsert(bson.M{"_id": task.Id}, task)
change, err := tasks_coll.Upsert(bson.M{"_id": task.ID}, task)
if err != nil {
log.Errorf("unable to insert new task %s: %s", task.Id.Hex(), err)
log.Errorf("unable to insert new task %s: %s", task.ID.Hex(), err)
continue
}
if change.Updated > 0 {
log.Debug("Upstream server re-queued existing task ", task.Id.Hex())
log.Debug("Upstream server re-queued existing task ", task.ID.Hex())
} else if change.Matched > 0 {
log.Debugf("Upstream server re-queued existing task %s, but nothing changed",
task.Id.Hex())
task.ID.Hex())
}
}
......@@ -256,7 +249,7 @@ func (self *UpstreamConnection) SendJson(logprefix, method string, url *url.URL,
func (self *UpstreamConnection) SendStartupNotification() {
notification := StartupNotification{
ManagerUrl: self.config.OwnUrl,
ManagerURL: self.config.OwnUrl,
VariablesByVarname: self.config.VariablesByVarname,
NumberOfWorkers: 0,
}
......@@ -282,20 +275,23 @@ func (self *UpstreamConnection) SendStartupNotification() {
go func() {
// Register as a loop that responds to 'done' being closed.
self.done_wg.Add(1)
defer self.done_wg.Done()
if !self.closableAdd(1) {
log.Warning("SendStartupNotification: shutting down early without sending startup notification.")
return
}
defer self.closableDone()
mongo_sess := self.session.Copy()
defer mongo_sess.Close()
ok := KillableSleep("SendStartupNotification-initial", STARTUP_NOTIFICATION_INITIAL_DELAY,
self.done, self.done_wg)
&self.closable)
if !ok {
log.Warning("SendStartupNotification: shutting down without sending startup notification.")
return
}
timer_chan := Timer("SendStartupNotification", STARTUP_NOTIFICATION_RETRY,
false, self.done, self.done_wg)
false, &self.closable)
for _ = range timer_chan {
log.Info("SendStartupNotification: trying to send notification.")
......@@ -345,7 +341,7 @@ func (self *UpstreamConnection) SendTaskUpdates(updates *[]TaskUpdate) (*TaskUpd
* If it was changed or removed, this function return true.
*/
func (self *UpstreamConnection) RefetchTask(task *Task) bool {
get_url, err := self.ResolveUrl("/api/flamenco/tasks/%s", task.Id.Hex())
get_url, err := self.ResolveUrl("/api/flamenco/tasks/%s", task.ID.Hex())
log.Infof("Verifying task with Flamenco Server %s", get_url)
req, err := http.NewRequest("GET", get_url.String(), nil)
......@@ -365,20 +361,20 @@ func (self *UpstreamConnection) RefetchTask(task *Task) bool {
if resp.StatusCode == http.StatusNotModified {
// Nothing changed, we're good to go.
log.Infof("Cached task %s is still the same on the Server", task.Id.Hex())
log.Infof("Cached task %s is still the same on the Server", task.ID.Hex())
return false
}
if resp.StatusCode >= 500 {
// Internal errors, we'll ignore that.
log.Warningf("Error %d trying to re-fetch task %s",
resp.StatusCode, task.Id.Hex())
resp.StatusCode, task.ID.Hex())
return false
}
if 300 <= resp.StatusCode && resp.StatusCode < 400 {
// Redirects, we'll ignore those too for now.
log.Warningf("Redirect %d trying to re-fetch task %s, not following redirect.",
resp.StatusCode, task.Id.Hex())
resp.StatusCode, task.ID.Hex())
return false
}
......@@ -390,7 +386,7 @@ func (self *UpstreamConnection) RefetchTask(task *Task) bool {
// Not found, access denied, that kind of stuff. Locally cancel the task.
// TODO: probably better to go to "failed".
log.Warningf("Code %d when re-fetching task %s; canceling local copy",
resp.StatusCode, task.Id.Hex())
resp.StatusCode, task.ID.Hex())
new_task = *task
new_task.Status = "canceled"
......@@ -411,9 +407,9 @@ func (self *UpstreamConnection) RefetchTask(task *Task) bool {
// save the task to the queue.
log.Infof("Cached task %s was changed on the Server, status=%s, priority=%d.",
task.Id.Hex(), new_task.Status, new_task.Priority)
task.ID.Hex(), new_task.Status, new_task.Priority)
tasks_coll := self.session.DB("").C("flamenco_tasks")
tasks_coll.UpdateId(task.Id,
tasks_coll.UpdateId(task.ID,
bson.M{"$set": new_task})
return true
......
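The upstream connection follows the same closable pattern as the other loops. A hypothetical wiring sketch:

```go
package flamenco

import mgo "gopkg.in/mgo.v2"

// connectAndKick is illustrative only; it is not part of this commit.
func connectAndKick(config *Conf, session *mgo.Session) {
	upstream := ConnectUpstream(config, session)
	// Close() closes the download_kick channel and then waits for all
	// registered goroutines via closableCloseAndWait().
	defer upstream.Close()

	upstream.SendStartupNotification()
	upstream.KickDownloader(true) // true: block until the downloader has run once
}
```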
......@@ -67,7 +67,7 @@ func StoreNewWorker(winfo *Worker, db *mgo.Database) error {
var err error
// Store it in MongoDB after hashing the password and assigning an ID.
winfo.Id = bson.NewObjectId()
winfo.ID = bson.NewObjectId()
winfo.HashedSecret, err = bcrypt.GenerateFromPassword([]byte(winfo.Secret), bcrypt.DefaultCost)
if err != nil {
log.Errorf("Unable to hash password:", err)
......@@ -146,19 +146,19 @@ func WorkerMayRunTask(w http.ResponseWriter, r *auth.AuthenticatedRequest,
task := Task{}
if err := db.C("flamenco_tasks").FindId(task_id).One(&task); err != nil {
log.Warningf("%s WorkerMayRunTask: unable to find task %s for worker %s",
r.RemoteAddr, task_id.Hex(), worker.Id.Hex())
r.RemoteAddr, task_id.Hex(), worker.ID.Hex())
response.Reason = fmt.Sprintf("unable to find task %s", task_id.Hex())
} else if task.WorkerId != nil && *task.WorkerId != worker.Id {
} else if task.WorkerID != nil && *task.WorkerID != worker.ID {
log.Warningf("%s WorkerMayRunTask: task %s was assigned from worker %s to %s",
r.RemoteAddr, task_id.Hex(), worker.Id.Hex(), task.WorkerId.Hex())
r.RemoteAddr, task_id.Hex(), worker.ID.Hex(), task.WorkerID.Hex())
response.Reason = fmt.Sprintf("task %s reassigned to another worker", task_id.Hex())
} else if !IsRunnableTaskStatus(task.Status) {
log.Warningf("%s WorkerMayRunTask: task %s is in not-runnable status %s, worker %s will stop",
r.RemoteAddr, task_id.Hex(), task.Status, worker.Id.Hex())
r.RemoteAddr, task_id.Hex(), task.Status, worker.ID.Hex())
response.Reason = fmt.Sprintf("task %s in non-runnable status %s", task_id.Hex(), task.Status)
} else {
response.MayKeepRunning = true
WorkerPingedTask(&worker.Id, task_id, db)
WorkerPingedTask(&worker.ID, task_id, db)
}
// Send the response
......@@ -214,8 +214,8 @@ func WorkerSeen(worker *Worker, remote_addr string, db *mgo.Database) {
updates["address"] = remote_addr
}
if err := db.C("flamenco_workers").UpdateId(worker.Id, bson.M{"$set": updates}); err != nil {
log.Errorf("WorkerSeen: unable to update worker %s in MongoDB: %s", worker.Id, err)
if err := db.C("flamenco_workers").UpdateId(worker.ID, bson.M{"$set": updates}); err != nil {
log.Errorf("WorkerSeen: unable to update worker %s in MongoDB: %s", worker.ID, err)
}
}
......@@ -237,7 +237,7 @@ func WorkerSignOff(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.
// Update the tasks assigned to the worker.
var tasks []Task
query := bson.M{
"worker_id": worker.Id,
"worker_id": worker.ID,
"status": "active",
}
sent_header := false
......@@ -256,15 +256,15 @@ func WorkerSignOff(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.
}
for _, task := range tasks {
tupdate.TaskId = task.Id
tupdate.TaskID = task.ID
if err := QueueTaskUpdate(&tupdate, db); err != nil {
if !sent_header {
w.WriteHeader(http.StatusInternalServerError)
sent_header = true
}
fmt.Fprintf(w, "Error updating task %s: %s\n", task.Id.Hex(), err)
fmt.Fprintf(w, "Error updating task %s: %s\n", task.ID.Hex(), err)
log.Errorf("WorkerSignOff: unable to update task %s for worker %s in MongoDB: %s",
task.Id.Hex(), w_ident, err)
task.ID.Hex(), w_ident, err)
}
}
}
......@@ -274,7 +274,7 @@ func WorkerSignOff(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.
updates := bson.M{
"status": worker.Status,
}
if err := db.C("flamenco_workers").UpdateId(worker.Id, bson.M{"$set": updates}); err != nil {
if err := db.C("flamenco_workers").UpdateId(worker.ID, bson.M{"$set": updates}); err != nil {
if !sent_header {
w.WriteHeader(http.StatusInternalServerError)
}
......
......@@ -46,12 +46,12 @@ func (s *SchedulerTestSuite) TestWorkerMayRun(t *check.C) {
}
// Make sure the scheduler gives us this task.
resp_rec, ar := WorkerTestRequest(s.worker_lnx.Id, "GET", "/task")
resp_rec, ar := WorkerTestRequest(s.worker_lnx.ID, "GET", "/task")
s.sched.ScheduleTask(resp_rec, ar)
// Right after obtaining the task, we should be allowed to keep running it.
resp_rec, ar = WorkerTestRequest(s.worker_lnx.Id, "GET", "/may-i-run/%s", task.Id.Hex())
WorkerMayRunTask(resp_rec, ar, s.db, task.Id)
resp_rec, ar = WorkerTestRequest(s.worker_lnx.ID, "GET", "/may-i-run/%s", task.ID.Hex())
WorkerMayRunTask(resp_rec, ar, s.db, task.ID)
resp := MayKeepRunningResponse{}
parseJson(t, resp_rec, 200, &resp)
......@@ -59,22 +59,22 @@ func (s *SchedulerTestSuite) TestWorkerMayRun(t *check.C) {
assert.Equal(t, true, resp.MayKeepRunning)
// If we now change the task status to "cancel-requested", the worker should be denied.
assert.Nil(t, s.db.C("flamenco_tasks").UpdateId(task.Id,
assert.Nil(t, s.db.C("flamenco_tasks").UpdateId(task.ID,
bson.M{"$set": bson.M{"status": "cancel-requested"}}))
resp_rec, ar = WorkerTestRequest(s.worker_lnx.Id, "GET", "/may-i-run/%s", task.Id.Hex())
WorkerMayRunTask(resp_rec, ar, s.db, task.Id)
resp_rec, ar = WorkerTestRequest(s.worker_lnx.ID, "GET", "/may-i-run/%s", task.ID.Hex())
WorkerMayRunTask(resp_rec, ar, s.db, task.ID)
resp = MayKeepRunningResponse{}
parseJson(t, resp_rec, 200, &resp)
assert.Equal(t, false, resp.MayKeepRunning)
// Changing status back to "active", but assigning to another worker
assert.Nil(t, s.db.C("flamenco_tasks").UpdateId(task.Id, bson.M{"$set": bson.M{
assert.Nil(t, s.db.C("flamenco_tasks").UpdateId(task.ID, bson.M{"$set": bson.M{
"status": "active",
"worker_id": s.worker_win.Id,
"worker_id": s.worker_win.ID,
}}))
resp_rec, ar = WorkerTestRequest(s.worker_lnx.Id, "GET", "/may-i-run/%s", task.Id.Hex())
WorkerMayRunTask(resp_rec, ar, s.db, task.Id)
resp_rec, ar = WorkerTestRequest(s.worker_lnx.ID, "GET", "/may-i-run/%s", task.ID.Hex())
WorkerMayRunTask(resp_rec, ar, s.db, task.ID)
resp = MayKeepRunningResponse{}
parseJson(t, resp_rec, 200, &resp)
......
package main
import (
"context"
"flag"
"fmt"
"net/http"
......@@ -30,6 +31,8 @@ var upstream *flamenco.UpstreamConnection
var task_scheduler *flamenco.TaskScheduler
var task_update_pusher *flamenco.TaskUpdatePusher
var task_timeout_checker *flamenco.TaskTimeoutChecker
var httpServer *http.Server
var shutdownComplete chan struct{}
func http_status(w http.ResponseWriter, r *http.Request) {
flamenco.SendStatusReport(w, r, session, FLAMENCO_VERSION)
......@@ -113,20 +116,30 @@ func shutdown(signum os.Signal) {
go func() {
log.Infof("Signal '%s' received, shutting down.", signum)
if httpServer != nil {
log.Info("Shutting down HTTP server")
httpServer.Shutdown(context.Background())
} else {
log.Warning("HTTP server was not even started yet")
}
task_timeout_checker.Close()
task_update_pusher.Close()
upstream.Close()
session.Close()
log.Warning("Shutdown complete, stopping process.")
timeout <- false
}()
if <-timeout {
log.Warning("Shutdown forced, stopping process.")
}
log.Error("Shutdown forced, stopping process.")
os.Exit(-2)
}
log.Warning("Shutdown complete, stopping process.")
close(shutdownComplete)
}
var cliArgs struct {
verbose bool
debug bool
......@@ -223,6 +236,12 @@ func main() {
go task_update_pusher.Go()
go task_timeout_checker.Go()
// Create the HTTP server before installing the shutdown signal handler.
// This prevents a race condition when Ctrl+C is pressed after the
// http.Server is created but before it is assigned to httpServer.
httpServer = &http.Server{Addr: config.Listen, Handler: router}
shutdownComplete = make(chan struct{})
// Handle Ctrl+C
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
......@@ -236,12 +255,10 @@ func main() {
// Fall back to insecure server if TLS certificate/key is not defined.
if !has_tls {
log.Fatal(http.ListenAndServe(config.Listen, router))
log.Warning(httpServer.ListenAndServe())
} else {
log.Fatal(http.ListenAndServeTLS(
config.Listen,
config.TLSCert,
config.TLSKey,
router))
log.Warning(httpServer.ListenAndServeTLS(config.TLSCert, config.TLSKey))
}
<-shutdownComplete
}
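The move from http.ListenAndServe to a long-lived http.Server is what enables Go 1.8's graceful shutdown via Server.Shutdown. A stripped-down, stand-alone sketch of the pattern used in main() above; the function, address and handler are placeholders:

```go
package main

import (
	"context"
	"net/http"
	"os"
	"os/signal"

	log "github.com/Sirupsen/logrus"
)

// gracefulServeExample is illustrative only; it is not part of this commit.
func gracefulServeExample(addr string, handler http.Handler) {
	// Create the server before installing the signal handler, mirroring the
	// race-condition fix in main() above.
	server := &http.Server{Addr: addr, Handler: handler}
	shutdownComplete := make(chan struct{})

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt)
	go func() {
		<-sigChan
		log.Info("Signal received, shutting down HTTP server.")
		server.Shutdown(context.Background()) // unblocks ListenAndServe below
		close(shutdownComplete)
	}()

	// ListenAndServe returns http.ErrServerClosed after Shutdown is called.
	log.Warning(server.ListenAndServe())
	<-shutdownComplete
}
```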