worker2

package
v0.0.0-...-016edcc Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 14, 2024 | License: MIT | Imports: 22 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CollectRootErrors

func CollectRootErrors(err error) []error

func CollectUniqueErrors

func CollectUniqueErrors(inErrs []error) []error

func Wait

func Wait(ctx context.Context, f func())

func WaitChan

func WaitChan[T any](ctx context.Context, ch <-chan T) error

func WaitChanE

func WaitChanE[T error](ctx context.Context, ch <-chan T) error

func WaitDep

func WaitDep(ctx context.Context, dep Dep) error

func WaitE

func WaitE(ctx context.Context, f func() error) error

Types

type Action

type Action struct {
	// contains filtered or unexported fields
}

func NewAction

func NewAction(cfg ActionConfig) *Action

func (*Action) AddDep

func (a *Action) AddDep(deps ...Dep)

func (*Action) AddHook

func (a *Action) AddHook(hook Hook)

func (*Action) DeepDo

func (a *Action) DeepDo(f func(Dep))

func (*Action) ErrorCh

func (a *Action) ErrorCh() <-chan error

func (*Action) Exec

func (a *Action) Exec(ctx context.Context, ins InStore, outs OutStore) error

func (*Action) GetCtx

func (a *Action) GetCtx() context.Context

func (*Action) GetErr

func (a *Action) GetErr() error

func (*Action) GetExecutionDebugString

func (a *Action) GetExecutionDebugString() string

func (*Action) GetHooks

func (a *Action) GetHooks() []Hook

func (*Action) GetName

func (a *Action) GetName() string

func (*Action) GetNode

func (a *Action) GetNode() *dag.Node[Dep]

func (*Action) GetQueuedAt

func (a *Action) GetQueuedAt() time.Time

func (*Action) GetRequest

func (a *Action) GetRequest() map[string]float64

func (*Action) GetScheduledAt

func (a *Action) GetScheduledAt() time.Time

func (*Action) GetScheduler

func (a *Action) GetScheduler() Scheduler

func (*Action) GetStartedAt

func (a *Action) GetStartedAt() time.Time

func (*Action) GetState

func (a *Action) GetState() ExecState

func (*Action) OutputCh

func (a *Action) OutputCh() <-chan Value

func (*Action) SetCtx

func (a *Action) SetCtx(ctx context.Context)

func (*Action) Wait

func (a *Action) Wait() <-chan struct{}

type ActionConfig

type ActionConfig struct {
	Ctx       context.Context
	Name      string
	Deps      []Dep
	Hooks     []Hook
	Scheduler Scheduler
	Requests  map[string]float64
	Do        func(ctx context.Context, ins InStore, outs OutStore) error
}

type Dep

type Dep interface {
	GetName() string
	Exec(ctx context.Context, ins InStore, outs OutStore) error
	GetNode() *dag.Node[Dep]
	AddDep(...Dep)
	GetHooks() []Hook
	AddHook(h Hook)
	Wait() <-chan struct{}
	DeepDo(f func(Dep))
	GetCtx() context.Context
	SetCtx(ctx context.Context)
	GetErr() error
	GetState() ExecState
	GetScheduledAt() time.Time
	GetStartedAt() time.Time
	GetQueuedAt() time.Time

	GetScheduler() Scheduler
	GetRequest() map[string]float64
	GetExecutionDebugString() string
	// contains filtered or unexported methods
}

func NewChanDep

func NewChanDep[T any](ctx context.Context, ch chan T) Dep

func Serial

func Serial(deps []Dep) Dep

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

func NewEngine

func NewEngine() *Engine

func (*Engine) GetLiveExecutions

func (e *Engine) GetLiveExecutions() []*Execution

func (*Engine) RegisterHook

func (e *Engine) RegisterHook(hook Hook)

func (*Engine) Run

func (e *Engine) Run()

func (*Engine) Schedule

func (e *Engine) Schedule(a Dep) Dep

func (*Engine) SetDefaultScheduler

func (e *Engine) SetDefaultScheduler(s Scheduler)

func (*Engine) Stop

func (e *Engine) Stop()

func (*Engine) Wait

func (e *Engine) Wait() <-chan struct{}

type Error

type Error struct {
	ID    uint64
	Name  string
	State ExecState
	Err   error
	// contains filtered or unexported fields
}

func (Error) Error

func (e Error) Error() string

func (*Error) Root

func (e *Error) Root() error

func (Error) Skipped

func (e Error) Skipped() bool

func (Error) Unwrap

func (e Error) Unwrap() error

type Event

type Event interface {
	Replayable() bool
}

type EventCompleted

