Skip to content
Snippets Groups Projects
upstream.go 11.6 KiB
Newer Older
/**
 * Periodically fetches new tasks from the Flamenco Server, and sends updates back.
 */
package flamenco

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"sync"
	"time"

	log "github.com/Sirupsen/logrus"
	mgo "gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// Max. nr of tasks that's allowed to be buffered in the channel.
const MAX_OUTSTANDING_TASKS = 5

// Gives the system some time to start up (and open listening HTTP port)
const STARTUP_NOTIFICATION_INITIAL_DELAY = 500 * time.Millisecond

// Duration between consecutive retries of sending the startup notification.
const STARTUP_NOTIFICATION_RETRY = 30 * time.Second

type UpstreamConnection struct {
	config  *Conf
	session *mgo.Session

	// Send any boolean here to kick the task downloader into downloading new tasks.
	download_kick chan chan bool

	done    chan bool
	done_wg *sync.WaitGroup
}

func ConnectUpstream(config *Conf, session *mgo.Session) *UpstreamConnection {
	upconn := UpstreamConnection{
		config,
		session,
		make(chan chan bool),
		make(chan bool),
		new(sync.WaitGroup),
	}
	upconn.download_task_loop()

	return &upconn
}

/**
 * Closes the upstream connection by stopping all upload/download loops.
 */
func (self *UpstreamConnection) Close() {
	close(self.done)

	// Dirty hack: sleep for a bit to ensure the closing of the 'done'
	// channel can be handled by other goroutines, before handling the
	// closing of the other channels.
	time.Sleep(1)
	close(self.download_kick)

	log.Debugf("UpstreamConnection: shutting down, waiting for shutdown to complete.")
	log.Info("UpstreamConnection: shutdown complete.")
}

