worker

package
v0.0.0-...-8b9a2a1
Warning

This package is not in the latest version of its module.

Published: Jan 12, 2024 License: GPL-3.0 Imports: 36 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrNoExecCmd = errors.New("no exec.Cmd could be created")

ErrNoExecCmd means CommandLineRunner.CommandContext() returned nil. This shouldn't happen in production, but can happen in unit tests when the test just wants to check the CLI arguments that are supposed to be executed, without actually executing anything.

var (
	ErrTaskReassigned = errors.New("task was not assigned to this worker")
)

Functions

func AutodiscoverManager

func AutodiscoverManager(ctx context.Context) (string, error)

AutodiscoverManager uses UPnP/SSDP to find a Manager, and returns its URL if found.

func MaybeAutodiscoverManager

func MaybeAutodiscoverManager(ctx context.Context, configWrangler *FileConfigWrangler) error

MaybeAutodiscoverManager starts Manager auto-discovery if no Manager URL is configured yet.

func ParseURL

func ParseURL(rawURL string) (*url.URL, error)

ParseURL allows URLs without scheme (assumes HTTP).
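
One way such scheme defaulting could work is sketched below. `parseURLSketch` is a hypothetical helper built on the standard `net/url` package; it is not the implementation used by this package, which may handle edge cases differently.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// parseURLSketch prepends "http://" when the input has no scheme separator,
// then defers to url.Parse. This is an assumed approach, not the real one.
func parseURLSketch(rawURL string) (*url.URL, error) {
	if !strings.Contains(rawURL, "://") {
		rawURL = "http://" + rawURL
	}
	return url.Parse(rawURL)
}

func main() {
	for _, raw := range []string{"flamenco.local:8080", "https://flamenco.local/"} {
		u, err := parseURLSketch(raw)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(u.Scheme, u.Host)
	}
}
```

The default matters because `url.Parse` alone either rejects a bare `host:port` string or misreads the host name as the scheme.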

func SignOff

func SignOff(ctx context.Context, logger zerolog.Logger, client FlamencoClient)

SignOff sends a signoff request to the Manager. Any error is logged but not returned.

Types

type BlenderParameters

type BlenderParameters struct {
	// contains filtered or unexported fields
}

type CommandExecutor

type CommandExecutor struct {
	// contains filtered or unexported fields
}

func NewCommandExecutor

func NewCommandExecutor(cli CommandLineRunner, listener CommandListener, timeService TimeService) *CommandExecutor

func (*CommandExecutor) Run

func (ce *CommandExecutor) Run(ctx context.Context, taskID string, cmd api.Command) error

type CommandLineRunner

type CommandLineRunner interface {
	CommandContext(ctx context.Context, name string, arg ...string) *exec.Cmd
	RunWithTextOutput(
		ctx context.Context,
		logger zerolog.Logger,
		execCmd *exec.Cmd,
		logChunker cli_runner.LogChunker,
		lineChannel chan<- string,
	) error
}

CommandLineRunner is an interface around exec.CommandContext().

type CommandListener

type CommandListener interface {
	// LogProduced sends any logging to whatever service for storing logging.
	// logLines are concatenated.
	LogProduced(ctx context.Context, taskID string, logLines ...string) error
	// OutputProduced tells the Manager there has been some output (most commonly a rendered frame or video).
	OutputProduced(ctx context.Context, taskID string, outputLocation string) error
}

CommandListener sends the result of commands (log, output files) to the Manager.

type CommandRunner

type CommandRunner interface {
	Run(ctx context.Context, taskID string, cmd api.Command) error
}

type CreateVideoParams

type CreateVideoParams struct {
	// contains filtered or unexported fields
}

type ExecParams

type ExecParams struct {
	// contains filtered or unexported fields
}

type FileConfigWrangler

type FileConfigWrangler struct {
	// contains filtered or unexported fields
}

FileConfigWrangler is the default config wrangler that actually reads & writes files.

func NewConfigWrangler

func NewConfigWrangler() FileConfigWrangler

NewConfigWrangler returns a FileConfigWrangler that reads files.

func (FileConfigWrangler) DefaultConfig

func (fcw FileConfigWrangler) DefaultConfig() WorkerConfig

DefaultConfig returns a fairly sane default configuration.

func (*FileConfigWrangler) SaveConfig

func (fcw *FileConfigWrangler) SaveConfig() error

func (*FileConfigWrangler) SaveCredentials

func (fcw *FileConfigWrangler) SaveCredentials(creds WorkerCredentials) error

func (*FileConfigWrangler) SetManagerURL

func (fcw *FileConfigWrangler) SetManagerURL(managerURL string)

SetManagerURL overwrites the Manager URL in the cached configuration. This is an in-memory change only, and will not be written to the config file.

func (*FileConfigWrangler) SetRestartExitCode

func (fcw *FileConfigWrangler) SetRestartExitCode(code int)

func (*FileConfigWrangler) WorkerConfig

func (fcw *FileConfigWrangler) WorkerConfig() (WorkerConfig, error)

WorkerConfig returns the worker configuration, or the default config if there is no config file. Configuration is only loaded from disk once; subsequent calls return the same config.

func (*FileConfigWrangler) WorkerCredentials

func (fcw *FileConfigWrangler) WorkerCredentials() (WorkerCredentials, error)

type FlamencoClient

type FlamencoClient interface {
	api.ClientWithResponsesInterface
}

FlamencoClient is a wrapper for api.ClientWithResponsesInterface so that mocks can be created locally.

func RegisterOrSignOn

func RegisterOrSignOn(ctx context.Context, configWrangler WorkerConfigWithCredentials) (
	client FlamencoClient, startupState api.WorkerStatus,
)

RegisterOrSignOn tries to sign on, and if that fails (or there are no credentials) tries to register. Returns an authenticated Flamenco OpenAPI client and the Worker's startup state.

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

Listener listens to the result of task and command execution, and sends it to the Manager.

func NewListener

func NewListener(client FlamencoClient, buffer UpstreamBuffer) *Listener

NewListener creates a new Listener that will send updates to the API client.

func (*Listener) LogProduced

func (l *Listener) LogProduced(ctx context.Context, taskID string, logLines ...string) error

LogProduced sends any logging to whatever service for storing logging.

func (*Listener) OutputProduced

func (l *Listener) OutputProduced(ctx context.Context, taskID string, outputLocation string) error

OutputProduced tells the Manager there has been some output (most commonly a rendered frame or video).

func (*Listener) Run

func (l *Listener) Run(ctx context.Context)

func (*Listener) TaskCompleted

func (l *Listener) TaskCompleted(ctx context.Context, taskID string) error

TaskCompleted tells the Manager the task has been completed.

func (*Listener) TaskFailed

func (l *Listener) TaskFailed(ctx context.Context, taskID string, reason string) error

TaskFailed tells the Manager the task failed for some reason.

func (*Listener) TaskStarted

func (l *Listener) TaskStarted(ctx context.Context, taskID string) error

TaskStarted tells the Manager that task execution has started.

type LogChunker

type LogChunker struct {
	// contains filtered or unexported fields
}

LogChunker gathers log lines in memory and sends them to a CommandListener. NOTE: LogChunker is not thread-safe.

func NewLogChunker

func NewLogChunker(taskID string, listerer CommandListener, timeService TimeService) *LogChunker

func (*LogChunker) Append

func (lc *LogChunker) Append(ctx context.Context, logLines ...string) error

Append log lines to the buffer, sending to the listener when the buffer gets too large.

func (*LogChunker) Flush

func (lc *LogChunker) Flush(ctx context.Context) error

Flush sends any buffered logs to the listener.
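
The Append/Flush interplay can be illustrated with a minimal size-based chunker. This is a sketch under assumptions: `chunker`, its `maxBytes` threshold, and the `send` callback are hypothetical, and the real LogChunker also takes a TimeService, so it may well flush on a timer too. Like the real type, the sketch is not thread-safe.

```go
package main

import (
	"fmt"
	"strings"
)

// chunker is a hypothetical stand-in for LogChunker.
type chunker struct {
	maxBytes int                     // assumed flush threshold
	buf      strings.Builder         // in-memory buffer of pending log lines
	send     func(chunk string) error // stands in for CommandListener.LogProduced
}

// Append buffers the lines and flushes once the buffer reaches maxBytes.
func (c *chunker) Append(lines ...string) error {
	for _, line := range lines {
		c.buf.WriteString(line)
		c.buf.WriteByte('\n')
	}
	if c.buf.Len() >= c.maxBytes {
		return c.Flush()
	}
	return nil
}

// Flush sends whatever is buffered; an empty buffer sends nothing.
func (c *chunker) Flush() error {
	if c.buf.Len() == 0 {
		return nil
	}
	chunk := c.buf.String()
	c.buf.Reset()
	return c.send(chunk)
}

func main() {
	c := &chunker{
		maxBytes: 10,
		send: func(chunk string) error {
			fmt.Printf("sending %d bytes\n", len(chunk))
			return nil
		},
	}
	_ = c.Append("abc")    // stays buffered
	_ = c.Append("defghi") // crosses the threshold, triggers a send
	_ = c.Flush()          // nothing left to send
}
```

Callers of such a chunker should call Flush when a command finishes, otherwise the tail of the log stays in memory.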

type OutputUploader

type OutputUploader struct {
	// contains filtered or unexported fields
}

OutputUploader sends (downscaled versions of) rendered images to Flamenco Manager. Only one image is sent at a time. A queue of a single image is kept, where newly queued images replace older ones.
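
The "queue of a single image" idea can be sketched as a mutex-protected slot in which a newer entry overwrites an older one that has not been picked up yet. `latestSlot` and its methods are hypothetical names; how OutputUploader implements its queue internally is not documented here.

```go
package main

import (
	"fmt"
	"sync"
)

// latestSlot is a hypothetical single-item queue: Put replaces any item
// that is still waiting, Take removes and returns the waiting item.
type latestSlot struct {
	mu   sync.Mutex
	item *string
}

// Put stores filename, discarding any older filename still in the slot.
func (s *latestSlot) Put(filename string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.item = &filename
}

// Take returns the waiting filename, or false when the slot is empty.
func (s *latestSlot) Take() (string, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.item == nil {
		return "", false
	}
	filename := *s.item
	s.item = nil
	return filename, true
}

func main() {
	var slot latestSlot
	slot.Put("frame-0001.png")
	slot.Put("frame-0002.png") // replaces frame-0001.png
	name, ok := slot.Take()
	fmt.Println(name, ok)
}
```

Dropping stale entries suits a preview-style upload: only the most recent render is worth sending when the uploader is slower than the renderer.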

func NewOutputUploader

func NewOutputUploader(client FlamencoClient) *OutputUploader

func (*OutputUploader) OutputProduced

func (ou *OutputUploader) OutputProduced(taskID, filename string)

OutputProduced enqueues the given filename for processing.

func (*OutputUploader) Run

func (ou *OutputUploader) Run(ctx context.Context)

type ParameterInvalidError

type ParameterInvalidError struct {
	Parameter string
	Cmd       api.Command
	Message   string
}

ParameterInvalidError is returned by command executors when a command parameter is invalid.

func NewParameterInvalidError

func NewParameterInvalidError(parameter string, cmd api.Command, message string, fmtArgs ...interface{}) ParameterInvalidError

func (ParameterInvalidError) Error

func (err ParameterInvalidError) Error() string

func (ParameterInvalidError) ParamValue

func (err ParameterInvalidError) ParamValue() interface{}

ParamValue returns the value of the invalid parameter.

type ParameterMissingError

type ParameterMissingError struct {
	Parameter string
	Cmd       api.Command
}

ParameterMissingError is returned by command executors when a mandatory command parameter is missing.

func NewParameterMissingError

func NewParameterMissingError(parameter string, cmd api.Command) ParameterMissingError

func (ParameterMissingError) Error

func (err ParameterMissingError) Error() string

type ShutdownReason

type ShutdownReason int
const (
	ReasonContextClosed ShutdownReason = iota // Main Context closed.
	ReasonShutdownReq                         // Manager requested a shutdown.
	ReasonRestartReq                          // Manager requested a restart.
)
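
A program built on these constants might translate the shutdown reason into a process exit code. The sketch below is an assumption about such usage, not documented behaviour: the constants are re-declared locally, and `exitCodeFor` is a hypothetical helper that takes the configured restart exit code as a plain argument.

```go
package main

import "fmt"

// shutdownReason mirrors the documented ShutdownReason constants locally.
type shutdownReason int

const (
	reasonContextClosed shutdownReason = iota // Main Context closed.
	reasonShutdownReq                         // Manager requested a shutdown.
	reasonRestartReq                          // Manager requested a restart.
)

// exitCodeFor is hypothetical: it returns restartExitCode for a restart
// request, so a supervising script can relaunch the process, and 0 otherwise.
func exitCodeFor(reason shutdownReason, restartExitCode int) int {
	if reason == reasonRestartReq {
		return restartExitCode
	}
	return 0
}

func main() {
	fmt.Println(exitCodeFor(reasonShutdownReq, 47))
	fmt.Println(exitCodeFor(reasonRestartReq, 47))
}
```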

type StateStarter

type StateStarter func(context.Context)

type TaskExecutionListener

type TaskExecutionListener interface {
	// TaskStarted tells the Manager that task execution has started.
	TaskStarted(ctx context.Context, taskID string) error

	// TaskFailed tells the Manager the task failed for some reason.
	TaskFailed(ctx context.Context, taskID string, reason string) error

	// TaskCompleted tells the Manager the task has been completed.
	TaskCompleted(ctx context.Context, taskID string) error
}

TaskExecutionListener sends task lifecycle events (start/fail/complete) to the Manager.

type TaskExecutor

type TaskExecutor struct {
	// contains filtered or unexported fields
}

func NewTaskExecutor

func NewTaskExecutor(cmdRunner CommandRunner, listener TaskExecutionListener) *TaskExecutor

func (*TaskExecutor) Run

func (te *TaskExecutor) Run(ctx context.Context, task api.AssignedTask) error

Run runs a task. Returns ErrTaskReassigned when the task was reassigned to another worker.

type TaskOutput

type TaskOutput struct {
	TaskID   string
	Filename string
}

type TaskRunner

type TaskRunner interface {
	Run(ctx context.Context, task api.AssignedTask) error
}

type TimeService

type TimeService interface {
	After(duration time.Duration) <-chan time.Time
	Now() time.Time
}

TimeService is a service that operates on time.

type UpstreamBuffer

type UpstreamBuffer interface {
	SendTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error
}

UpstreamBuffer can buffer up-stream task updates, in case the Manager cannot be reached.

type UpstreamBufferDB

type UpstreamBufferDB struct {
	// contains filtered or unexported fields
}

UpstreamBufferDB implements the UpstreamBuffer interface using a database as backend.

func NewUpstreamBuffer

func NewUpstreamBuffer(client FlamencoClient, clock TimeService) (*UpstreamBufferDB, error)

func (*UpstreamBufferDB) Close

func (ub *UpstreamBufferDB) Close() error

Close performs one final flush, then releases the database.

func (*UpstreamBufferDB) Flush

func (ub *UpstreamBufferDB) Flush(ctx context.Context) error

func (*UpstreamBufferDB) OpenDB

func (ub *UpstreamBufferDB) OpenDB(dbCtx context.Context, databaseFilename string) error

OpenDB opens the database. Must be called once before using.

func (*UpstreamBufferDB) QueueSize

func (ub *UpstreamBufferDB) QueueSize() (int, error)

func (*UpstreamBufferDB) SendTaskUpdate

func (ub *UpstreamBufferDB) SendTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error

type UpstreamBufferPersistence

type UpstreamBufferPersistence interface {
	UpstreamBufferQueueSize(ctx context.Context) (int, error)
	UpstreamBufferQueue(ctx context.Context, taskID string, apiTaskUpdate api.TaskUpdateJSONRequestBody) error
	UpstreamBufferFrontItem(ctx context.Context) (*persistence.TaskUpdate, error)
	UpstreamBufferDiscard(ctx context.Context, queuedTaskUpdate *persistence.TaskUpdate) error
	Close() error
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker performs regular Flamenco Worker operations.

func NewWorker

func NewWorker(
	flamenco FlamencoClient,
	taskRunner TaskRunner,
) *Worker

NewWorker constructs and returns a new Worker.

func (*Worker) Close

func (w *Worker) Close()

Close gracefully shuts down the Worker.

func (*Worker) SignOff

func (w *Worker) SignOff(ctx context.Context)

SignOff forces the worker into the shutdown state and acknowledges this to the Manager. It does NOT actually perform a shutdown; it is intended to be called while a shutdown is in progress.

func (*Worker) Start

func (w *Worker) Start(ctx context.Context, state api.WorkerStatus)

Start starts the worker by sending it to the given state.

func (*Worker) WaitForShutdown

func (w *Worker) WaitForShutdown(ctx context.Context) ShutdownReason

WaitForShutdown waits until Flamenco wants to shut down the application. Returns the reason for the shutdown.

type WorkerConfig

type WorkerConfig struct {
	WorkerName string `yaml:"worker_name"`

	// ConfiguredManager is the Manager URL that's in the configuration file.
	ConfiguredManager string `yaml:"manager_url"`

	// ManagerURL is the Manager URL to use by the Worker. It could come from the
	// configuration file, but also from autodiscovery via UPnP/SSDP.
	ManagerURL string `yaml:"-"`

	TaskTypes       []string `yaml:"task_types"`
	RestartExitCode int      `yaml:"restart_exit_code"`
}

WorkerConfig represents the configuration of a single worker. It does not include authentication credentials.

type WorkerConfigWithCredentials

type WorkerConfigWithCredentials interface {
	WorkerConfig() (WorkerConfig, error)
	WorkerCredentials() (WorkerCredentials, error)
	SaveCredentials(creds WorkerCredentials) error
}

type WorkerCredentials

type WorkerCredentials struct {
	WorkerID string `yaml:"worker_id"`
	Secret   string `yaml:"worker_secret"`
}

Directories

Path Synopsis
Package mocks is a generated GoMock package.
Package persistence provides the database interface for Flamenco Manager.