type EventCompleted struct {
	At        time.Time
	Execution *Execution
	Output    Value
	Error     error
}

func (EventCompleted) Replayable

func (e EventCompleted) Replayable() bool

type EventDeclared

type EventDeclared struct {
	Dep Dep
}

func (EventDeclared) Replayable

func (EventDeclared) Replayable() bool

type EventNewDep

type EventNewDep struct {
	Execution *Execution
	Target    Dep
	AddedDep  Dep
}

func (EventNewDep) Replayable

func (e EventNewDep) Replayable() bool

type EventQueued

type EventQueued struct {
	At        time.Time
	Execution *Execution
}

func (EventQueued) Replayable

func (e EventQueued) Replayable() bool

type EventReady

type EventReady struct {
	At        time.Time
	Execution *Execution
}

func (EventReady) Replayable

func (e EventReady) Replayable() bool

type EventScheduled

type EventScheduled struct {
	At        time.Time
	Execution *Execution
}

func (EventScheduled) Replayable

func (e EventScheduled) Replayable() bool

type EventSkipped

type EventSkipped struct {
	At        time.Time
	Execution *Execution
	Error     error
}

func (EventSkipped) Replayable

func (e EventSkipped) Replayable() bool

type EventStarted

type EventStarted struct {
	At        time.Time
	Execution *Execution
}

func (EventStarted) Replayable

func (e EventStarted) Replayable() bool

type EventSuspended

type EventSuspended struct {
	At        time.Time
	Execution *Execution
	Bag       *SuspendBag
}

func (EventSuspended) Replayable

func (e EventSuspended) Replayable() bool

type EventWithExecution

type EventWithExecution interface {
	Event
	// contains filtered or unexported methods
}

type ExecState

type ExecState int
const (
	ExecStateUnknown ExecState = iota
	ExecStateScheduled
	ExecStateQueued
	ExecStateRunning
	ExecStateSucceeded
	ExecStateFailed
	ExecStateSkipped
	ExecStateSuspended
)

func (ExecState) IsFinal

func (s ExecState) IsFinal() bool

func (ExecState) String

func (s ExecState) String() string

type Execution

type Execution struct {
	ID    uint64
	Dep   Dep
	State ExecState
	Err   error

	ScheduledAt time.Time
	QueuedAt    time.Time
	StartedAt   time.Time
	CompletedAt time.Time
	// contains filtered or unexported fields
}

func (*Execution) GetOutput

func (e *Execution) GetOutput() Value

func (*Execution) GetStatus

func (e *Execution) GetStatus() status.Statuser

func (*Execution) Interactive

func (e *Execution) Interactive() bool

func (*Execution) ResumeAck

func (e *Execution) ResumeAck()

func (*Execution) Run

func (e *Execution) Run()

func (*Execution) Status

func (e *Execution) Status(status status.Statuser)

func (*Execution) String

func (e *Execution) String() string

func (*Execution) Suspend

func (e *Execution) Suspend() *SuspendBag

func (*Execution) Wait

func (e *Execution) Wait() <-chan struct{}

func (*Execution) WaitSuspend

func (e *Execution) WaitSuspend() <-chan *SuspendBag

type Group

type Group struct {
	// contains filtered or unexported fields
}

func NewGroup

func NewGroup(deps ...Dep) *Group

func NewGroupWith

func NewGroupWith(cfg GroupConfig) *Group

func NewNamedGroup

func NewNamedGroup(name string, deps ...Dep) *Group

func (*Group) AddDep

func (a *Group) AddDep(deps ...Dep)

func (*Group) AddHook

func (a *Group) AddHook(hook Hook)

func (*Group) DeepDo

func (g *Group) DeepDo(f func(Dep))

func (*Group) ErrorCh

func (a *Group) ErrorCh() <-chan error

func (*Group) Exec

func (g *Group) Exec(ctx context.Context, ins InStore, outs OutStore) error

func (*Group) GetCtx

func (g *Group) GetCtx() context.Context

func (*Group) GetErr

func (a *Group) GetErr() error

func (*Group) GetExecutionDebugString

func (a *Group) GetExecutionDebugString() string

func (*Group) GetHooks

func (a *Group) GetHooks() []Hook

func (*Group) GetName

func (g *Group) GetName() string

func (*Group) GetNode

func (a *Group) GetNode() *dag.Node[Dep]

func (*Group) GetQueuedAt

func (a *Group) GetQueuedAt() time.Time

func (*Group) GetRequest

func (g *Group) GetRequest() map[string]float64

func (*Group) GetScheduledAt

func (a *Group) GetScheduledAt() time.Time

func (*Group) GetScheduler

func (g *Group) GetScheduler() Scheduler