func (self *UpstreamConnection) KickDownloader(synchronous bool) {
	if synchronous {
		pingback := make(chan bool)
		self.download_kick <- pingback
		log.Info("KickDownloader: Waiting for task downloader to finish.")

		// wait for the download to be complete, or the connection to be shut down.
		self.done_wg.Add(1)
		defer self.done_wg.Done()

		for {
			select {
				log.Debugf("KickDownloader: done.")
				log.Debugf("KickDownloader: Aborting waiting for task downloader; shutting down.")
		log.Debugf("KickDownloader: asynchronous kick, just kicking.")
		self.download_kick <- nil
	}
}

func (self *UpstreamConnection) download_task_loop() {
	timer_chan := Timer("download_task_loop",
		self.config.DownloadTaskSleep,

	go func() {
		mongo_sess := self.session.Copy()
		defer mongo_sess.Close()

		self.done_wg.Add(1)
		defer self.done_wg.Done()
		defer log.Info("download_task_loop: Task download goroutine shutting down.")
			case _, ok := <-timer_chan:
				if !ok {
					return
				}
				log.Info("download_task_loop: Going to fetch tasks due to periodic timeout.")
				download_tasks_from_upstream(self.config, mongo_sess)
			case pingback_chan, ok := <-self.download_kick:
				if !ok {
					return
				}
				log.Info("download_task_loop: Going to fetch tasks due to kick.")
				download_tasks_from_upstream(self.config, mongo_sess)
				if pingback_chan != nil {
					pingback_chan <- true
				}
			}
		}
	}()
}

/**
 * Downloads a chunkn of tasks from the upstream Flamenco Server.
 */
func download_tasks_from_upstream(config *Conf, mongo_sess *mgo.Session) {
	// Try to get as many tasks as we have workers.
	db := mongo_sess.DB("")
	worker_count := WorkerCount(db)

	url_str := fmt.Sprintf("/flamenco/scheduler/tasks/%s?chunk_size=%d",
		config.ManagerId, MaxInt(worker_count, 1))
	rel_url, err := url.Parse(url_str)
	if err != nil {
		log.Warningf("Error parsing '%s' as URL; unable to fetch tasks.", url_str)
		return
	}

	get_url := config.Flamenco.ResolveReference(rel_url)
	log.Infof("Getting tasks from upstream Flamenco %s", get_url)

	req, err := http.NewRequest("GET", get_url.String(), nil)
	if err != nil {
		log.Warningf("Unable to create GET request: %s", err)
		return
	}
	req.SetBasicAuth(config.ManagerSecret, "")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		log.Warningf("Unable to GET %s: %s", get_url, err)
		return
	}

	if resp.StatusCode >= 300 {
		body, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			log.Warningf("Error %d GETing %s: %s", resp.StatusCode, get_url, err)
		log.Warningf("Error %d GETing %s: %s", resp.StatusCode, get_url, body)
		log.Info("No tasks for us; sleeping.")
		return
	}

	// Parse the received tasks.
	var scheduled_tasks []Task
	decoder := json.NewDecoder(resp.Body)
	defer resp.Body.Close()

	if err = decoder.Decode(&scheduled_tasks); err != nil {
		log.Warning("Unable to decode scheduled tasks JSON:", err)
		return
	}

	// Insert them into the MongoDB
	// TODO Sybren: before inserting, compare to the database and deal with any changed statuses.
	if len(scheduled_tasks) > 0 {
		log.Infof("Received %d tasks from upstream Flamenco Server.", len(scheduled_tasks))
	} else {
		log.Debugf("Received %d tasks from upstream Flamenco Server.", len(scheduled_tasks))
	}
	tasks_coll := db.C("flamenco_tasks")
	for _, task := range scheduled_tasks {
		change, err := tasks_coll.Upsert(bson.M{"_id": task.Id}, task)
		if err != nil {
			log.Errorf("unable to insert new task %s: %s", task.Id.Hex(), err)
			log.Infof("Upstream server re-queued existing task %s", task.Id.Hex())
		} else if change.Matched > 0 {
			log.Infof("Upstream server re-queued existing task %s, but nothing changed",
func (self *UpstreamConnection) ResolveUrl(relative_url string, a ...interface{}) (*url.URL, error) {
	rel_url, err := url.Parse(fmt.Sprintf(relative_url, a...))
	if err != nil {
		return &url.URL{}, err
	}
	url := self.config.Flamenco.ResolveReference(rel_url)

	return url, nil
}

func (self *UpstreamConnection) SendJson(logprefix, method string, url *url.URL,
	payload interface{},
	responsehandler func(resp *http.Response, body []byte) error,
) error {
	authenticate := func(req *http.Request) {
		req.SetBasicAuth(self.config.ManagerSecret, "")
	return SendJson(logprefix, method, url, payload, authenticate, responsehandler)
/**
 * Sends a StartupNotification document to upstream Flamenco Server.
 * Keeps trying in a goroutine until the notification was succesful.
 */
func (self *UpstreamConnection) SendStartupNotification() {

	notification := StartupNotification{
		ManagerUrl:         self.config.OwnUrl,
		VariablesByVarname: self.config.VariablesByVarname,
		NumberOfWorkers:    0,
	}

	url, err := self.ResolveUrl("/api/flamenco/managers/%s/startup", self.config.ManagerId)
	if err != nil {
		panic(fmt.Sprintf("SendStartupNotification: unable to construct URL: %s\n", err))
	}

	// Performs the actual sending.
	send_startup_notification := func(mongo_sess *mgo.Session) error {
		notification.NumberOfWorkers = WorkerCount(mongo_sess.DB(""))

		err := self.SendJson("SendStartupNotification", "POST", url, &notification, nil)
			log.Warningf("SendStartupNotification: Unable to send: %s", err)
		log.Infof("SendStartupNotification: Done sending notification to upstream Flamenco")
		return nil
	}

	go func() {
		// Register as a loop that responds to 'done' being closed.
		self.done_wg.Add(1)
		defer self.done_wg.Done()

		mongo_sess := self.session.Copy()
		defer mongo_sess.Close()

		ok := KillableSleep("SendStartupNotification-initial", STARTUP_NOTIFICATION_INITIAL_DELAY,
			self.done, self.done_wg)
		if !ok {
			log.Warning("SendStartupNotification: shutting down without sending startup notification.")
		timer_chan := Timer("SendStartupNotification", STARTUP_NOTIFICATION_RETRY,
			false, self.done, self.done_wg)
			log.Info("SendStartupNotification: trying to send notification.")
			err := send_startup_notification(mongo_sess)
			if err == nil {
				return
			}
		}

		log.Warning("SendStartupNotification: shutting down without succesfully sending notification.")
/**
 * Performs a POST to /api/flamenco/managers/{manager-id}/task-update-batch to
 * send a batch of task updates to the Server.
 */
func (self *UpstreamConnection) SendTaskUpdates(updates *[]TaskUpdate) (*TaskUpdateResponse, error) {
	url, err := self.ResolveUrl("/api/flamenco/managers/%s/task-update-batch",
		self.config.ManagerId)
	if err != nil {
		panic(fmt.Sprintf("SendTaskUpdates: unable to construct URL: %s\n", err))
	}

	response := TaskUpdateResponse{}
	parse_response := func(resp *http.Response, body []byte) error {
		err := json.Unmarshal(body, &response)
		if err != nil {
			log.Warningf("SendTaskUpdates: error parsing server response: %s", err)
	}
	err = self.SendJson("SendTaskUpdates", "POST", url, updates, parse_response)

	return &response, err

/**
 * Re-fetches a task from the Server, but only if its etag changed.
 * - If the etag changed, the differences between the old and new status are handled.
 * - If the Server cannot be reached, this error is ignored and the task is untouched.
 * - If the Server returns an error code other than 500 Internal Server Error,
 *   (Forbidden, Not Found, etc.) the task is removed from the local task queue.
 *
 * If the task was untouched, this function returns false.
 * If it was changed or removed, this function return true.
 */
func (self *UpstreamConnection) RefetchTask(task *Task) bool {
	get_url, err := self.ResolveUrl("/api/flamenco/tasks/%s", task.Id.Hex())
	log.Infof("Verifying task with Flamenco Server %s", get_url)

	req, err := http.NewRequest("GET", get_url.String(), nil)
	if err != nil {
		log.Errorf("WARNING: Unable to create GET request: %s", err)
		return false
	}
	req.SetBasicAuth(self.config.ManagerSecret, "")
	req.Header["If-None-Match"] = []string{task.Etag}

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		log.Warningf("Unable to re-fetch task: %s", err)
		return false
	}

	if resp.StatusCode == http.StatusNotModified {
		// Nothing changed, we're good to go.
		log.Infof("Cached task %s is still the same on the Server", task.Id.Hex())
		return false
	}

	if resp.StatusCode >= 500 {
		// Internal errors, we'll ignore that.
		log.Warningf("Error %d trying to re-fetch task %s",
			resp.StatusCode, task.Id.Hex())
		return false
	}
	if 300 <= resp.StatusCode && resp.StatusCode < 400 {
		// Redirects, we'll ignore those too for now.
		log.Warningf("Redirect %d trying to re-fetch task %s, not following redirect.",
			resp.StatusCode, task.Id.Hex())
		return false
	}

	// Either the task is gone (or gone-ish, i.e. 4xx code) or it has changed.
	// If it is gone, we handle it as canceled.
	new_task := Task{}

	if resp.StatusCode >= 400 || resp.StatusCode == 204 {
		// Not found, access denied, that kind of stuff. Locally cancel the task.
		// TODO: probably better to go to "failed".
		log.Warningf("Code %d when re-fetching task %s; canceling local copy",
			resp.StatusCode, task.Id.Hex())

		new_task = *task
		new_task.Status = "canceled"
	} else {
		// Parse the new task we received.
		decoder := json.NewDecoder(resp.Body)
		defer resp.Body.Close()

		if err = decoder.Decode(&new_task); err != nil {
			// We can't decode what's being sent. Remove it from the queue, as we no longer know
			// whether this task is valid at all.
			log.Warningf("Unable to decode updated tasks JSON from %s: %s", get_url, err)

			new_task = *task
			new_task.Status = "canceled"
		}
	}

	// save the task to the queue.
	log.Infof("Cached task %s was changed on the Server, status=%s, priority=%d.",
		task.Id.Hex(), new_task.Status, new_task.Priority)
	tasks_coll := self.session.DB("").C("flamenco_tasks")
	tasks_coll.UpdateId(task.Id,
		bson.M{"$set": new_task})

	return true
}