dispatcher

package
v0.18.3
Warning

This package is not in the latest version of its module.
Published: Feb 19, 2024 License: MIT Imports: 8 Imported by: 0

Documentation

Constants

const (
	TaskFailed = iota
	TaskDone
	TaskActive
	TaskPending
	TaskDropped
)
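
The status constants are plain untyped integers, so a small helper that turns them into readable names can make logs easier to follow. The sketch below redeclares the constants locally so that it compiles on its own; `statusName` is a hypothetical helper and is not part of this package.

```go
package main

import "fmt"

// Local copies of the documented constants, redeclared so this sketch is self-contained.
const (
	TaskFailed = iota
	TaskDone
	TaskActive
	TaskPending
	TaskDropped
)

// statusName is a hypothetical helper (not provided by the package)
// that maps a status code to a human-readable label.
func statusName(status int) string {
	switch status {
	case TaskFailed:
		return "failed"
	case TaskDone:
		return "done"
	case TaskActive:
		return "active"
	case TaskPending:
		return "pending"
	case TaskDropped:
		return "dropped"
	default:
		return "unknown"
	}
}

func main() {
	for s := TaskFailed; s <= TaskDropped; s++ {
		fmt.Printf("%d = %s\n", s, statusName(s))
	}
}
```

Note that `TaskFailed` is the first `iota` value and therefore equals 0, so a zero-valued integer status reads as failed.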

Variables

var (
	DispatcherQueueLength = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "dispatcher_queue_length",
	})
	DispatcherTasksActive = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "dispatcher_tasks_active",
	})
	DispatcherTasksQueued = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "dispatcher_tasks_queued",
	})
	DispatcherTasksDropped = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "dispatcher_tasks_dropped",
	})
	DispatcherTasksDone = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "dispatcher_tasks_done",
	}, []string{"agent_id"})
	DispatcherTasksFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "dispatcher_tasks_failed",
	}, []string{"agent_id"})
)
var ErrInvalidPayload = errors.New("invalid payload")

Functions

func RegisterMetrics added in v0.10.11

func RegisterMetrics()

func SetLogger

func SetLogger(l *zap.SugaredLogger)

func WaitUntilTrue

func WaitUntilTrue(ctx context.Context, between time.Duration, f func() bool) error

Types

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

func Start

func Start(parallel int, worker Worker, tasksBuffer int) Dispatcher

Start spawns a pool of workers. tasksBuffer sets how many tasks are pre-emptively placed into each worker's incoming queue. Set it to 0 to prevent greedy task assignment (this makes `Dispatch` blocking).
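
The blocking behaviour described for a zero `tasksBuffer` matches how Go channels work, assuming each worker's incoming queue is a channel whose capacity is `tasksBuffer` (the documentation does not state this). The sketch below illustrates only the channel semantics; `trySend` is a hypothetical helper and nothing here calls the package itself.

```go
package main

import "fmt"

// trySend reports whether v could be placed on queue without blocking.
func trySend(queue chan int, v int) bool {
	select {
	case queue <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)  // analogous to tasksBuffer = 0
	buffered := make(chan int, 2) // analogous to tasksBuffer = 2

	// With no receiver waiting, an unbuffered send cannot proceed.
	fmt.Println(trySend(unbuffered, 1))

	// A buffered queue accepts sends until its capacity is reached.
	fmt.Println(trySend(buffered, 1))
	fmt.Println(trySend(buffered, 2))
	fmt.Println(trySend(buffered, 3))
}
```

Under that assumption, a larger buffer lets callers hand off work sooner, at the cost of tasks sitting in the queue of a busy worker while another worker may be idle.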

func (*Dispatcher) Dispatch

func (d *Dispatcher) Dispatch(payload interface{}) *Result

Dispatch takes `payload`, wraps it in a `Task`, and dispatches it to the first available `Worker`.

func (Dispatcher) Stop

func (d Dispatcher) Stop()

type Result added in v0.7.0

type Result struct {
	Status int
	Error  error
	// contains filtered or unexported fields
}

Result is the result of a Task execution. TODO: setting/returning this needs to be implemented better using channels.

func (Result) Done added in v0.7.0

func (r Result) Done() bool

func (Result) Failed added in v0.7.0

func (r Result) Failed() bool

func (Result) Value added in v0.14.0

func (r Result) Value() <-chan interface{}

type Task

type Task struct {
	Payload    interface{}
	Dispatcher *Dispatcher
	// contains filtered or unexported fields
}

Task represents a unit of work. Each worker should accept it as an argument. Example:

	func (w encoderWorker) Work(t dispatcher.Task) error {
		r := t.Payload.(*resolve.TranscodingRequest)
		...
	}

func (Task) SetResult added in v0.14.0

func (t Task) SetResult(v interface{})
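
`Task.SetResult` and `Result.Value` together suggest that a worker can hand a value back to the caller through a channel. The sketch below shows that general pattern with a hypothetical local `result` type. The field name, the constructor, and the buffer size of 1 are all assumptions, and the package's real internals may differ.

```go
package main

import "fmt"

// result is a hypothetical stand-in for the documented Result type.
type result struct {
	value chan interface{}
}

// newResult creates a result whose channel has capacity 1, so the
// producer can store a value without waiting for a reader.
func newResult() *result {
	return &result{value: make(chan interface{}, 1)}
}

// setResult plays the role of the documented Task.SetResult.
func (r *result) setResult(v interface{}) {
	r.value <- v
}

// Value exposes the channel as receive-only, as in the documented signature.
func (r *result) Value() <-chan interface{} {
	return r.value
}

func main() {
	r := newResult()
	go r.setResult("encoded.mp4") // a worker would do this inside Work
	fmt.Println(<-r.Value())      // the caller waits for the value
}
```

Returning a receive-only channel keeps the caller from writing to it, which leaves setting the value as the worker's job.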

type Worker

type Worker interface {
	Work(Task) error
}

Worker can be any object that implements `Work()`.