func (*Group) GetStartedAt

func (a *Group) GetStartedAt() time.Time

func (*Group) GetState

func (a *Group) GetState() ExecState

func (*Group) OutputCh

func (a *Group) OutputCh() <-chan Value

func (*Group) SetCtx

func (g *Group) SetCtx(ctx context.Context)

func (*Group) Wait

func (a *Group) Wait() <-chan struct{}

type GroupConfig

type GroupConfig struct {
	Name string
	Deps []Dep
}

type Hook

type Hook func(Event)

func ErrorHook

func ErrorHook() (Hook, <-chan error)

func LogHook

func LogHook() Hook

func OutputHook

func OutputHook() (Hook, <-chan Value)

type InStore

type InStore interface {
	Get(key string) any
}

type LimitScheduler

type LimitScheduler struct {
	// contains filtered or unexported fields
}

func NewLimitScheduler

func NewLimitScheduler(limit int) *LimitScheduler

func (*LimitScheduler) Done

func (ls *LimitScheduler) Done(d Dep)

func (*LimitScheduler) Schedule

func (ls *LimitScheduler) Schedule(d Dep, ins InStore) error

type MapValue

type MapValue map[string]Value

func (MapValue) Get

func (m MapValue) Get() (any, error)

func (MapValue) Set

func (m MapValue) Set(k string, v Value)

type MemValue

type MemValue[T any] struct {
	V T
}

func NewValue

func NewValue[T any](v T) MemValue[T]

func (MemValue[T]) Get

func (v MemValue[T]) Get() (any, error)

type Named

type Named struct {
	Name string
	Dep
}

type OutStore

type OutStore interface {
	Set(Value)
	Get() Value
}

type ResourceScheduler

type ResourceScheduler struct {
	// contains filtered or unexported fields
}

func NewResourceScheduler

func NewResourceScheduler(limits map[string]float64, def map[string]float64) *ResourceScheduler

func (*ResourceScheduler) Done

func (ls *ResourceScheduler) Done(d Dep)

func (*ResourceScheduler) Schedule

func (ls *ResourceScheduler) Schedule(d Dep, ins InStore) error

type RunningTracker

type RunningTracker struct {
	// contains filtered or unexported fields
}

func NewRunningTracker

func NewRunningTracker() *RunningTracker

func (*RunningTracker) Get

func (t *RunningTracker) Get() []*Execution

func (*RunningTracker) Group

func (t *RunningTracker) Group() *Group

func (*RunningTracker) Hook

func (t *RunningTracker) Hook() Hook

type Scheduler

type Scheduler interface {
	Schedule(Dep, InStore) error
	Done(Dep)
}

type Sem

type Sem struct {
	Dep
	// contains filtered or unexported fields
}

func NewSemDep

func NewSemDep(ctx context.Context, name string) *Sem

func (*Sem) AddSem

func (s *Sem) AddSem(delta int)

func (*Sem) DoneSem

func (s *Sem) DoneSem()

type StageHook

type StageHook struct {
	OnScheduled func(Dep) context.Context
	// OnWaiting
	OnQueued func(Dep) context.Context
	OnStart  func(Dep) context.Context
	OnEnd    func(Dep) context.Context
}

func (StageHook) Hook

func (h StageHook) Hook() Hook

type Stats

type Stats struct {
	All       uint64
	Completed uint64

	Scheduled uint64
	Waiting   uint64
	Succeeded uint64
	Failed    uint64
	Skipped   uint64
	Suspended uint64
	Running   uint64
}

func CollectStats

func CollectStats(dep Dep) Stats

CollectStats can get quite expensive on large DAGs; prefer NewStatsCollector.

type StatsCollector

type StatsCollector struct {
	// contains filtered or unexported fields
}

func NewStatsCollector

func NewStatsCollector() *StatsCollector

func (*StatsCollector) Collect

func (c *StatsCollector) Collect() Stats

func (*StatsCollector) Register

func (c *StatsCollector) Register(dep Dep)

type SuspendBag

type SuspendBag struct {
	// contains filtered or unexported fields
}

func (*SuspendBag) Resume

func (e *SuspendBag) Resume() <-chan struct{}

func (*SuspendBag) WaitResume

func (e *SuspendBag) WaitResume() <-chan struct{}

type UnlimitedScheduler

type UnlimitedScheduler struct{}

func (UnlimitedScheduler) Done

func (ls UnlimitedScheduler) Done(d Dep)

func (UnlimitedScheduler) Schedule

func (ls UnlimitedScheduler) Schedule(d Dep, ins InStore) error

type Value

type Value interface {
	Get() (any, error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL