flamenco

package
v2.4.2+incompatible
Published: Mar 27, 2019 License: MIT Imports: 40 Imported by: 0

Documentation

Overview

Package flamenco receives task updates from workers, queues them, and forwards them to the Flamenco Server.

Package flamenco periodically fetches new tasks from the Flamenco Server, and sends updates back.

Constants

const IsoFormat = "2006-01-02T15:04:05-0700"

IsoFormat is used for timestamp parsing
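As a minimal sketch of how this layout behaves with the standard library's time.Parse, the example below parses one timestamp and prints it in UTC. The helper name parseISO and the lower-case constant are illustrative only and are not part of this package. Note that the layout expects a numeric zone offset without a colon, so an RFC 3339 style offset such as +01:00 is rejected.

```go
package main

import (
	"fmt"
	"time"
)

// isoFormat mirrors the IsoFormat constant documented above.
const isoFormat = "2006-01-02T15:04:05-0700"

// parseISO is a hypothetical helper that parses a timestamp in the IsoFormat layout.
func parseISO(s string) (time.Time, error) {
	return time.Parse(isoFormat, s)
}

func main() {
	ts, err := parseISO("2019-03-27T14:30:00+0100")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	// Convert to UTC and format with the same layout.
	fmt.Println(ts.UTC().Format(isoFormat))

	// An offset written with a colon does not match the layout.
	_, err = parseISO("2019-03-27T14:30:00+01:00")
	fmt.Println("colon offset rejected:", err != nil)
}
```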

Variables

var (
	// ErrDuplicateVariables is returned when the same name is used as regular and path-replacement variable.
	ErrDuplicateVariables = errors.New("duplicate variables found")
)

Functions

func CleanSlate

func CleanSlate(db *mgo.Database)

CleanSlate erases all tasks in the flamenco_tasks collection.

func ConvertAndForward

func ConvertAndForward(images <-chan string, storagePath string) <-chan string

ConvertAndForward copies each image it reads from 'images', converts it to a browser-friendly file, and forwards the new filename to the returned channel. It always converts to JPEG, even when the file is a browser-supported format (like PNG), so that the HTML can always refer to /static/latest-image.jpg to show the latest render.

func Count

func Count(coll *mgo.Collection) (int, error)

Count returns the number of documents in the given collection.

func CreateTestTask

func CreateTestTask(worker *Worker, conf *Conf, db *mgo.Database) (string, error)

CreateTestTask constructs a Manager-local test task and queues it for the worker to pick up.

func DecodeJSON

func DecodeJSON(w http.ResponseWriter, r io.Reader, document interface{},
	logprefix string) error

DecodeJSON decodes JSON from an io.Reader, and writes a Bad Request status if it fails.
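The behaviour described above can be sketched with the standard library alone. This is an illustration under assumptions, not the package's implementation: the lower-case decodeJSON, the response text, and the statusFor demo helper are all hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/http/httptest"
	"strings"
)

// decodeJSON is a hypothetical stand-in for DecodeJSON: it decodes the reader
// into document and answers with 400 Bad Request when decoding fails.
func decodeJSON(w http.ResponseWriter, r io.Reader, document interface{}, logprefix string) error {
	if err := json.NewDecoder(r).Decode(document); err != nil {
		log.Printf("%s unable to decode JSON: %v", logprefix, err)
		http.Error(w, "unable to decode JSON", http.StatusBadRequest)
		return err
	}
	return nil
}

// statusFor is a demo helper that reports the HTTP status recorded
// after trying to decode the given body.
func statusFor(body string) int {
	rec := httptest.NewRecorder()
	var doc map[string]interface{}
	_ = decodeJSON(rec, strings.NewReader(body), &doc, "demo:")
	return rec.Code
}

func main() {
	fmt.Println(statusFor(`{"name": "worker-1"}`)) // recorder keeps its default status
	fmt.Println(statusFor(`{not json`))            // Bad Request is written
}
```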

func Equal

func Equal(a, b []string) bool

Equal tells whether a and b contain the same elements. A nil argument is equivalent to an empty slice.
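A possible reading of this contract is an element-by-element, order-sensitive comparison. The sketch below assumes that interpretation; equalStrings is a hypothetical name and the package's actual Equal may differ.

```go
package main

import "fmt"

// equalStrings is a hypothetical stand-in for Equal. It assumes that
// "the same elements" means equal values at equal positions.
func equalStrings(a, b []string) bool {
	// len(nil) is 0, so a nil slice compares equal to an empty one.
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(equalStrings(nil, []string{}))
	fmt.Println(equalStrings([]string{"a", "b"}, []string{"a", "b"}))
	fmt.Println(equalStrings([]string{"a", "b"}, []string{"b", "a"}))
}
```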

func GetOrCreateMap

func GetOrCreateMap(document bson.M, key string) bson.M

GetOrCreateMap returns document[key] as bson.M, creating it if necessary.
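The get-or-create pattern is handy when building nested update documents. The sketch below uses a plain map type in place of bson.M (which is itself a map from string to interface{}); the doc type and getOrCreateMap are hypothetical names for illustration.

```go
package main

import "fmt"

// doc stands in for bson.M in this sketch.
type doc map[string]interface{}

// getOrCreateMap returns document[key] as a doc, storing a new empty doc
// under key when the entry is missing or has a different type.
func getOrCreateMap(document doc, key string) doc {
	if sub, ok := document[key].(doc); ok {
		return sub
	}
	sub := doc{}
	document[key] = sub
	return sub
}

func main() {
	update := doc{}
	// Both calls operate on the same nested map.
	getOrCreateMap(update, "$set")["status"] = "active"
	getOrCreateMap(update, "$set")["activity"] = "rendering"
	fmt.Println(len(update), len(getOrCreateMap(update, "$set")))
}
```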

func ImageWatcherHTTPPush

func ImageWatcherHTTPPush(w http.ResponseWriter, r *http.Request, broadcaster *chantools.OneToManyChan)

ImageWatcherHTTPPush starts a Server-Sent Events channel.

func IsRunnableTaskStatus

func IsRunnableTaskStatus(status string) bool

IsRunnableTaskStatus returns whether the given status is considered "runnable".

func MaxInt

func MaxInt(a, b int) int

MaxInt returns the maximum of a and b.

func MongoSession

func MongoSession(config *Conf) *mgo.Session

MongoSession returns a MongoDB session.

The database name should be configured in the database URL. You can access this default database with session.DB("").

func ObjectIDFromRequest

func ObjectIDFromRequest(w http.ResponseWriter, r *http.Request, variableName string) (bson.ObjectId, error)

ObjectIDFromRequest parses the request variable value as Object ID.

func PurgeOutgoingQueue

func PurgeOutgoingQueue(db *mgo.Database)

PurgeOutgoingQueue erases all queued task updates from the local DB.

func RegisterWorker

func RegisterWorker(w http.ResponseWriter, r *http.Request, db *mgo.Database)

RegisterWorker creates a new Worker in the DB, based on the WorkerRegistration document received.

func ReplaceLocal

func ReplaceLocal(strvalue string, config *Conf) string

ReplaceLocal performs variable and path replacement for strings based on the local platform.
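To illustrate the idea of per-platform replacement, here is a rough sketch. It assumes variables appear in strings as {name} placeholders and that values are looked up in a platform -> variable name -> value map like Conf.VariablesByPlatform; the function name, the placeholder syntax, and the sample values are assumptions, and path replacement is not covered.

```go
package main

import (
	"fmt"
	"strings"
)

// replaceForPlatform is a hypothetical helper that substitutes every
// {name} placeholder with the value configured for the given platform.
// It assumes replacement values do not themselves contain placeholders,
// because map iteration order is not fixed.
func replaceForPlatform(value, platform string, byPlatform map[string]map[string]string) string {
	for name, replacement := range byPlatform[platform] {
		value = strings.ReplaceAll(value, "{"+name+"}", replacement)
	}
	return value
}

func main() {
	vars := map[string]map[string]string{
		"linux":   {"blender": "/opt/blender/blender"},
		"windows": {"blender": `C:\Blender\blender.exe`},
	}
	fmt.Println(replaceForPlatform("{blender} --background", "linux", vars))
	fmt.Println(replaceForPlatform("{blender} --background", "windows", vars))
}
```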

func ReplaceVariables

func ReplaceVariables(config *Conf, task *Task, worker *Worker)

ReplaceVariables performs variable and path replacement for tasks.

func SaveSettings

func SaveSettings(db *mgo.Database, settings *SettingsInMongo)

SaveSettings stores the given settings in MongoDB.

func SendJSON

func SendJSON(logprefix, method string, url *url.URL,
	payload interface{},
	tweakrequest func(req *http.Request),
	responsehandler func(resp *http.Response, body []byte) error,
) error

SendJSON sends a JSON document to some URL via HTTP.

tweakrequest can be used to tweak the request before sending it, for example by adding authentication headers. May be nil.

responsehandler is called when a non-error response has been read. May be nil.
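The two optional callbacks can be sketched as follows. This is an illustration with the standard library, not the package's code: sendJSON, the use of http.DefaultClient, the treatment of any status of 300 or higher as an error, and the demoSend helper with its example token are all assumptions.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/url"
)

// sendJSON is a hypothetical stand-in for SendJSON.
func sendJSON(logprefix, method string, target *url.URL, payload interface{},
	tweakrequest func(req *http.Request),
	responsehandler func(resp *http.Response, body []byte) error,
) error {
	data, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("%s marshal: %v", logprefix, err)
	}
	req, err := http.NewRequest(method, target.String(), bytes.NewReader(data))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	if tweakrequest != nil {
		tweakrequest(req) // e.g. add authentication headers
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	if resp.StatusCode >= 300 { // assumption: these count as error responses
		return fmt.Errorf("%s %s %s: status %d", logprefix, method, target, resp.StatusCode)
	}
	if responsehandler != nil {
		return responsehandler(resp, body)
	}
	return nil
}

// demoSend posts to a local test server that echoes the Authorization header.
func demoSend() (string, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, r.Header.Get("Authorization"))
	}))
	defer srv.Close()
	target, err := url.Parse(srv.URL)
	if err != nil {
		return "", err
	}
	var echoed string
	err = sendJSON("demo:", "POST", target, map[string]string{"status": "active"},
		func(req *http.Request) { req.Header.Set("Authorization", "Bearer example-token") },
		func(resp *http.Response, body []byte) error { echoed = string(body); return nil },
	)
	return echoed, err
}

func main() {
	fmt.Println(demoSend())
}
```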

func ServeTaskLog

func ServeTaskLog(w http.ResponseWriter, r *http.Request,
	jobID, taskID bson.ObjectId, tuq *TaskUpdateQueue)

ServeTaskLog serves the latest task log file for the given job+task. Depending on the User-Agent header it serves head+tail or the entire file.

func StoreNewWorker

func StoreNewWorker(winfo *Worker, db *mgo.Database) error

StoreNewWorker saves the given worker in the database.

func TemplatePathPrefix

func TemplatePathPrefix(fileToFind string) string

TemplatePathPrefix returns the filename prefix to find template files. Templates are searched for relative to the current working directory as well as relative to the currently running executable.

func Timer

func Timer(name string, sleepDuration, initialDelay time.Duration, closable *closable) <-chan struct{}

Timer is a generic timer for periodic signals.
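A periodic-signal channel of this kind might look like the sketch below. Because the closable type is unexported, the sketch swaps in a plain done channel for shutdown; the timer function, its exact tick semantics (first tick after initialDelay, then one tick per sleep interval), and the countTicks helper are assumptions.

```go
package main

import (
	"fmt"
	"time"
)

// timer is a hypothetical stand-in for Timer. It sends a signal after
// initialDelay and then once every sleep interval, until done is closed.
// The returned channel is closed when the goroutine stops.
func timer(sleep, initialDelay time.Duration, done <-chan struct{}) <-chan struct{} {
	ticks := make(chan struct{})
	go func() {
		defer close(ticks)
		wait := initialDelay
		for {
			select {
			case <-time.After(wait):
			case <-done:
				return
			}
			select {
			case ticks <- struct{}{}:
			case <-done:
				return
			}
			wait = sleep
		}
	}()
	return ticks
}

// countTicks receives n ticks, shuts the timer down, and waits for the
// tick channel to be closed.
func countTicks(n int) int {
	done := make(chan struct{})
	ticks := timer(time.Millisecond, 0, done)
	received := 0
	for received < n {
		<-ticks
		received++
	}
	close(done)
	for range ticks {
		// Drain until the goroutine closes the channel.
	}
	return received
}

func main() {
	fmt.Println("ticks received:", countTicks(3))
}
```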

func UtcNow

func UtcNow() *time.Time

UtcNow returns the current time & date in UTC.

func WorkerAckStatusChange

func WorkerAckStatusChange(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database, ackStatus string)

WorkerAckStatusChange allows a Worker to acknowledge a requested status change.

func WorkerCount

func WorkerCount(db *mgo.Database) int

WorkerCount returns the number of registered workers.

func WorkerGetStatusChange

func WorkerGetStatusChange(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database)

WorkerGetStatusChange allows a Worker to fetch any pending status change.

func WorkerPingedTask

func WorkerPingedTask(workerID bson.ObjectId, taskID bson.ObjectId, taskStatus string, db *mgo.Database)

WorkerPingedTask marks the task as pinged by the worker. If worker_id is not nil, sets the worker_id field of the task. Otherwise doesn't touch that field and only updates last_worker_ping.

func WorkerSecret

func WorkerSecret(user string, db *mgo.Database) string

WorkerSecret returns the hashed secret of the worker.

func WorkerSignOff

func WorkerSignOff(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database,
	scheduler *TaskScheduler)

WorkerSignOff re-queues all active tasks (should be only one) that are assigned to this worker.

func WorkerSignOn

func WorkerSignOn(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.Database,
	notifier *UpstreamNotifier)

WorkerSignOn allows a Worker to register a new list of supported task types. It also clears the worker's "current" task from the dashboard, so that it's clear that the now-active worker is not actually working on that task.

Types

type BlenderRenderConfig

type BlenderRenderConfig struct {
	JobStorage   string `yaml:"job_storage"`
	RenderOutput string `yaml:"render_output"`
}

BlenderRenderConfig represents the configuration required for a test render.

type Command

type Command struct {
	Name     string `bson:"name" json:"name"`
	Settings bson.M `bson:"settings" json:"settings"`
}

Command is an executable part of a Task.

type Conf

type Conf struct {
	Mode          string   `yaml:"mode"` // either "develop" or "production"
	ManagerName   string   `yaml:"manager_name"`
	DatabaseURL   string   `yaml:"database_url"`
	DatabasePath  string   `yaml:"database_path"`
	TaskLogsPath  string   `yaml:"task_logs_path"`
	Listen        string   `yaml:"listen"`
	OwnURL        string   `yaml:"own_url"`
	FlamencoStr   string   `yaml:"flamenco"`
	Flamenco      *url.URL `yaml:"-"`
	ManagerID     string   `yaml:"manager_id"`
	ManagerSecret string   `yaml:"manager_secret"`
	TLSKey        string   `yaml:"tlskey"`
	TLSCert       string   `yaml:"tlscert"`

	DownloadTaskSleep time.Duration `yaml:"download_task_sleep"`

	/* The number of seconds between rechecks when there are no more tasks for workers.
	 * If set to 0, will not throttle at all.
	 * If set to -1, will never check when a worker asks for a task (so only every
	 * download_task_sleep_seconds seconds). */
	DownloadTaskRecheckThrottle time.Duration `yaml:"download_task_recheck_throttle"`

	/* Variables, stored differently in YAML and these settings.
	 * VariablesByVarname:   variable name -> platform -> value
	 * VariablesByPlatform:  platform -> variable name -> value
	 */
	VariablesByVarname  map[string]map[string]string `yaml:"variables"`
	VariablesByPlatform map[string]map[string]string `yaml:"-"`

	PathReplacementByVarname  map[string]map[string]string `yaml:"path_replacement"`
	PathReplacementByPlatform map[string]map[string]string `yaml:"-"`

	TaskUpdatePushMaxInterval time.Duration `yaml:"task_update_push_max_interval"`
	TaskUpdatePushMaxCount    int           `yaml:"task_update_push_max_count"`
	CancelTaskFetchInterval   time.Duration `yaml:"cancel_task_fetch_max_interval"`

	ActiveTaskTimeoutInterval   time.Duration `yaml:"active_task_timeout_interval"`
	ActiveWorkerTimeoutInterval time.Duration `yaml:"active_worker_timeout_interval"`

	TaskCleanupMaxAge   time.Duration `yaml:"task_cleanup_max_age"`
	WorkerCleanupMaxAge time.Duration `yaml:"worker_cleanup_max_age"`
	WorkerCleanupStatus []string      `yaml:"worker_cleanup_status"`

	/* This many failures (on a given job+task type combination) will ban a worker
	 * from that task type on that job. */
	BlacklistThreshold int `yaml:"blacklist_threshold"`

	// When this many workers have tried the task and failed, it will be hard-failed
	// (even when there are workers left that could technically retry the task).
	TaskFailAfterSoftFailCount int `yaml:"task_fail_after_softfail_count"`

	WatchForLatestImage string `yaml:"watch_for_latest_image"`

	SSDPDiscovery  bool   `yaml:"ssdp_discovery"`
	SSDPDeviceUUID string `yaml:"ssdp_device_uuid"`

	TestTasks TestTasks `yaml:"test_tasks"`

	// Shaman configuration settings.
	Shaman shamanconfig.Config `yaml:"shaman"`

	// Authentication settings.
	JWT                      jwtauth.Config `yaml:"user_authentication"`
	WorkerRegistrationSecret string         `yaml:"worker_registration_secret"`
}

Conf represents the Manager's configuration file.
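The struct stores variables both by variable name (as written in YAML) and by platform. One plausible way to derive the second form from the first is a simple map transposition, sketched below. The transpose function and the sample values are hypothetical; how the package actually fills VariablesByPlatform is not shown in this documentation.

```go
package main

import "fmt"

// transpose turns variable name -> platform -> value
// into platform -> variable name -> value.
func transpose(byVarname map[string]map[string]string) map[string]map[string]string {
	byPlatform := make(map[string]map[string]string)
	for varname, perPlatform := range byVarname {
		for platform, value := range perPlatform {
			if byPlatform[platform] == nil {
				byPlatform[platform] = make(map[string]string)
			}
			byPlatform[platform][varname] = value
		}
	}
	return byPlatform
}

func main() {
	byVarname := map[string]map[string]string{
		"blender": {"linux": "/opt/blender/blender", "windows": `C:\Blender\blender.exe`},
		"ffmpeg":  {"linux": "/usr/bin/ffmpeg"},
	}
	byPlatform := transpose(byVarname)
	fmt.Println(byPlatform["linux"]["blender"])
	fmt.Println(len(byPlatform["linux"]), len(byPlatform["windows"]))
}
```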

func GetConf

func GetConf() (Conf, error)

GetConf parses flamenco-manager.yaml and returns its contents as a Conf object.

func GetTestConfig

func GetTestConfig() Conf

GetTestConfig returns the configuration for unit tests.

func LoadConf

func LoadConf(filename string) (Conf, error)

LoadConf parses the given file and returns its contents as a Conf object.

func (*Conf) HasTLS

func (c *Conf) HasTLS() bool

HasTLS returns true if both the TLS certificate and key files are configured.

func (*Conf) OverrideMode

func (c *Conf) OverrideMode(mode string)

OverrideMode checks the mode parameter for validity and logs that it's being overridden.

func (*Conf) Overwrite

func (c *Conf) Overwrite() error

Overwrite stores this configuration object as flamenco-manager.yaml.

func (*Conf) Write

func (c *Conf) Write(filename string) error

Write saves the current in-memory configuration to a YAML file.

type Dashboard

type Dashboard struct {

	// Set by main.go
	RestartFunction func()
	// contains filtered or unexported fields
}

Dashboard can show HTML and JSON reports.

func CreateDashboard

func CreateDashboard(config *Conf,
	session *mgo.Session,
	sleeper *SleepScheduler,
	blacklist *WorkerBlacklist,
	flamencoVersion string,
) *Dashboard

CreateDashboard creates a new Dashboard object.

func (*Dashboard) AddRoutes

func (dash *Dashboard) AddRoutes(router *mux.Router, auther jwtauth.Authenticator)

AddRoutes adds routes to serve reporting status requests.

type FileProduced

type FileProduced struct {
	Paths []string `json:"paths"`
}

FileProduced is sent by the worker whenever it produces (e.g. renders) some file. This hooks into the fswatcher system.

type ImageWatcher

type ImageWatcher struct {

	// The public channel, from which can only be read.
	ImageCreated <-chan string
	// contains filtered or unexported fields
}

ImageWatcher watches a filesystem directory.

func CreateImageWatcher

func CreateImageWatcher(pathToWatch string, bufferSize int) *ImageWatcher

CreateImageWatcher creates a new ImageWatcher for the given directory. bufferSize is the size of the iw.ImageCreated channel.

func (*ImageWatcher) Close

func (iw *ImageWatcher) Close()

Close cleanly shuts down the watcher.

func (*ImageWatcher) Go

func (iw *ImageWatcher) Go()

Go starts the watcher in a separate goroutine.

type JobTask

type JobTask struct {
	Job  bson.ObjectId `json:"job"`
	Task bson.ObjectId `json:"task"`
}

JobTask is a tuple (Job ID, Task ID).

type LatestImageSystem

type LatestImageSystem struct {
	// contains filtered or unexported fields
}

LatestImageSystem ties an ImageWatcher to the fswatcher_middle and fswatcher_http stuff, allowing the results to be pushed via HTTP to browsers.

func CreateLatestImageSystem

func CreateLatestImageSystem(watchPath string) *LatestImageSystem

CreateLatestImageSystem sets up a LatestImageSystem.

func (*LatestImageSystem) AddRoutes

func (lis *LatestImageSystem) AddRoutes(
	router *mux.Router,
	workerAuth *auth.BasicAuth,
	userAuth jwtauth.Authenticator,
)

AddRoutes adds the HTTP Server-Sent Events endpoint to the router.

func (*LatestImageSystem) Close

func (lis *LatestImageSystem) Close()

Close gracefully shuts down the image watcher, if the path to watch isn't empty.

func (*LatestImageSystem) Go

func (lis *LatestImageSystem) Go()

Go starts the image watcher, if the path to watch isn't empty.

type Lazyness

type Lazyness bool

Lazyness indicates whether a worker's requested status change is lazy (true) or immediate (false).

const (
	// Immediate status change requests interrupt the currently running task.
	Immediate Lazyness = false
	// Lazy status change requests are applied when the currently running task finishes.
	Lazy Lazyness = true
)

type M

type M bson.M

M is a shortcut for bson.M to make longer queries easier to read.

type MayKeepRunningResponse

type MayKeepRunningResponse struct {
	MayKeepRunning bool   `json:"may_keep_running"`
	Reason         string `json:"reason,omitempty"`

	// For controlling sleeping & waking up. For values, see the workerStatusXXX constants.
	StatusRequested string `json:"status_requested,omitempty"`
}

MayKeepRunningResponse is sent to workers to indicate whether they can keep running their task.

type OpenRegAuth

type OpenRegAuth struct{}

OpenRegAuth allows anybody to register as a Worker.

func (OpenRegAuth) Wrap

func (ora OpenRegAuth) Wrap(handler http.Handler) http.Handler

Wrap does not do anything.

func (OpenRegAuth) WrapFunc

func (ora OpenRegAuth) WrapFunc(handlerFunc func(w http.ResponseWriter, r *http.Request)) http.Handler

WrapFunc does not do anything.

type PSKRegAuth

type PSKRegAuth struct {
	// contains filtered or unexported fields
}

PSKRegAuth authorises based on JWT tokens signed with HMAC + a pre-shared key.

func (*PSKRegAuth) Wrap

func (pra *PSKRegAuth) Wrap(handler http.Handler) http.Handler

Wrap requires that the request is authenticated with the proper JWT bearer token.

func (*PSKRegAuth) WrapFunc

func (pra *PSKRegAuth) WrapFunc(handlerFunc func(w http.ResponseWriter, r *http.Request)) http.Handler

WrapFunc requires that the request is authenticated with the proper JWT bearer token.

type RegistrationAuth

type RegistrationAuth interface {
	Wrap(handler http.Handler) http.Handler
	WrapFunc(handlerFunc func(w http.ResponseWriter, r *http.Request)) http.Handler
}

RegistrationAuth is the interface for all worker registration authorisers.

func NewWorkerRegistrationAuthoriser

func NewWorkerRegistrationAuthoriser(config *Conf) RegistrationAuth

NewWorkerRegistrationAuthoriser creates a new RegistrationAuth. If a pre-shared secret key is configured, creates a PSKRegAuth, otherwise creates an OpenRegAuth.

type ScheduleInfo

type ScheduleInfo struct {
	ScheduleActive bool `bson:"schedule_active" json:"schedule_active"`

	// Space-separated two-letter strings indicating days of week the schedule is active.
	// Empty means "every day".
	DaysOfWeek string `bson:"days_of_week,omitempty" json:"days_of_week,omitempty"`

	// Start and end time of the day at which the schedule is active.
	// Applies only when today is in DaysOfWeek, or when DaysOfWeek is empty.
	// No 'time_' prefix for BSON as it already serialises {time: "15:04:05"}.
	TimeStart *TimeOfDay `bson:"start,omitempty" json:"time_start,omitempty"`
	TimeEnd   *TimeOfDay `bson:"end,omitempty" json:"time_end,omitempty"`
	NextCheck *time.Time `bson:"next_check,omitempty" json:"next_check,omitempty"`
}

ScheduleInfo holds the schedule for automatically sending a Worker to sleep and waking it up.
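The DaysOfWeek field is described as space-separated two-letter strings, with an empty value meaning every day. A check against that field could be sketched as below. The sketch assumes the two-letter codes are the first two letters of the English day name ("mo", "tu", and so on) and compares case-insensitively; dayMatches is a hypothetical helper, and the time window (TimeStart, TimeEnd) is left out.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// dayMatches reports whether the schedule applies on the given weekday.
// An empty daysOfWeek string means "every day".
func dayMatches(daysOfWeek string, day time.Weekday) bool {
	fields := strings.Fields(daysOfWeek)
	if len(fields) == 0 {
		return true
	}
	// Assumption: codes are the first two letters of the English day name.
	abbrev := strings.ToLower(day.String()[:2])
	for _, field := range fields {
		if strings.ToLower(field) == abbrev {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(dayMatches("mo tu we th fr", time.Monday))
	fmt.Println(dayMatches("mo tu we th fr", time.Saturday))
	fmt.Println(dayMatches("", time.Saturday))
}
```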

type ScheduledTasks

type ScheduledTasks struct {
	Depsgraph []Task `json:"depsgraph"`
}

ScheduledTasks contains a dependency graph response from Server.

type SettingsInMongo

type SettingsInMongo struct {
	DepsgraphLastModified *string `bson:"depsgraph_last_modified"`
}

SettingsInMongo contains settings we want to be able to update from within Flamenco Manager itself, so those are stored in MongoDB.

func GetSettings

func GetSettings(db *mgo.Database) *SettingsInMongo

GetSettings returns the settings as saved in our MongoDB.

type SleepScheduler

type SleepScheduler struct {
	// contains filtered or unexported fields
}

SleepScheduler manages wake/sleep cycles of Workers.

func CreateSleepScheduler

func CreateSleepScheduler(session *mgo.Session) *SleepScheduler

CreateSleepScheduler creates a new SleepScheduler.

func (*SleepScheduler) Close

func (ss *SleepScheduler) Close()

Close gracefully shuts down the sleep scheduler goroutine.

func (*SleepScheduler) DeactivateSleepSchedule

func (ss *SleepScheduler) DeactivateSleepSchedule(worker *Worker, db *mgo.Database) error

DeactivateSleepSchedule deactivates the worker's sleep schedule.

func (*SleepScheduler) Go

func (ss *SleepScheduler) Go()

Go starts a new goroutine to perform the periodic checking of the schedule.

func (*SleepScheduler) RefreshAllWorkers

func (ss *SleepScheduler) RefreshAllWorkers()

RefreshAllWorkers updates the status of all workers for which a schedule is active.

func (*SleepScheduler) RequestWorkerStatus

func (ss *SleepScheduler) RequestWorkerStatus(worker *Worker, db *mgo.Database)

RequestWorkerStatus sets worker.StatusRequested if the scheduler demands a status change.

func (*SleepScheduler) SetSleepSchedule

func (ss *SleepScheduler) SetSleepSchedule(worker *Worker, schedule ScheduleInfo, db *mgo.Database) error

SetSleepSchedule stores the given schedule as the worker's new sleep schedule and applies it. Updates both the Worker object itself and the Mongo database. Instantly requests a new status for the worker according to the schedule.

type StatusReport

type StatusReport struct {
	NrOfWorkers       int      `json:"nr_of_workers"`
	NrOfTasks         int      `json:"nr_of_tasks"`
	UpstreamQueueSize int      `json:"upstream_queue_size"`
	Version           string   `json:"version"`
	Workers           []Worker `json:"workers"`
	ManagerName       string   `json:"manager_name"`
	ManagerMode       string   `json:"manager_mode"` // either "develop" or "production", see settings.go Conf.Mode.
	Server            struct {
		Name string `json:"name"`
		URL  string `json:"url"`
	} `json:"server"`
}

StatusReport is sent in response to a query on the / URL.

type Task

type Task struct {
	ID          bson.ObjectId   `bson:"_id,omitempty" json:"_id,omitempty"`
	Etag        string          `bson:"_etag,omitempty" json:"_etag,omitempty"`
	Job         bson.ObjectId   `bson:"job,omitempty" json:"job"`
	Manager     bson.ObjectId   `bson:"manager,omitempty" json:"manager"`
	Project     bson.ObjectId   `bson:"project,omitempty" json:"project"`
	User        bson.ObjectId   `bson:"user,omitempty" json:"user"`
	Name        string          `bson:"name" json:"name"`
	Status      string          `bson:"status" json:"status"`
	Priority    int             `bson:"priority" json:"priority"`
	JobPriority int             `bson:"job_priority" json:"job_priority"`
	JobType     string          `bson:"job_type" json:"job_type"`
	TaskType    string          `bson:"task_type" json:"task_type"`
	Commands    []Command       `bson:"commands" json:"commands"`
	Log         string          `bson:"log,omitempty" json:"log,omitempty"`
	Activity    string          `bson:"activity,omitempty" json:"activity,omitempty"`
	Parents     []bson.ObjectId `bson:"parents,omitempty" json:"parents,omitempty"`
	Worker      string          `bson:"worker,omitempty" json:"worker,omitempty"`

	FailedByWorkers []WorkerRef  `bson:"failed_by_workers,omitempty" json:"failed_by_workers,omitempty"` // Workers who tried this task and failed.
	Metrics         *TaskMetrics `bson:"metrics,omitempty" json:"metrics,omitempty"`

	// Internal bookkeeping
	WorkerID       *bson.ObjectId `bson:"worker_id,omitempty" json:"-"`        // The worker assigned to this task.
	LastWorkerPing *time.Time     `bson:"last_worker_ping,omitempty" json:"-"` // When a worker last said it was working on this. Might not have been a task update.
	LastUpdated    *time.Time     `bson:"last_updated,omitempty" json:"-"`     // when we have last seen an update.
}

Task contains a Flamenco task, with some BSON-only fields for local Manager use.

type TaskCleaner

type TaskCleaner struct {
	// contains filtered or unexported fields
}

TaskCleaner periodically deletes tasks that haven't been touched in a long time.

func CreateTaskCleaner

func CreateTaskCleaner(config *Conf, session *mgo.Session) *TaskCleaner

CreateTaskCleaner creates a new TaskCleaner with default timings.

func (*TaskCleaner) Close

func (tc *TaskCleaner) Close()

Close gracefully shuts down the task cleaner goroutine.

func (*TaskCleaner) Go

func (tc *TaskCleaner) Go()

Go starts a new goroutine to perform the periodic checking.

type TaskLogUploader

type TaskLogUploader struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

TaskLogUploader sends compressed task log files to Flamenco Server.

The task IDs are queued first. If the tuple is already queued, queueing is a no-op, even when uploading is already in progress. This allows the Server to maintain the queue of to-be-uploaded task logs; we don't have to persist anything to disk.

func CreateTaskLogUploader

func CreateTaskLogUploader(config *Conf, upstream *UpstreamConnection) *TaskLogUploader

CreateTaskLogUploader creates a new TaskLogUploader.

func (*TaskLogUploader) Close

func (tlu *TaskLogUploader) Close()

Close gracefully shuts down the task uploader goroutine.

func (*TaskLogUploader) Go

func (tlu *TaskLogUploader) Go()

Go starts a goroutine that monitors the queue and uploads task logs.

func (*TaskLogUploader) QueueAll

func (tlu *TaskLogUploader) QueueAll(jobTasks []JobTask)

QueueAll places all (Job ID, Task ID) tuples on the queue for uploading later. This function will keep getting called with tasks until those tasks have had their logfile uploaded to the Server.
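The no-op behaviour for already-queued tuples can be pictured as a set guarded by a mutex. The following is only a sketch of that idea: the uploadQueue type, the string-based IDs, and the markUploaded method are hypothetical and do not reflect the package's unexported fields.

```go
package main

import (
	"fmt"
	"sync"
)

// jobTask mirrors the (Job ID, Task ID) tuple, with string IDs for brevity.
type jobTask struct {
	Job, Task string
}

// uploadQueue is a hypothetical in-memory set of tuples awaiting upload.
type uploadQueue struct {
	mu      sync.Mutex
	pending map[jobTask]bool
}

func newUploadQueue() *uploadQueue {
	return &uploadQueue{pending: make(map[jobTask]bool)}
}

// queueAll adds every tuple that is not yet pending and returns how many
// were newly added. Tuples that are already pending are skipped.
func (q *uploadQueue) queueAll(jobTasks []jobTask) int {
	q.mu.Lock()
	defer q.mu.Unlock()
	added := 0
	for _, jt := range jobTasks {
		if !q.pending[jt] {
			q.pending[jt] = true
			added++
		}
	}
	return added
}

// markUploaded removes a tuple once its log has been uploaded.
func (q *uploadQueue) markUploaded(jt jobTask) {
	q.mu.Lock()
	defer q.mu.Unlock()
	delete(q.pending, jt)
}

func main() {
	q := newUploadQueue()
	batch := []jobTask{{"job-1", "task-1"}, {"job-1", "task-2"}}
	fmt.Println(q.queueAll(batch)) // both are new
	fmt.Println(q.queueAll(batch)) // repeat call adds nothing
	q.markUploaded(batch[0])
	fmt.Println(q.queueAll(batch)) // only the uploaded one is re-queued
}
```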

type TaskMetrics

type TaskMetrics struct {
	Timing map[string]float64 `bson:"timing,omitempty" json:"timing,omitempty"`
}

TaskMetrics contains metrics on a specific task, such as timing information.

type TaskScheduler

type TaskScheduler struct {
	// contains filtered or unexported fields
}

TaskScheduler offers tasks to Workers when they ask for them.

func CreateTaskScheduler

func CreateTaskScheduler(config *Conf,
	upstream *UpstreamConnection,
	session *mgo.Session,
	queue *TaskUpdateQueue,
	blacklist *WorkerBlacklist,
	pusher *TaskUpdatePusher,
) *TaskScheduler

CreateTaskScheduler constructs a new TaskScheduler, including private fields.

func (*TaskScheduler) ReturnTask

func (ts *TaskScheduler) ReturnTask(worker *Worker, logFields log.Fields,
	db *mgo.Database, task *Task, reasonForReturn string) error

ReturnTask lets a Worker return its tasks to the queue, for execution by another worker.

func (*TaskScheduler) ReturnTaskFromWorker

func (ts *TaskScheduler) ReturnTaskFromWorker(w http.ResponseWriter, r *auth.AuthenticatedRequest,
	db *mgo.Database, taskID bson.ObjectId)

ReturnTaskFromWorker is the HTTP interface for workers to return a specific task to the queue.

func (*TaskScheduler) ScheduleTask

func (ts *TaskScheduler) ScheduleTask(w http.ResponseWriter, r *auth.AuthenticatedRequest)

ScheduleTask assigns a task to a worker.

func (*TaskScheduler) WorkerMayRunTask

func (ts *TaskScheduler) WorkerMayRunTask(w http.ResponseWriter, r *auth.AuthenticatedRequest,
	db *mgo.Database, taskID bson.ObjectId)

WorkerMayRunTask tells the worker whether it's allowed to keep running the given task.

type TaskUpdate

type TaskUpdate struct {
	ID                        bson.ObjectId `bson:"_id" json:"_id"`
	TaskID                    bson.ObjectId `bson:"task_id" json:"task_id,omitempty"`
	TaskStatus                string        `bson:"task_status,omitempty" json:"task_status,omitempty"`
	ReceivedOnManager         time.Time     `bson:"received_on_manager" json:"received_on_manager"`
	Activity                  string        `bson:"activity,omitempty" json:"activity,omitempty"`
	TaskProgressPercentage    int           `bson:"task_progress_percentage" json:"task_progress_percentage"`
	CurrentCommandIdx         int           `bson:"current_command_idx" json:"current_command_idx"`
	CommandProgressPercentage int           `bson:"command_progress_percentage" json:"command_progress_percentage"`
	Log                       string        `bson:"log,omitempty" json:"log,omitempty"`           // for appending to Server-side log
	LogTail                   string        `bson:"log_tail,omitempty" json:"log_tail,omitempty"` // for overwriting on Server-side task
	Worker                    string        `bson:"worker" json:"worker"`

	FailedByWorkers []WorkerRef  `bson:"failed_by_workers,omitempty" json:"failed_by_workers,omitempty"` // Workers who tried this task and failed.
	Metrics         *TaskMetrics `bson:"metrics,omitempty" json:"metrics,omitempty"`
	// contains filtered or unexported fields
}

TaskUpdate is both sent from Worker to Manager, as well as from Manager to Server.

type TaskUpdatePusher

type TaskUpdatePusher struct {
	// contains filtered or unexported fields
}

TaskUpdatePusher pushes queued task updates to the Flamenco Server.

func CreateTaskUpdatePusher

func CreateTaskUpdatePusher(
	config *Conf,
	upstream *UpstreamConnection,
	session *mgo.Session,
	queue *TaskUpdateQueue,
	taskLogUploader *TaskLogUploader,
) *TaskUpdatePusher

CreateTaskUpdatePusher creates a new task update pusher that runs in a separate goroutine.

func (*TaskUpdatePusher) Close

func (pusher *TaskUpdatePusher) Close()

Close closes the task update pusher by stopping all timers & goroutines.

func (*TaskUpdatePusher) Go

func (pusher *TaskUpdatePusher) Go()

Go starts the goroutine.

func (*TaskUpdatePusher) Kick

func (pusher *TaskUpdatePusher) Kick()

Kick forces a task update push.

type TaskUpdateQueue

type TaskUpdateQueue struct {
	// contains filtered or unexported fields
}

TaskUpdateQueue queues task updates for later pushing, and writes log files to disk.

func CreateTaskUpdateQueue

func CreateTaskUpdateQueue(config *Conf, blacklist *WorkerBlacklist) *TaskUpdateQueue

CreateTaskUpdateQueue creates a new TaskUpdateQueue.

func (*TaskUpdateQueue) LogTaskActivity

func (tuq *TaskUpdateQueue) LogTaskActivity(worker *Worker, task *Task, activity, logLine string, db *mgo.Database)

LogTaskActivity creates and queues a TaskUpdate to store activity and a log line.

func (*TaskUpdateQueue) QueueTaskUpdate

func (tuq *TaskUpdateQueue) QueueTaskUpdate(task *Task, tupdate *TaskUpdate, db *mgo.Database) error

QueueTaskUpdate queues the task update, without any extra updates.

func (*TaskUpdateQueue) QueueTaskUpdateFromWorker

func (tuq *TaskUpdateQueue) QueueTaskUpdateFromWorker(w http.ResponseWriter, r *auth.AuthenticatedRequest,
	db *mgo.Database, taskID bson.ObjectId)

QueueTaskUpdateFromWorker receives a task update from a worker, and queues it for sending to Flamenco Server.

func (*TaskUpdateQueue) QueueTaskUpdateWithExtra

func (tuq *TaskUpdateQueue) QueueTaskUpdateWithExtra(task *Task, tupdate *TaskUpdate, db *mgo.Database, extraUpdates bson.M) error

QueueTaskUpdateWithExtra does the same as QueueTaskUpdate(), but with extra updates to the local flamenco_tasks collection.

type TaskUpdateResponse

type TaskUpdateResponse struct {
	ModifiedCount    int             `json:"modified_count"`
	HandledUpdateIds []bson.ObjectId `json:"handled_update_ids,omitempty"`
	CancelTasksIds   []bson.ObjectId `json:"cancel_task_ids,omitempty"`

	// Job/Task IDs for which we should send the task log to the Server.
	UploadTaskFileQueue []JobTask `json:"upload_task_file_queue,omitempty"`
}

TaskUpdateResponse is received from Server.

type TestTasks

type TestTasks struct {
	BlenderRender BlenderRenderConfig `yaml:"test_blender_render"`
}

TestTasks represents the 'test_tasks' key in the Manager's configuration file.

type TimeOfDay

type TimeOfDay struct {
	Hour   int
	Minute int
}

TimeOfDay is marshalled as 'HH:MM'. Its date and timezone components are ignored, and the time is supposed to be interpreted as local time on any date (e.g. a scheduled sleep time of some Worker on a certain day-of-week & local timezone).

func MakeTimeOfDay

func MakeTimeOfDay(someTime time.Time) TimeOfDay

MakeTimeOfDay converts a time.Time into a TimeOfDay.

func (TimeOfDay) Equals

func (ot TimeOfDay) Equals(other TimeOfDay) bool

Equals returns true if and only if both times represent the same time of day.

func (TimeOfDay) GetBSON

func (ot TimeOfDay) GetBSON() (interface{}, error)

GetBSON turns a TimeOfDay instance into BSON.

func (TimeOfDay) IsAfter

func (ot TimeOfDay) IsAfter(other TimeOfDay) bool

IsAfter returns true if and only if ot is after other. Ignores everything except hour and minute fields.

func (TimeOfDay) IsBefore

func (ot TimeOfDay) IsBefore(other TimeOfDay) bool

IsBefore returns true if and only if ot is before other. Ignores everything except hour and minute fields.

func (TimeOfDay) MarshalJSON

func (ot TimeOfDay) MarshalJSON() ([]byte, error)

MarshalJSON turns a TimeOfDay instance into a "HH:MM" string.

func (TimeOfDay) OnDate

func (ot TimeOfDay) OnDate(date time.Time) time.Time

OnDate returns the time of day in the local timezone on the given date.

func (*TimeOfDay) SetBSON

func (ot *TimeOfDay) SetBSON(raw bson.Raw) error

SetBSON turns BSON into a TimeOfDay object.

func (TimeOfDay) String

func (ot TimeOfDay) String() string

func (*TimeOfDay) UnmarshalJSON

func (ot *TimeOfDay) UnmarshalJSON(b []byte) error

UnmarshalJSON turns a "HH:MM" string into a TimeOfDay instance.
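The "HH:MM" JSON round trip can be sketched with a small stand-alone type. The lower-case timeOfDay type below is a hypothetical re-creation for illustration; it uses time.Parse with the "15:04" layout for validation, which may or may not match how the package parses its input.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// timeOfDay is a hypothetical stand-in for TimeOfDay.
type timeOfDay struct {
	Hour   int
	Minute int
}

// String formats the time of day as zero-padded HH:MM.
func (t timeOfDay) String() string {
	return fmt.Sprintf("%02d:%02d", t.Hour, t.Minute)
}

// MarshalJSON encodes the value as a JSON string such as "09:05".
func (t timeOfDay) MarshalJSON() ([]byte, error) {
	return json.Marshal(t.String())
}

// UnmarshalJSON decodes a JSON string in HH:MM form.
func (t *timeOfDay) UnmarshalJSON(b []byte) error {
	var s string
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}
	parsed, err := time.Parse("15:04", s)
	if err != nil {
		return err
	}
	t.Hour, t.Minute = parsed.Hour(), parsed.Minute()
	return nil
}

// roundTrip decodes a JSON value into a timeOfDay and encodes it again.
func roundTrip(jsonValue string) (string, error) {
	var tod timeOfDay
	if err := json.Unmarshal([]byte(jsonValue), &tod); err != nil {
		return "", err
	}
	out, err := json.Marshal(tod)
	return string(out), err
}

func main() {
	fmt.Println(roundTrip(`"09:05"`))
	_, err := roundTrip(`"25:00"`)
	fmt.Println("invalid hour rejected:", err != nil)
}
```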

type TimeoutChecker

type TimeoutChecker struct {
	// contains filtered or unexported fields
}

TimeoutChecker periodically times out tasks and workers if the worker hasn't sent any update recently.

func CreateTimeoutChecker

func CreateTimeoutChecker(config *Conf, session *mgo.Session, queue *TaskUpdateQueue, scheduler *TaskScheduler) *TimeoutChecker

CreateTimeoutChecker creates a new TimeoutChecker.

func (*TimeoutChecker) Close

func (ttc *TimeoutChecker) Close()

Close gracefully shuts down the task timeout checker goroutine.

func (*TimeoutChecker) Go

func (ttc *TimeoutChecker) Go()

Go starts a new goroutine to perform the periodic checking.

type UpstreamConnection

type UpstreamConnection struct {
	// contains filtered or unexported fields
}

UpstreamConnection represents a connection to an upstream Flamenco Server.

func ConnectUpstream

func ConnectUpstream(config *Conf, session *mgo.Session) *UpstreamConnection

ConnectUpstream creates a new UpstreamConnection object and starts the task download loop.

func (*UpstreamConnection) Close

func (uc *UpstreamConnection) Close()

Close gracefully closes the upstream connection by stopping all upload/download loops.

func (*UpstreamConnection) KickDownloader

func (uc *UpstreamConnection) KickDownloader(synchronous bool)

KickDownloader fetches new tasks from the Flamenco Server.

func (*UpstreamConnection) RefetchTask

func (uc *UpstreamConnection) RefetchTask(task *Task) bool

RefetchTask re-fetches a task from the Server, but only if its etag changed.

  • If the etag changed, the differences between the old and new status are handled.
  • If the Server cannot be reached, this error is ignored and the task is untouched.
  • If the Server returns an error code other than 500 Internal Server Error (Forbidden, Not Found, etc.), the task is removed from the local task queue.

If the task was untouched, this function returns false. If it was changed or removed, this function returns true.
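The rules above can be read as a small decision table. The sketch below is not the package's implementation. It assumes that an unchanged etag shows up as 304 Not Modified on a conditional request, and that a 500 response leaves the task untouched. The classify and changed helpers are hypothetical names.

```go
package main

import (
	"fmt"
	"net/http"
)

// outcome describes what happened to the local task.
type outcome string

const (
	untouched outcome = "untouched"
	updated   outcome = "updated"
	removed   outcome = "removed"
)

// classify maps the result of a re-fetch to an outcome (hypothetical helper).
func classify(reachable bool, statusCode int) outcome {
	switch {
	case !reachable:
		return untouched // Server cannot be reached: error is ignored.
	case statusCode == http.StatusNotModified:
		return untouched // Assumption: etag did not change.
	case statusCode == http.StatusOK:
		return updated // Etag changed: handle the differences.
	case statusCode == http.StatusInternalServerError:
		return untouched // Assumption: 500 is treated as temporary.
	case statusCode >= 400:
		return removed // Forbidden, Not Found, etc.
	default:
		return untouched // Not covered by the description above.
	}
}

// changed mirrors the boolean return value described above.
func changed(o outcome) bool { return o != untouched }

func main() {
	codes := []int{
		http.StatusOK,
		http.StatusNotModified,
		http.StatusNotFound,
		http.StatusInternalServerError,
	}
	for _, code := range codes {
		o := classify(true, code)
		fmt.Println(code, o, changed(o))
	}
	fmt.Println("unreachable:", classify(false, 0))
}
```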

func (*UpstreamConnection) ResolveURL

func (uc *UpstreamConnection) ResolveURL(relativeURL string, a ...interface{}) (*url.URL, error)

ResolveURL returns the given URL, resolved relative to the base URL of the upstream server, as an absolute URL.
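A minimal sketch of this kind of resolution, using only net/url. It assumes the variadic arguments are applied to relativeURL as fmt-style format arguments, which the signature suggests but this documentation does not confirm. The resolve and mustResolve helpers and the example host are hypothetical.

```go
package main

import (
	"fmt"
	"net/url"
)

// resolve formats relativeURL with the given arguments and resolves the
// result against base, returning an absolute URL.
func resolve(base *url.URL, relativeURL string, a ...interface{}) (*url.URL, error) {
	rel, err := url.Parse(fmt.Sprintf(relativeURL, a...))
	if err != nil {
		return nil, err
	}
	return base.ResolveReference(rel), nil
}

// mustResolve is a convenience wrapper that works on strings and panics on error.
func mustResolve(baseURL, relativeURL string, a ...interface{}) string {
	base, err := url.Parse(baseURL)
	if err != nil {
		panic(err)
	}
	u, err := resolve(base, relativeURL, a...)
	if err != nil {
		panic(err)
	}
	return u.String()
}

func main() {
	// The host name and manager ID are placeholders.
	fmt.Println(mustResolve(
		"https://flamenco.example.com/",
		"/api/flamenco/managers/%s/task-update-batch",
		"manager-id",
	))
}
```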

func (*UpstreamConnection) SendJSON

func (uc *UpstreamConnection) SendJSON(logprefix, method string, url *url.URL,
	payload interface{},
	responsehandler func(resp *http.Response, body []byte) error,
) error

SendJSON sends a JSON document to the given URL.

func (*UpstreamConnection) SendTaskUpdates

func (uc *UpstreamConnection) SendTaskUpdates(updates []TaskUpdate) (*TaskUpdateResponse, error)

SendTaskUpdates performs a POST to /api/flamenco/managers/{manager-id}/task-update-batch to send a batch of task updates to the Server.

type UpstreamNotification

type UpstreamNotification struct {
	// Settings
	ManagerURL               string                       `json:"manager_url"`
	VariablesByVarname       map[string]map[string]string `json:"variables"`
	PathReplacementByVarname map[string]map[string]string `json:"path_replacement"`

	// From our local database
	NumberOfWorkers int      `json:"nr_of_workers"`
	WorkerTaskTypes []string `json:"worker_task_types"`
}

UpstreamNotification is sent to the upstream Flamenco Server upon startup and when workers change their task types. It combines settings (see settings.go) with information from the database.
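To show what the JSON tags above produce on the wire, the following sketch copies the struct definition and encodes one value. The URL, variable names, paths, and task types are made-up example values. The encode helper is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// UpstreamNotification is copied from the definition above.
type UpstreamNotification struct {
	// Settings
	ManagerURL               string                       `json:"manager_url"`
	VariablesByVarname       map[string]map[string]string `json:"variables"`
	PathReplacementByVarname map[string]map[string]string `json:"path_replacement"`

	// From our local database
	NumberOfWorkers int      `json:"nr_of_workers"`
	WorkerTaskTypes []string `json:"worker_task_types"`
}

// encode returns the JSON form of the notification (hypothetical helper).
func encode(n UpstreamNotification) (string, error) {
	b, err := json.Marshal(n)
	return string(b), err
}

func main() {
	doc, err := encode(UpstreamNotification{
		ManagerURL: "http://manager.local:8083/",
		VariablesByVarname: map[string]map[string]string{
			"blender": {"linux": "/opt/blender/blender"},
		},
		PathReplacementByVarname: map[string]map[string]string{},
		NumberOfWorkers:          2,
		WorkerTaskTypes:          []string{"blender-render", "sleep"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(doc)
}
```

Note that none of the fields use omitempty, so nil maps and slices are encoded as null rather than being left out.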

type UpstreamNotifier

type UpstreamNotifier struct {
	// contains filtered or unexported fields
}

UpstreamNotifier sends a signal to Flamenco Server that we've started or changed configuration.

func CreateUpstreamNotifier

func CreateUpstreamNotifier(config *Conf, upstream *UpstreamConnection, session *mgo.Session) *UpstreamNotifier

CreateUpstreamNotifier creates a new notifier.

func (*UpstreamNotifier) Close

func (un *UpstreamNotifier) Close()

Close performs a clean shutdown.

func (*UpstreamNotifier) SendStartupNotification

func (un *UpstreamNotifier) SendStartupNotification()

SendStartupNotification sends a StartupNotification document to the upstream Flamenco Server. It keeps trying in a goroutine until the notification is successful.

func (*UpstreamNotifier) SendTaskTypesNotification

func (un *UpstreamNotifier) SendTaskTypesNotification()

SendTaskTypesNotification sends a StartupNotification document to the upstream Flamenco Server. It keeps trying in a goroutine until the notification is successful.
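Both notification methods are described as retrying in a goroutine until they succeed. One way such a loop can be structured is sketched below. The notifyUntilSuccess function, the errTemporary error, and the fixed wait between attempts are assumptions, and the real notifier may use a different back-off or shutdown mechanism.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTemporary is a hypothetical error used to simulate a failed attempt.
var errTemporary = errors.New("server not reachable yet")

// notifyUntilSuccess calls send in a goroutine until it returns nil or done
// is closed. The number of attempts is delivered on the returned channel,
// which is closed without a value if the loop was aborted.
func notifyUntilSuccess(send func() error, wait time.Duration, done <-chan struct{}) <-chan int {
	result := make(chan int, 1)
	go func() {
		defer close(result)
		for attempt := 1; ; attempt++ {
			if err := send(); err == nil {
				result <- attempt
				return
			}
			select {
			case <-done:
				return
			case <-time.After(wait):
			}
		}
	}()
	return result
}

func main() {
	calls := 0
	send := func() error {
		calls++
		if calls < 3 {
			return errTemporary // Fail the first two attempts.
		}
		return nil
	}
	// A nil done channel never fires, so the loop runs until send succeeds.
	attempts := <-notifyUntilSuccess(send, 10*time.Millisecond, nil)
	fmt.Println("notification delivered after", attempts, "attempts")
}
```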

type Worker

type Worker struct {
	ID                 bson.ObjectId  `bson:"_id,omitempty" json:"_id,omitempty"`
	Secret             string         `bson:"-" json:"-"`
	HashedSecret       []byte         `bson:"hashed_secret" json:"-"`
	Nickname           string         `bson:"nickname" json:"nickname"`
	Address            string         `bson:"address" json:"address"`
	Status             string         `bson:"status" json:"status"`
	Platform           string         `bson:"platform" json:"platform"`
	CurrentTask        *bson.ObjectId `bson:"current_task,omitempty" json:"current_task,omitempty"`
	TimeCost           int            `bson:"time_cost" json:"time_cost"`
	LastActivity       *time.Time     `bson:"last_activity,omitempty" json:"last_activity,omitempty"`
	SupportedTaskTypes []string       `bson:"supported_task_types" json:"supported_task_types"`
	Software           string         `bson:"software" json:"software"`

	// For dashboard
	CurrentTaskStatus  string        `bson:"current_task_status,omitempty" json:"current_task_status,omitempty"`
	CurrentTaskUpdated *time.Time    `bson:"current_task_updated,omitempty" json:"current_task_updated,omitempty"`
	CurrentJob         bson.ObjectId `bson:"current_job,omitempty" json:"current_job,omitempty"`

	// For controlling sleeping & waking up. For values, see the workerStatusXXX constants.
	StatusRequested   string       `bson:"status_requested" json:"status_requested"`
	LazyStatusRequest Lazyness     `bson:"lazy_status_request" json:"lazy_status_request"` // Only apply requested status when current task is finished.
	SleepSchedule     ScheduleInfo `bson:"sleep_schedule,omitempty" json:"sleep_schedule"`

	// For preventing a failing worker from eating up all tasks of a certain job.
	Blacklist []WorkerBlacklistEntry `json:"blacklist,omitempty"`
}

Worker contains all information about a specific Worker. Some fields come from the WorkerRegistration, whereas others are filled by us.

func FindWorker

func FindWorker(workerID string, projection interface{}, db *mgo.Database) (*Worker, error)

FindWorker returns the worker given its ID in string form.

func FindWorkerByID

func FindWorkerByID(workerID bson.ObjectId, db *mgo.Database) (*Worker, error)

FindWorkerByID returns the entire worker, no projections.

func (*Worker) AckStatusChange

func (worker *Worker) AckStatusChange(newStatus string, db *mgo.Database) error

AckStatusChange acknowledges the requested status change by moving it to the actual status. The "shutdown" status is the exception: it should not be acknowledged, but should result in a sign-off and thus go directly to the "offline" state.

func (*Worker) AckTimeout

func (worker *Worker) AckTimeout(db *mgo.Database) error

AckTimeout acknowledges the timeout and just sets the worker to "offline".

func (*Worker) Identifier

func (worker *Worker) Identifier() string

Identifier returns the worker's address, with the nickname in parentheses (if set).

Make sure that you include the nickname in the projection when you fetch the worker from MongoDB.
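The note about the projection matters because a worker fetched without its nickname looks the same as a worker that has none. A sketch of the formatting, assuming an "address (nickname)" layout (the exact output format is not specified here and identifier is a hypothetical helper):

```go
package main

import "fmt"

// identifier returns the address, followed by the nickname in parentheses
// when one is set.
func identifier(address, nickname string) string {
	if nickname == "" {
		return address
	}
	return fmt.Sprintf("%s (%s)", address, nickname)
}

func main() {
	fmt.Println(identifier("192.168.1.10", "render-01"))
	// A nickname missing from the projection behaves like an empty one.
	fmt.Println(identifier("192.168.1.10", ""))
}
```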

func (*Worker) RequestStatusChange

func (worker *Worker) RequestStatusChange(newStatus string, lazy Lazyness, db *mgo.Database) error

RequestStatusChange stores the new requested status in MongoDB, so that it gets picked up by the worker the next time it asks for it. Parameter 'lazy' indicates that the worker can finish the current task first, before applying the status change.

func (*Worker) Seen

func (worker *Worker) Seen(r *http.Request, db *mgo.Database)

Seen registers that we have seen this worker at a certain address and with certain software.

func (*Worker) SeenEx

func (worker *Worker) SeenEx(r *http.Request, db *mgo.Database, set bson.M, unset bson.M) error

SeenEx is the same as Seen(), but allows for extra updates on the worker in the database, and returns an error.

func (*Worker) SetAwake

func (worker *Worker) SetAwake(db *mgo.Database) error

SetAwake sets the worker status to Awake, but only if it's not already awake or testing.

func (*Worker) SetCurrentTask

func (worker *Worker) SetCurrentTask(taskID bson.ObjectId, db *mgo.Database) error

SetCurrentTask sets the worker's current task, and updates the database too.

func (*Worker) SetStatus

func (worker *Worker) SetStatus(status string, db *mgo.Database) error

SetStatus sets the worker's status, and updates the database too. Use SetAwake() instead of calling this function with status="awake".

func (*Worker) Timeout

func (worker *Worker) Timeout(db *mgo.Database, scheduler *TaskScheduler)

Timeout marks the worker as timed out.

func (*Worker) TimeoutOnTask

func (worker *Worker) TimeoutOnTask(task *Task, db *mgo.Database, scheduler *TaskScheduler)

TimeoutOnTask marks the worker as timed out on a given task. The task is just used for logging.

type WorkerBlacklist

type WorkerBlacklist struct {
	// contains filtered or unexported fields
}

WorkerBlacklist stores (worker ID, job ID, task type) tuples for failed tasks.

func CreateWorkerBlackList

func CreateWorkerBlackList(config *Conf, session *mgo.Session) *WorkerBlacklist

CreateWorkerBlackList creates a new WorkerBlacklist instance.

func (*WorkerBlacklist) Add

func (wbl *WorkerBlacklist) Add(workerID bson.ObjectId, task *Task) error

Add makes it impossible for the worker to run tasks of the same type on the same job.

func (*WorkerBlacklist) BlacklistForWorker

func (wbl *WorkerBlacklist) BlacklistForWorker(workerID bson.ObjectId) M

BlacklistForWorker returns a partial MongoDB query that can be used to filter out blacklisted tasks.

func (*WorkerBlacklist) EnsureDBIndices

func (wbl *WorkerBlacklist) EnsureDBIndices()

EnsureDBIndices ensures the MongoDB indices are there.

func (*WorkerBlacklist) RemoveLine

func (wbl *WorkerBlacklist) RemoveLine(workerID bson.ObjectId, jobID bson.ObjectId, taskType string) error

RemoveLine removes a single blacklist entry. This is a no-op if the entry doesn't exist.

func (*WorkerBlacklist) WorkersLeft

func (wbl *WorkerBlacklist) WorkersLeft(jobID bson.ObjectId, taskType string) map[bson.ObjectId]bool

WorkersLeft returns the IDs of workers NOT blacklisted for this task type on this job.
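The semantics of Add, RemoveLine, and WorkersLeft can be illustrated with an in-memory stand-in. The real WorkerBlacklist is backed by MongoDB and uses bson.ObjectId keys. The blacklist type below, its string IDs, and its list of known workers are assumptions made for illustration.

```go
package main

import "fmt"

// entry is one (worker ID, job ID, task type) tuple.
type entry struct {
	workerID, jobID, taskType string
}

// blacklist is a hypothetical in-memory stand-in for WorkerBlacklist.
type blacklist struct {
	workers []string
	entries map[entry]bool
}

func newBlacklist(workers ...string) *blacklist {
	return &blacklist{workers: workers, entries: map[entry]bool{}}
}

// add blocks the worker from this task type on this job.
func (b *blacklist) add(workerID, jobID, taskType string) {
	b.entries[entry{workerID, jobID, taskType}] = true
}

// removeLine removes a single entry; it is a no-op if the entry is absent.
func (b *blacklist) removeLine(workerID, jobID, taskType string) {
	delete(b.entries, entry{workerID, jobID, taskType})
}

// workersLeft returns the workers NOT blacklisted for this task type on this job.
func (b *blacklist) workersLeft(jobID, taskType string) map[string]bool {
	left := map[string]bool{}
	for _, w := range b.workers {
		key := entry{w, jobID, taskType}
		if !b.entries[key] {
			left[w] = true
		}
	}
	return left
}

func main() {
	bl := newBlacklist("worker-a", "worker-b")
	bl.add("worker-a", "job-1", "blender-render")
	fmt.Println(bl.workersLeft("job-1", "blender-render"))
	fmt.Println(bl.workersLeft("job-2", "blender-render"))
}
```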

type WorkerBlacklistEntry

type WorkerBlacklistEntry struct {
	Created  time.Time     `bson:"_created" json:"_created"`
	WorkerID bson.ObjectId `bson:"worker_id" json:"worker_id,omitempty"`
	JobID    bson.ObjectId `bson:"job_id" json:"job_id"`
	TaskType string        `bson:"task_type" json:"task_type"`
}

WorkerBlacklistEntry prevents a certain worker from running certain task types on certain jobs.

type WorkerRef

type WorkerRef struct {
	// ID is the worker's ID, and is the actual reference. It is not guaranteed to exist because workers can be deleted.
	ID bson.ObjectId `bson:"id" json:"id"`
	// Identifier is the human-readable identification of the worker (IP address + nickname).
	Identifier string `bson:"identifier" json:"identifier"`
}

WorkerRef is a reference to a worker.

type WorkerRegistration

type WorkerRegistration struct {
	Secret             string   `json:"secret"`
	Platform           string   `json:"platform"`
	SupportedTaskTypes []string `json:"supported_task_types"`
	Nickname           string   `json:"nickname"`
}

WorkerRegistration is sent by the Worker to register itself at this Manager.
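As an illustration of the document a Worker might send, the sketch below copies the struct above and decodes an example registration. The field values and the parseRegistration helper are made up for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// WorkerRegistration is copied from the definition above.
type WorkerRegistration struct {
	Secret             string   `json:"secret"`
	Platform           string   `json:"platform"`
	SupportedTaskTypes []string `json:"supported_task_types"`
	Nickname           string   `json:"nickname"`
}

// parseRegistration decodes a registration document (hypothetical helper).
func parseRegistration(doc string) (WorkerRegistration, error) {
	var reg WorkerRegistration
	err := json.Unmarshal([]byte(doc), &reg)
	return reg, err
}

func main() {
	reg, err := parseRegistration(`{
		"secret": "example-secret",
		"platform": "linux",
		"supported_task_types": ["blender-render", "sleep"],
		"nickname": "render-01"
	}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(reg.Nickname, reg.Platform, reg.SupportedTaskTypes)
}
```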

type WorkerRemover

type WorkerRemover struct {
	// contains filtered or unexported fields
}

WorkerRemover periodically removes offline workers.

func CreateWorkerRemover

func CreateWorkerRemover(config *Conf, session *mgo.Session, scheduler *TaskScheduler) *WorkerRemover

CreateWorkerRemover creates a WorkerRemover, or returns nil if the configuration disables automatic worker removal.

func (*WorkerRemover) Close

func (wr *WorkerRemover) Close()

Close signals the WorkerRemover goroutine to stop and waits for it to close.

func (*WorkerRemover) Go

func (wr *WorkerRemover) Go()

Go starts a goroutine that periodically checks workers.
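The Go/Close pairing used by WorkerRemover (and by TimeoutChecker) is a common pattern: Go starts a ticker-driven goroutine, and Close signals it to stop and waits for it to exit. A minimal sketch of that pattern follows. The periodicChecker type and its fields are hypothetical and do not reflect the package's unexported fields.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// periodicChecker is a hypothetical stand-in for a Go/Close style service.
type periodicChecker struct {
	period time.Duration
	check  func()
	done   chan struct{}
	wg     sync.WaitGroup
}

func newPeriodicChecker(period time.Duration, check func()) *periodicChecker {
	return &periodicChecker{period: period, check: check, done: make(chan struct{})}
}

// Go starts a goroutine that calls check once per period.
func (p *periodicChecker) Go() {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		ticker := time.NewTicker(p.period)
		defer ticker.Stop()
		for {
			select {
			case <-p.done:
				return
			case <-ticker.C:
				p.check()
			}
		}
	}()
}

// Close signals the goroutine to stop and waits for it to exit.
// It must be called at most once.
func (p *periodicChecker) Close() {
	close(p.done)
	p.wg.Wait()
}

func main() {
	checker := newPeriodicChecker(10*time.Millisecond, func() {
		fmt.Println("checking workers")
	})
	checker.Go()
	time.Sleep(35 * time.Millisecond)
	checker.Close()
}
```

Since CreateWorkerRemover can return nil when automatic removal is disabled, callers should check for nil before calling Go or Close unless the methods are known to handle a nil receiver.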

type WorkerSignonDoc

type WorkerSignonDoc struct {
	SupportedTaskTypes []string `json:"supported_task_types,omitempty"`
	Nickname           string   `json:"nickname,omitempty"`
}

WorkerSignonDoc is sent by the Worker upon sign-on.

type WorkerStatus

type WorkerStatus struct {
	// For controlling sleeping & waking up. For values, see the workerStatusXXX constants.
	StatusRequested string `bson:"status_requested" json:"status_requested"`
}

WorkerStatus indicates that a status change was requested on the worker. It is sent as response by the scheduler when a worker is not allowed to receive a new task.

Directories

Path Synopsis
Package chantools was obtained from https://github.com/theepicsnail/goChanTools and subsequently altered for our needs.
Package slugify provides a function that gives a non-accentuated, minus-separated string from an accentuated string.
