Commit 4d9819c9 authored by Sybren A. Stüvel

Removed Flamenco Server and Manager, and moved Worker to top level

This is a clone of the Flamenco repository from back in the days when
Server, Manager and Worker shared the same Git repository. This is the
commit where that ended, and they went their separate ways.
parent 51bbd2d7
Showing
with 0 additions and 639 deletions
# Flamenco Manager
This is the Flamenco Manager implementation in Go.
## Running as service via systemd
1. Build and configure Flamenco Manager (see below).
2. Edit `flamenco-manager.service` to update it for the installation location, then place the file
in `/etc/systemd/system`.
3. Run `systemctl daemon-reload` to pick up on the new/edited file.
4. Run `systemctl start flamenco-manager` to start Flamenco Manager.
5. Run `systemctl enable flamenco-manager` to ensure it starts at boot too.
## CLI arguments
Flamenco Manager accepts the following CLI arguments:
- `-debug`: Enable debug-level logging
- `-verbose`: Enable info-level logging (no-op if `-debug` is also given)
- `-json`: Log in JSON format, instead of plain text
- `-cleanslate`: Start with a clean slate; erases all tasks from the local MongoDB,
then exits Flamenco Manager. This can be run while another Flamenco Manager is
running, but this scenario has not been well-tested yet.
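
As an illustration of how these arguments interact, here is a minimal sketch using Go's standard `flag` package. The `cliArgs` struct, `parseArgs` and `logLevel` are hypothetical names, not taken from the Manager's source. It assumes `cleanslate` is passed with a leading dash like the other flags, and that the log level is `warning` when neither `-debug` nor `-verbose` is given (the text above does not state the default).

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// cliArgs mirrors the four documented arguments.
// The struct and its field names are hypothetical.
type cliArgs struct {
	debug, verbose, json, cleanSlate bool
}

// parseArgs parses the documented flags from args (without the program name).
func parseArgs(args []string) (cliArgs, error) {
	var a cliArgs
	fs := flag.NewFlagSet("flamenco-manager", flag.ContinueOnError)
	fs.BoolVar(&a.debug, "debug", false, "Enable debug-level logging")
	fs.BoolVar(&a.verbose, "verbose", false, "Enable info-level logging")
	fs.BoolVar(&a.json, "json", false, "Log in JSON format")
	fs.BoolVar(&a.cleanSlate, "cleanslate", false, "Erase all tasks, then exit")
	err := fs.Parse(args)
	return a, err
}

// logLevel applies the documented precedence: -debug wins over -verbose.
// The "warning" fallback is an assumption.
func (a cliArgs) logLevel() string {
	switch {
	case a.debug:
		return "debug"
	case a.verbose:
		return "info"
	default:
		return "warning"
	}
}

func main() {
	args, err := parseArgs(os.Args[1:])
	if err != nil {
		os.Exit(2)
	}
	fmt.Printf("log level: %s, JSON: %v, clean slate: %v\n",
		args.logLevel(), args.json, args.cleanSlate)
}
```

Running the sketch with `-debug -verbose` reports the `debug` level, since `-verbose` is a no-op in that combination.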
## Starting development
`$FM` denotes the directory containing a checkout of Flamenco Manager, that is, the absolute path
of this `flamenco-manager-go` directory.
0. Make sure you have MongoDB up and running (on localhost)
1. Install Go 1.8 or newer
2. `export GOPATH=$FM`
3. `cd $FM/src/flamenco-manager`
4. Download all dependencies with `go get`
5. Download Flamenco test dependencies with `go get -t ./...`
6. Run the unit tests with `go test ./...`
7. Build your first Flamenco Manager with `go build`; this will create an executable
`flamenco-manager` in `$FM/src/flamenco-manager` as well as an executable in the current folder
8. Copy `flamenco-manager-example.yaml` and name it `flamenco-manager.yaml` and then update
it with the info generated after creating a manager document on the Server
### Testing
To run all unit tests, run `go test ./flamenco -v`. To run a specific GoCheck test, run
`go test ./flamenco -v --run TestWithGocheck -check.f SchedulerTestSuite.TestVariableReplacement`
where the argument to `--run` determines which suite to run, and `-check.f` determines the
exact test function of that suite. Once all tests have been moved over to use GoCheck, the
`--run` parameter will probably not be needed any more.
## Communication between Server and Manager
Flamenco Manager is responsible for initiating all communication between Server and Manager,
since Manager should be able to run behind some firewall/router, without being reachable by Server.
In the text below, `some_fields` refer to configuration file settings.
### Fetching tasks
1. When a Worker asks for a task, it is served a task in state `queued` or `claimed-by-manager` from
the local task queue (MongoDB collection "flamenco_tasks"). In this case, Manager performs a
conditional GET (based on etag) to Server at `/api/flamenco/tasks/{task-id}` to see if the task
has been updated since it was queued. If so, the task is updated in the queue and the queue
is re-examined.
2. When the queue is empty, the Manager fetches N new tasks from the Server, where N is the number
of registered workers.
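
The conditional GET from step 1 can be sketched as follows. This is an illustration only: it assumes the Server honours the standard `If-None-Match` header and answers `304 Not Modified` when the etag still matches, which the text above does not spell out. `fetchIfChanged` and `newDemoServer` are hypothetical names, and the demo server merely stands in for Flamenco Server.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// fetchIfChanged performs a conditional GET. When the server reports that the
// resource still matches etag, changed is false and the old etag is returned.
func fetchIfChanged(url, etag string) (body []byte, newEtag string, changed bool, err error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, etag, false, err
	}
	if etag != "" {
		req.Header.Set("If-None-Match", etag)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, etag, false, err
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusNotModified:
		return nil, etag, false, nil
	case http.StatusOK:
		body, err = io.ReadAll(resp.Body)
		if err != nil {
			return nil, etag, false, err
		}
		return body, resp.Header.Get("ETag"), true, nil
	default:
		return nil, etag, false, fmt.Errorf("unexpected status %d from %s", resp.StatusCode, url)
	}
}

// newDemoServer is a stand-in for Flamenco Server that serves a single,
// never-changing task document with a fixed etag.
func newDemoServer() *httptest.Server {
	const etag = `"1234567"`
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Header.Get("If-None-Match") == etag {
			w.WriteHeader(http.StatusNotModified)
			return
		}
		w.Header().Set("ETag", etag)
		fmt.Fprint(w, `{"status": "queued"}`)
	}))
}

func main() {
	srv := newDemoServer()
	defer srv.Close()
	url := srv.URL + "/api/flamenco/tasks/aaaaaaaaaaaaaaaaaaaaaaaa"

	// First fetch: no etag known yet, so the full document is returned.
	_, etag, changed, err := fetchIfChanged(url, "")
	fmt.Println("first fetch changed:", changed, "error:", err)

	// Second fetch: the etag matches, so the queued task can be served as-is.
	_, _, changed, err = fetchIfChanged(url, etag)
	fmt.Println("second fetch changed:", changed, "error:", err)
}
```

Only when `changed` is true would the queued task need to be replaced and the queue re-examined.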
### Task updates and canceling running tasks
0. Pushes happen as POST to "/api/flamenco/managers/{manager-id}/task-update-batch"
1. Task updates queued by workers are pushed every `task_update_push_max_interval_seconds`, or
when `task_update_push_max_count` updates are queued, whichever happens sooner.
2. An empty list of task updates is pushed every `cancel_task_fetch_max_interval_seconds`, unless an
actual push (as described above) already happened within that time.
3. The response to a push contains the database IDs of the accepted task updates, as well as
a list of task database IDs of tasks that should be canceled. If this list is non-empty, the
tasks' statuses are updated accordingly.
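
The push rules above boil down to a small decision function, sketched here under stated assumptions. `decidePush` and `pushKind` are hypothetical names; the default values are taken from the example configuration file. The sketch tracks a single "time since last push" for both kinds of push, which is one possible reading of rule 2.

```go
package main

import (
	"fmt"
	"time"
)

// Defaults as found in the example configuration file.
const (
	defaultMaxCount       = 20
	defaultPushInterval   = 5 * time.Second
	defaultCancelInterval = 10 * time.Second
)

// pushKind describes what, if anything, should be sent to the Server.
type pushKind string

const (
	noPush      pushKind = "none"
	pushUpdates pushKind = "updates"
	pushEmpty   pushKind = "empty"
)

// decidePush applies the rules:
//   - queued updates are pushed when the batch is full or the push interval
//     has passed, whichever happens sooner;
//   - an empty list is pushed when nothing is queued and no push happened
//     within the cancel-fetch interval.
func decidePush(queued, maxCount int, sinceLastPush, pushInterval, cancelInterval time.Duration) pushKind {
	switch {
	case queued > 0 && (queued >= maxCount || sinceLastPush >= pushInterval):
		return pushUpdates
	case queued == 0 && sinceLastPush >= cancelInterval:
		return pushEmpty
	default:
		return noPush
	}
}

func main() {
	// A few updates, pushed recently: keep batching.
	fmt.Println(decidePush(3, defaultMaxCount, 2*time.Second, defaultPushInterval, defaultCancelInterval))
	// A full batch is pushed immediately.
	fmt.Println(decidePush(20, defaultMaxCount, time.Second, defaultPushInterval, defaultCancelInterval))
	// Nothing queued for a while: push an empty list to learn about cancellations.
	fmt.Println(decidePush(0, defaultMaxCount, 12*time.Second, defaultPushInterval, defaultCancelInterval))
}
```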
## Timeouts of active tasks
When a worker starts working on a task, that task moves to status "active". The worker then
regularly calls `/may-i-run/{task-id}` to verify that it is still allowed to run that task. If this
end-point is not called within `active_task_timeout_interval_seconds` seconds, the task will go to
status "failed". The default for this setting is 60 seconds, which is likely to be too short, so
please configure it for your environment.
This timeout check will start running 5 minutes after the Manager has started up. This allows
workers to let it know they are still alive, in case the manager was unreachable for longer than
the timeout period. For now this startup delay is hard-coded.
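
The timeout rule, including the startup delay, can be sketched as a pure function. `timedOut`, `demoStart` and `defaultTimeout` are hypothetical names used only for this illustration; the 5-minute delay and the 60-second default come from the text above.

```go
package main

import (
	"fmt"
	"time"
)

// startupDelay is the hard-coded grace period after Manager startup.
const startupDelay = 5 * time.Minute

// defaultTimeout is the default of active_task_timeout_interval_seconds.
const defaultTimeout = 60 * time.Second

// demoStart is an arbitrary Manager startup time used in the example.
var demoStart = time.Date(2017, 1, 1, 12, 0, 0, 0, time.UTC)

// timedOut reports whether an active task should go to status "failed".
// lastPing is the last time a worker called /may-i-run/{task-id} for it.
func timedOut(managerStart, lastPing, now time.Time, timeout time.Duration) bool {
	// No task is failed during the startup delay, so that workers get the
	// chance to report in after the Manager was unreachable.
	if now.Sub(managerStart) < startupDelay {
		return false
	}
	return now.Sub(lastPing) > timeout
}

func main() {
	now := demoStart.Add(10 * time.Minute)

	// Pinged 30 seconds ago: still considered alive.
	fmt.Println(timedOut(demoStart, now.Add(-30*time.Second), now, defaultTimeout))

	// Pinged 2 minutes ago: the task times out.
	fmt.Println(timedOut(demoStart, now.Add(-2*time.Minute), now, defaultTimeout))

	// Same stale ping, but only 1 minute after startup: the check is not running yet.
	early := demoStart.Add(time.Minute)
	fmt.Println(timedOut(demoStart, early.Add(-2*time.Minute), early, defaultTimeout))
}
```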
## Known issues & limitations
1. The downloading of tasks doesn't consider job types. This means that workers can be starved
waiting for tasks when there are thousands of tasks and workers of type X and only a relatively
low number of workers and tasks of type Y.
## MISSING FEATURES / TO DO
In no particular order:
- Task queue cleanup. At the moment tasks are stored in the queue forever, since that makes
it possible to notice a task was canceled while a worker was running it. Eventually such
tasks should be cleaned up, though.
- GZip compression on the pushes to Server. This is especially important for task updates, since
they contain potentially very large log entries.
- A way for Flamenco Server to get an overview of Workers, and set their status.
- The Task struct in `documents.go` should be synced with the Eve schema.
FROM scratch
MAINTAINER Dr. Sybren <sybren@blender.studio>
EXPOSE 80
ADD flamenco-manager /
ADD flamenco-manager.yaml /
CMD ["/flamenco-manager"]
#!/bin/bash -e
GID=$(id --group)
# Use Docker to get Go in a way that allows overwriting the
# standard library with statically linked versions.
docker run -i --rm \
-v "$(pwd):/docker" \
-v "${GOPATH}:/go-local" \
--env GOPATH=/go-local \
golang /bin/bash -e << EOT
go version
cd \${GOPATH}/src/flamenco-manager
CGO_ENABLED=0 go get -a -ldflags '-s'
cp \${GOPATH}/bin/flamenco-manager /docker
chown $UID:$GID /docker/flamenco-manager
EOT
database_url: mongodb://localhost/flamanager
listen: '[::0]:8083'
own_url: http://192.168.3.108:8083/
flamenco: https://cloud.blender.org/
manager_id: 58514ce59837734ea16e9060
manager_secret: theworldistheworld
tlskey: tls/privkey.pem
tlscert: tls/cert.pem
# How often the Manager should ask the Server for new tasks when the local queue is empty.
download_task_sleep_seconds: 60
# The number of seconds between rechecks when there are no more tasks for workers.
# If set to 0, will not throttle at all.
# If set to -1, will never check when a worker asks for a task (so only every
# download_task_sleep_seconds seconds).
download_task_recheck_throttle_seconds: 10
# These settings determine the task update batching behaviour. All task updates are batched,
# and the batch is sent whenever the max seconds since the last push have passed, or when
# the batch is at its maximum size, whichever happens first.
task_update_push_max_interval_seconds: 5
task_update_push_max_count: 20
# An empty list of task updates is pushed every `cancel_task_fetch_max_interval_seconds`,
# unless an actual task update push already happened within that time. This controls how
# fast the Manager can respond to task cancel requests.
cancel_task_fetch_max_interval_seconds: 10
# When a task has status "active", but it hasn't been touched by a worker in
# this many seconds, it will go to state "failed". Being "touched" means that
# a worker called /may-i-run/{task-id} for this task.
active_task_timeout_interval_seconds: 60
variables:
  blender:
    windows: c:/temp/blender.exe
    linux: /opt/myblenderbuild/blender
    darwin: /opt/myblenderbuild/blender
[Unit]
Description=Flamenco Manager
Documentation=https://flamenco.io/
After=mongodb.service
[Service]
Type=simple
ExecStart=/home/flamanager/flamenco-manager
WorkingDirectory=/home/flamanager
User=flamanager
Group=flamanager
Restart=on-failure
RestartSec=1s
EnvironmentFile=-/etc/default/locale
[Install]
WantedBy=multi-user.target
package flamenco
import (
"sync"
log "github.com/Sirupsen/logrus"
)
// closable offers a way to cleanly shut down a running goroutine.
type closable struct {
doneChan chan struct{}
doneWg *sync.WaitGroup
}
// makeClosable constructs a new closable struct
func makeClosable() closable {
return closable{make(chan struct{}), new(sync.WaitGroup)}
}
// closableAdd(delta) should be combined with 'delta' calls to closableDone()
func (closable *closable) closableAdd(delta int) bool {
log.Debugf("Closable: doneWg.Add(%d) ok", delta)
closable.doneWg.Add(delta)
return true
}
// closableDone marks one "thing" as "done"
func (closable *closable) closableDone() {
log.Debugf("Closable: doneWg.Done() ok")
closable.doneWg.Done()
}
// closableCloseAndWait marks the goroutine as "done",
// and waits for all things added with closableAdd() to be "done" too.
func (closable *closable) closableCloseAndWait() {
close(closable.doneChan)
log.Debugf("Closable: waiting for shutdown to finish.")
closable.doneWg.Wait()
}
package flamenco
import (
"bufio"
"fmt"
"os"
log "github.com/Sirupsen/logrus"
mgo "gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)
type countresult struct {
Count int `bson:"count"`
}
// M is a shortcut for bson.M to make longer queries easier to read.
type M bson.M
// MongoSession returns a MongoDB session.
//
// The database name should be configured in the database URL.
// You can use this default database using session.DB("").
func MongoSession(config *Conf) *mgo.Session {
var err error
var session *mgo.Session
log.Infof("Connecting to MongoDB at %s", config.DatabaseUrl)
if session, err = mgo.Dial(config.DatabaseUrl); err != nil {
panic(err)
}
session.SetMode(mgo.Monotonic, true)
ensureIndices(session)
return session
}
func ensureIndices(session *mgo.Session) {
db := session.DB("")
index := mgo.Index{
Key: []string{"status", "priority"},
Unique: false,
DropDups: false,
Background: false,
Sparse: false,
}
if err := db.C("flamenco_tasks").EnsureIndex(index); err != nil {
panic(err)
}
index = mgo.Index{
Key: []string{"task_id", "received_on_manager"},
Unique: false,
DropDups: false,
Background: false,
Sparse: false,
}
if err := db.C("task_update_queue").EnsureIndex(index); err != nil {
panic(err)
}
}
// Count returns the number of documents in the given collection.
func Count(coll *mgo.Collection) (int, error) {
aggrOps := []bson.M{
bson.M{
"$group": bson.M{
"_id": nil,
"count": bson.M{"$sum": 1},
},
},
}
pipe := coll.Pipe(aggrOps)
result := countresult{}
if err := pipe.One(&result); err != nil {
if err == mgo.ErrNotFound {
// An empty collection is not an error.
return 0, nil
}
return -1, err
}
return result.Count, nil
}
// GetSettings returns the settings as saved in our MongoDB.
func GetSettings(db *mgo.Database) *SettingsInMongo {
settings := &SettingsInMongo{}
err := db.C("settings").Find(bson.M{}).One(settings)
if err != nil && err != mgo.ErrNotFound {
log.Panic("db.GetSettings: Unable to get settings: ", err)
}
return settings
}
// SaveSettings stores the given settings in MongoDB.
func SaveSettings(db *mgo.Database, settings *SettingsInMongo) {
_, err := db.C("settings").Upsert(bson.M{}, settings)
if err != nil && err != mgo.ErrNotFound {
log.Panic("db.SaveSettings: Unable to save settings: ", err)
}
}
// CleanSlate erases all tasks in the flamenco_tasks collection.
func CleanSlate(db *mgo.Database) {
fmt.Println("")
fmt.Println("Performing Clean Slate operation, this will erase all tasks from the local DB.")
fmt.Println("After performing the Clean Slate, Flamenco-Manager will shut down.")
fmt.Println("Press [ENTER] to continue, [Ctrl+C] to abort.")
bufio.NewReader(os.Stdin).ReadLine()
info, err := db.C("flamenco_tasks").RemoveAll(bson.M{})
if err != nil {
log.WithError(err).Panic("unable to erase all tasks")
}
log.Warningf("Erased %d tasks", info.Removed)
settings := GetSettings(db)
settings.DepsgraphLastModified = nil
SaveSettings(db, settings)
}
package flamenco
import (
"time"
"gopkg.in/mgo.v2/bson"
)
// Command is an executable part of a Task
type Command struct {
Name string `bson:"name" json:"name"`
Settings bson.M `bson:"settings" json:"settings"`
}
// Task contains a Flamenco task, with some BSON-only fields for local Manager use.
type Task struct {
ID bson.ObjectId `bson:"_id,omitempty" json:"_id,omitempty"`
Etag string `bson:"_etag,omitempty" json:"_etag,omitempty"`
Job bson.ObjectId `bson:"job,omitempty" json:"job"`
Manager bson.ObjectId `bson:"manager,omitempty" json:"manager"`
Project bson.ObjectId `bson:"project,omitempty" json:"project"`
User bson.ObjectId `bson:"user,omitempty" json:"user"`
Name string `bson:"name" json:"name"`
Status string `bson:"status" json:"status"`
Priority int `bson:"priority" json:"priority"`
JobPriority int `bson:"job_priority" json:"job_priority"`
JobType string `bson:"job_type" json:"job_type"`
Commands []Command `bson:"commands" json:"commands"`
Log string `bson:"log,omitempty" json:"log,omitempty"`
Activity string `bson:"activity,omitempty" json:"activity,omitempty"`
Parents []bson.ObjectId `bson:"parents,omitempty" json:"parents,omitempty"`
Worker string `bson:"worker,omitempty" json:"worker,omitempty"`
// Internal bookkeeping
WorkerID *bson.ObjectId `bson:"worker_id,omitempty" json:"-"`
LastWorkerPing *time.Time `bson:"last_worker_ping,omitempty" json:"-"`
}
type aggregationPipelineResult struct {
// For internal MongoDB querying only
Task *Task `bson:"task"`
}
// ScheduledTasks contains a dependency graph response from Server.
type ScheduledTasks struct {
Depsgraph []Task `json:"depsgraph"`
}
// TaskUpdate is both sent from Worker to Manager, as well as from Manager to Server.
type TaskUpdate struct {
ID bson.ObjectId `bson:"_id" json:"_id"`
TaskID bson.ObjectId `bson:"task_id" json:"task_id,omitempty"`
TaskStatus string `bson:"task_status,omitempty" json:"task_status,omitempty"`
ReceivedOnManager time.Time `bson:"received_on_manager" json:"received_on_manager"`
Activity string `bson:"activity,omitempty" json:"activity,omitempty"`
TaskProgressPercentage int `bson:"task_progress_percentage" json:"task_progress_percentage"`
CurrentCommandIdx int `bson:"current_command_idx" json:"current_command_idx"`
CommandProgressPercentage int `bson:"command_progress_percentage" json:"command_progress_percentage"`
Log string `bson:"log,omitempty" json:"log,omitempty"`
Worker string `bson:"worker" json:"worker"`
}
// TaskUpdateResponse is received from Server.
type TaskUpdateResponse struct {
ModifiedCount int `json:"modified_count"`
HandledUpdateIds []bson.ObjectId `json:"handled_update_ids,omitempty"`
CancelTasksIds []bson.ObjectId `json:"cancel_task_ids,omitempty"`
}
// WorkerRegistration is sent by the Worker to register itself at this Manager.
type WorkerRegistration struct {
Secret string `json:"secret"`
Platform string `json:"platform"`
SupportedJobTypes []string `json:"supported_job_types"`
Nickname string `json:"nickname"`
}
// Worker contains all information about a specific Worker.
// Some fields come from the WorkerRegistration, whereas others are filled by us.
type Worker struct {
ID bson.ObjectId `bson:"_id,omitempty" json:"_id,omitempty"`
Secret string `bson:"-" json:"-"`
HashedSecret []byte `bson:"hashed_secret" json:"-"`
Nickname string `bson:"nickname" json:"nickname"`
Address string `bson:"address" json:"address"`
Status string `bson:"status" json:"status"`
Platform string `bson:"platform" json:"platform"`
CurrentTask bson.ObjectId `bson:"current_task,omitempty" json:"current_task,omitempty"`
TimeCost int `bson:"time_cost" json:"time_cost"`
LastActivity *time.Time `bson:"last_activity,omitempty" json:"last_activity,omitempty"`
SupportedJobTypes []string `bson:"supported_job_types" json:"supported_job_types"`
}
// StartupNotification sent to upstream Flamenco Server upon startup. This is a combination
// of settings (see settings.go) and information from the database.
type StartupNotification struct {
// Settings
ManagerURL string `json:"manager_url"`
VariablesByVarname map[string]map[string]string `json:"variables"`
// From our local database
NumberOfWorkers int `json:"nr_of_workers"`
}
// MayKeepRunningResponse is sent to workers to indicate whether they can keep running their task.
type MayKeepRunningResponse struct {
MayKeepRunning bool `json:"may_keep_running"`
Reason string `json:"reason,omitempty"`
}
// SettingsInMongo contains settings we want to be able to update from
// within Flamenco Manager itself, so those are stored in MongoDB.
type SettingsInMongo struct {
DepsgraphLastModified *string `bson:"depsgraph_last_modified"`
}
// StatusReport is sent in response to a query on the / URL.
type StatusReport struct {
NrOfWorkers int `json:"nr_of_workers"`
NrOfTasks int `json:"nr_of_tasks"`
Version string `json:"version"`
}
# This file is loaded by unit tests in the 'flamenco' package.
database_url: mongodb://localhost/flamanager_test
listen: '[::0]:8083'
own_url: http://192.168.3.108:8083/
flamenco: http://localhost:51234/
manager_id: 5852bc5198377351f95d103e
manager_secret: SRVwA7wAxPRfudvqTDOLXwPn1cDRIlADz5Ef9kHk7d52Us
download_task_sleep_seconds: 300
download_task_recheck_throttle_seconds: -1
cancel_task_fetch_max_interval_seconds: 10
variables:
  blender:
    windows: c:/temp/blender.exe
    linux: /opt/myblenderbuild/blender
    darwin: /opt/myblenderbuild/blender
/**
* Common test functionality, and integration with GoCheck.
*/
package flamenco
import (
"testing"
check "gopkg.in/check.v1"
"gopkg.in/mgo.v2/bson"
)
// Hook up gocheck into the "go test" runner.
// You only need one of these per package, or tests will run multiple times.
func TestWithGocheck(t *testing.T) { check.TestingT(t) }
// ConstructTestTask returns a queued test task with the default priority of 50.
func ConstructTestTask(task_id, job_type string) Task {
return ConstructTestTaskWithPrio(task_id, job_type, 50)
}
func ConstructTestTaskWithPrio(task_id, job_type string, priority int) Task {
return Task{
ID: bson.ObjectIdHex(task_id),
Etag: "1234567",
Job: bson.ObjectIdHex("bbbbbbbbbbbbbbbbbbbbbbbb"),
Manager: bson.ObjectIdHex("cccccccccccccccccccccccc"),
Project: bson.ObjectIdHex("dddddddddddddddddddddddd"),
User: bson.ObjectIdHex("eeeeeeeeeeeeeeeeeeeeeeee"),
Name: "Test task",
Status: "queued",
Priority: priority,
JobType: job_type,
Commands: []Command{
Command{"echo", bson.M{"message": "Running Blender from {blender}"}},
Command{"sleep", bson.M{"time_in_seconds": 3}},
},
Parents: []bson.ObjectId{
bson.ObjectIdHex("ffffffffffffffffffffffff"),
},
Worker: "worker1",
}
}
package flamenco
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
log "github.com/Sirupsen/logrus"
)
// IsoFormat is the layout used for timestamp parsing.
const IsoFormat = "2006-01-02T15:04:05-0700"
/**
* Decodes JSON and writes a Bad Request status if it fails.
*/
func DecodeJson(w http.ResponseWriter, r io.Reader, document interface{},
logprefix string) error {
dec := json.NewDecoder(r)
if err := dec.Decode(document); err != nil {
log.Warningf("%s Unable to decode JSON: %s", logprefix, err)
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "Unable to decode JSON: %s\n", err)
return err
}
return nil
}
/**
* Sends a JSON document to some URL via HTTP.
* :param tweakrequest: can be used to tweak the request before sending it, for
* example by adding authentication headers. May be nil.
* :param responsehandler: is called when a non-error response has been read.
* May be nil.
*/
func SendJson(logprefix, method string, url *url.URL,
payload interface{},
tweakrequest func(req *http.Request),
responsehandler func(resp *http.Response, body []byte) error,
) error {
payload_bytes, err := json.Marshal(payload)
if err != nil {
log.Errorf("%s: Unable to marshal JSON: %s", logprefix, err)
return err
}
// TODO Sybren: enable GZip compression.
req, err := http.NewRequest(method, url.String(), bytes.NewBuffer(payload_bytes))
if err != nil {
log.Errorf("%s: Unable to create request: %s", logprefix, err)
return err
}
req.Header.Add("Content-Type", "application/json")
if tweakrequest != nil {
tweakrequest(req)
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Warningf("%s: Unable to POST to %s: %s", logprefix, url, err)
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Warningf("%s: Error %d POSTing to %s: %s",
logprefix, resp.StatusCode, url, err)
return err
}
if resp.StatusCode >= 300 {
suffix := ""
if resp.StatusCode != 404 {
suffix = fmt.Sprintf("\n body:\n%s", body)
}
log.Warningf("%s: Error %d POSTing to %s%s",
logprefix, resp.StatusCode, url, suffix)
return fmt.Errorf("%s: Error %d POSTing to %s", logprefix, resp.StatusCode, url)
}
if responsehandler != nil {
return responsehandler(resp, body)
}
return nil
}
package flamenco
// MaxInt returns the larger of a and b.
func MaxInt(a, b int) int {
if a < b {
return b
}
return a
}