Documentation
¶
Overview ¶
Package sup provides actor supervision, reactive signals, and message inboxes for building concurrent systems.
Index ¶
- Variables
- type Actor
- type ActorRegisteredPayload
- type ActorRestartingPayload
- type ActorStartedPayload
- type ActorStoppedPayload
- type BaseActor
- type CallControl
- type CallInbox
- type CallRequest
- type CastControl
- type CastInbox
- func (i *CastInbox[T]) Cap() int
- func (i *CastInbox[T]) Cast(ctx context.Context, message T) error
- func (i *CastInbox[T]) Close()
- func (i *CastInbox[T]) Closed() bool
- func (i *CastInbox[T]) Len() int
- func (i *CastInbox[T]) Receive() <-chan T
- func (i *CastInbox[T]) TryCast(ctx context.Context, message T) error
- type CastRequest
- type Control
- type ControlKind
- type Controllable
- type Derived
- func (d *Derived[V]) BatchWindow(window time.Duration) *Derived[V]
- func (d *Derived[V]) Effect(id string, fn EffectFunc[V]) *Effect[V]
- func (d *Derived[V]) Equal(eq func(a, b V) bool) *Derived[V]
- func (d *Derived[V]) InitialNotify() *Derived[V]
- func (d *Derived[V]) Inspect() Spec
- func (b Derived) Read() V
- func (d *Derived[V]) Run(ctx context.Context) error
- func (b Derived) Subscribe(ctx context.Context) <-chan V
- func (b Derived) Watch(ctx context.Context) <-chan struct{}
- type Effect
- type EffectFunc
- type EmitFunc
- type Event
- type EventType
- type Processor
- type ProcessorFunc
- type ProcessorRuntime
- type ReadableSignal
- type ReaderSignal
- type RepliableRequest
- type Resettable
- type RestartPolicy
- type Signal
- func (s *Signal[V]) Buffer(size int) *Signal[V]
- func (s *Signal[V]) Debounce(wait time.Duration) *Signal[V]
- func (s *Signal[V]) Effect(id string, fn EffectFunc[V]) *Effect[V]
- func (s *Signal[V]) Equal(eq func(a, b V) bool) *Signal[V]
- func (s *Signal[V]) Filter(fn func(V) bool) *Signal[V]
- func (s *Signal[V]) InitialNotify() *Signal[V]
- func (s *Signal[V]) Inspect() Spec
- func (s *Signal[V]) Map(fn func(V) V) *Signal[V]
- func (s *Signal[V]) Poll(interval time.Duration, fn func(context.Context) (V, error)) *Signal[V]
- func (s *Signal[V]) Processor(processor Processor[V]) *Signal[V]
- func (b Signal) Read() V
- func (s *Signal[V]) Run(ctx context.Context) error
- func (s *Signal[V]) Source(source Source[V]) *Signal[V]
- func (b Signal) Subscribe(ctx context.Context) <-chan V
- func (s *Signal[V]) Throttle(interval time.Duration) *Signal[V]
- func (b Signal) Watch(ctx context.Context) <-chan struct{}
- func (s *Signal[V]) Write(ctx context.Context, value V) error
- type SignalUpdatedPayload
- type Source
- type SourceFunc
- type Spec
- type SubscriberSignal
- type Supervisor
- func (s *Supervisor) Actor(actor Actor) *Supervisor
- func (s *Supervisor) Actors(actors ...Actor) *Supervisor
- func (s *Supervisor) Children() []Actor
- func (s *Supervisor) Inspect() Spec
- func (s *Supervisor) Observer(observer *SupervisorObserver) *Supervisor
- func (s *Supervisor) OnError(handler func(actor Actor, err error)) *Supervisor
- func (s *Supervisor) Policy(policy RestartPolicy) *Supervisor
- func (s *Supervisor) RestartDelay(d time.Duration) *Supervisor
- func (s *Supervisor) RestartLimit(maxRestarts int, window time.Duration) *Supervisor
- func (s *Supervisor) Run(ctx context.Context) error
- func (s *Supervisor) Spawn(ctx context.Context, actor Actor)
- func (s *Supervisor) Wait()
- type SupervisorObserver
- type SupervisorTerminalPayload
- type WatcherSignal
- type WritableSignal
- type WriterSignal
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrCallInboxFull is returned when a non-blocking call cannot be queued.
	ErrCallInboxFull = errors.New("sup: call inbox is full")
	// ErrCallInboxClosed is returned when a call is made after the inbox is closed.
	ErrCallInboxClosed = errors.New("sup: call inbox is closed")
)
var (
	// ErrCastInboxFull is returned when a non-blocking cast cannot be queued.
	ErrCastInboxFull = errors.New("sup: cast inbox is full")
	// ErrCastInboxClosed is returned when a cast is made after the inbox is closed.
	ErrCastInboxClosed = errors.New("sup: cast inbox is closed")
)
Functions ¶
This section is empty.
Types ¶
type Actor ¶ added in v0.0.13
type Actor interface {
// ID returns the actor id.
ID() string
// Run starts the actor and blocks until it stops or fails.
Run(context.Context) error
// Inspect returns structured metadata for the actor.
Inspect() Spec
}
Actor represents a concurrent entity that can be supervised.
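The Run method should return an error if the actor failed, or nil if it exited cleanly; a panic is treated as a failure. Whether the actor is then restarted depends on the supervisor's RestartPolicy: Permanent restarts after any exit, Transient restarts only after errors or panics, and Temporary never restarts.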
type ActorRegisteredPayload ¶ added in v0.0.52
type ActorRegisteredPayload struct {
SupervisorID string `json:"supervisor_id"`
}
ActorRegisteredPayload is the payload for EventActorRegistered.
type ActorRestartingPayload ¶ added in v0.0.52
type ActorRestartingPayload struct {
SupervisorID string `json:"supervisor_id"`
RestartCount int `json:"restart_count"`
LastError string `json:"last_error,omitempty"`
}
ActorRestartingPayload is the payload for EventActorRestarting.
type ActorStartedPayload ¶ added in v0.0.52
type ActorStartedPayload struct {
SupervisorID string `json:"supervisor_id"`
}
ActorStartedPayload is the payload for EventActorStarted.
type ActorStoppedPayload ¶ added in v0.0.52
type ActorStoppedPayload struct {
SupervisorID string `json:"supervisor_id"`
Error string `json:"error,omitempty"`
}
ActorStoppedPayload is the payload for EventActorStopped.
type BaseActor ¶ added in v0.0.20
type BaseActor struct {
// contains filtered or unexported fields
}
BaseActor provides a reusable actor identity and default inspection metadata.
func NewBaseActor ¶ added in v0.0.20
NewBaseActor creates a base actor with the given id.
type CallControl ¶ added in v0.0.52
CallControl dispatches JSON input to a call inbox and returns the reply.
func NewCallControl ¶ added in v0.0.52
func NewCallControl[T any, R any](name string, inbox *CallInbox[T, R]) CallControl
NewCallControl creates a call control for the given inbox.
type CallInbox ¶ added in v0.0.37
CallInbox queues synchronous requests and delivers replies to callers.
func NewCallInbox ¶ added in v0.0.37
NewCallInbox creates a call inbox with the given buffer size.
func (*CallInbox[T, R]) Call ¶ added in v0.0.37
Call sends a request and waits for a reply or context cancellation.
func (*CallInbox[T, R]) Close ¶ added in v0.0.37
func (i *CallInbox[T, R]) Close()
Close closes the inbox and prevents future calls.
func (*CallInbox[T, R]) Len ¶ added in v0.0.37
Len returns the number of messages currently in the inbox.
func (*CallInbox[T, R]) Receive ¶ added in v0.0.37
func (i *CallInbox[T, R]) Receive() <-chan CallRequest[T, R]
Receive returns the read-only request channel for the actor's internal loop.
type CallRequest ¶ added in v0.0.8
CallRequest wraps a payload with a reply channel for synchronous calls.
func (CallRequest[T, R]) Payload ¶ added in v0.0.8
func (r CallRequest[T, R]) Payload() T
Payload returns the request's payload.
func (CallRequest[T, R]) Reply ¶ added in v0.0.8
func (r CallRequest[T, R]) Reply(value R, err error)
Reply sends the response back to the caller. The actor should call this exactly once per request.
type CastControl ¶ added in v0.0.52
CastControl dispatches JSON input to a cast inbox.
func NewCastControl ¶ added in v0.0.52
func NewCastControl[T any](name string, inbox *CastInbox[T]) CastControl
NewCastControl creates a cast control for the given inbox.
type CastInbox ¶ added in v0.0.37
type CastInbox[T any] struct {
	// contains filtered or unexported fields
}
CastInbox queues asynchronous messages for an actor.
func NewCastInbox ¶ added in v0.0.37
NewCastInbox creates a cast inbox with the given buffer size.
func (*CastInbox[T]) Cast ¶ added in v0.0.37
Cast sends a message, blocking until it is queued or the context is canceled.
func (*CastInbox[T]) Close ¶ added in v0.0.37
func (i *CastInbox[T]) Close()
Close closes the inbox and prevents future casts.
func (*CastInbox[T]) Len ¶ added in v0.0.37
Len returns the number of messages currently in the inbox.
type CastRequest ¶ added in v0.0.8
type CastRequest[T any] struct {
	// contains filtered or unexported fields
}
CastRequest wraps a payload for an asynchronous cast, for which no reply is expected.
func (CastRequest[T]) Payload ¶ added in v0.0.8
func (r CastRequest[T]) Payload() T
Payload returns the request's payload.
type Control ¶ added in v0.0.52
type Control interface {
Name() string
Kind() ControlKind
InputSchema() map[string]any
}
Control describes an actor command that can be dispatched dynamically.
type ControlKind ¶ added in v0.0.52
type ControlKind string
ControlKind identifies how a control dispatches input to an actor.
const (
	// ControlKindCall identifies a control that returns a reply.
	ControlKindCall ControlKind = "call"
	// ControlKindCast identifies a control that does not return a reply.
	ControlKindCast ControlKind = "cast"
)
type Controllable ¶ added in v0.0.52
type Controllable interface {
Controls() []Control
}
Controllable represents an actor that exposes dispatchable controls.
type Derived ¶ added in v0.0.52
type Derived[V any] struct {
	// contains filtered or unexported fields
}
Derived is a read-only signal computed from one or more watcher signals.
func NewDerived ¶ added in v0.0.52
func NewDerived[V any](id string, compute func() V, deps ...WatcherSignal) *Derived[V]
NewDerived creates a derived signal with the initial computed value.
func (*Derived[V]) BatchWindow ¶ added in v0.0.52
BatchWindow sets the coalescing window for dependency updates.
func (*Derived[V]) Effect ¶ added in v0.0.52
func (d *Derived[V]) Effect( id string, fn EffectFunc[V], ) *Effect[V]
Effect creates an effect that reacts to this derived signal.
func (*Derived[V]) Equal ¶ added in v0.0.52
Equal sets the equality function used to suppress unchanged values.
func (*Derived[V]) InitialNotify ¶ added in v0.0.52
InitialNotify makes new subscribers receive the current value immediately.
func (Derived) Read ¶ added in v0.0.52
func (b Derived) Read() V
Read returns the current signal value.
func (*Derived[V]) Run ¶ added in v0.0.52
Run watches dependencies and recomputes the value until the context is canceled.
type Effect ¶ added in v0.0.52
Effect is an actor that reacts to values from a ReadableSignal.
func NewEffect ¶ added in v0.0.52
func NewEffect[V any]( id string, signal ReadableSignal[V], fn EffectFunc[V], ) *Effect[V]
NewEffect creates an effect that reacts to values from signal.
type EffectFunc ¶ added in v0.0.52
EffectFunc handles a value emitted by a signal.
type Event ¶ added in v0.0.52
type Event struct {
ID string `json:"id"`
Timestamp int64 `json:"timestamp"`
Type EventType `json:"type"`
SourceID string `json:"source_id"`
Payload any `json:"payload,omitempty"`
}
Event describes an occurrence emitted by an actor, supervisor, or signal.
type EventType ¶ added in v0.0.52
type EventType string
EventType identifies the kind of event that occurred.
const (
	// EventActorRegistered is emitted when an actor is registered with a supervisor.
	EventActorRegistered EventType = "actor:registered"
	// EventActorStarted is emitted when an actor starts running.
	EventActorStarted EventType = "actor:started"
	// EventActorStopped is emitted when an actor stops running.
	EventActorStopped EventType = "actor:stopped"
	// EventActorRestarting is emitted before a supervisor restarts an actor.
	EventActorRestarting EventType = "actor:restarting"
	// EventSupervisorTerminal is emitted when a supervisor reaches a terminal error.
	EventSupervisorTerminal EventType = "supervisor:terminal"
	// EventSignalUpdated is emitted when a signal value changes.
	EventSignalUpdated EventType = "signal:updated"
)
type Processor ¶ added in v0.0.52
type Processor[V any] interface {
	Process(ctx context.Context, value V, runtime ProcessorRuntime[V]) error
	OnTimer(ctx context.Context, timerID uint64, runtime ProcessorRuntime[V]) error
}
Processor transforms, filters, or delays values written to a signal.
func Debounce ¶ added in v0.0.52
Debounce returns a processor that emits the latest value after wait has elapsed.
type ProcessorFunc ¶ added in v0.0.52
type ProcessorFunc[V any] func(context.Context, V, ProcessorRuntime[V]) error
ProcessorFunc adapts a function into a Processor.
func (ProcessorFunc[V]) OnTimer ¶ added in v0.0.52
func (f ProcessorFunc[V]) OnTimer( ctx context.Context, timerID uint64, runtime ProcessorRuntime[V], ) error
OnTimer ignores timer callbacks for function processors.
func (ProcessorFunc[V]) Process ¶ added in v0.0.52
func (f ProcessorFunc[V]) Process( ctx context.Context, value V, runtime ProcessorRuntime[V], ) error
Process calls f with the incoming value and runtime.
type ProcessorRuntime ¶ added in v0.0.52
ProcessorRuntime lets processors emit values or schedule timers.
type ReadableSignal ¶ added in v0.0.40
type ReadableSignal[V any] interface {
	ReaderSignal[V]
	SubscriberSignal[V]
	WatcherSignal
}
ReadableSignal is a signal that can be read, watched, and subscribed to.
type ReaderSignal ¶ added in v0.0.40
ReaderSignal is a signal that exposes its current value.
type RepliableRequest ¶ added in v0.0.9
RepliableRequest represents a request that can be replied to.
type Resettable ¶ added in v0.0.52
type Resettable interface {
Reset()
}
Resettable represents a processor that can reset its internal state.
type RestartPolicy ¶
type RestartPolicy uint8
RestartPolicy controls when a supervisor restarts a stopped actor.
const (
	// Permanent always restarts actors, even after clean exits.
	Permanent RestartPolicy = iota
	// Transient restarts actors after errors or panics, but not after clean exits.
	Transient
	// Temporary never restarts actors.
	Temporary
)
type Signal ¶ added in v0.0.43
type Signal[V any] struct {
	// contains filtered or unexported fields
}
Signal stores a value and publishes updates to subscribers and watchers.
func (*Signal[V]) Debounce ¶ added in v0.0.52
Debounce adds a processor that emits the latest value after wait has elapsed.
func (*Signal[V]) Effect ¶ added in v0.0.52
func (s *Signal[V]) Effect( id string, fn EffectFunc[V], ) *Effect[V]
Effect creates an effect that reacts to this signal.
func (*Signal[V]) Equal ¶ added in v0.0.52
Equal sets the equality function used to suppress unchanged values.
func (*Signal[V]) Filter ¶ added in v0.0.52
Filter adds a processor that emits only values accepted by fn.
func (*Signal[V]) InitialNotify ¶ added in v0.0.52
InitialNotify makes new subscribers receive the current value immediately.
func (Signal) Read ¶ added in v0.0.52
func (b Signal) Read() V
Read returns the current signal value.
func (*Signal[V]) Run ¶ added in v0.0.52
Run starts the signal sources and blocks until they stop, fail, or the context is canceled.
func (*Signal[V]) Source ¶ added in v0.0.52
Source adds a source that writes values to the signal when it runs.
func (Signal) Subscribe ¶ added in v0.0.52
Subscribe returns a channel that receives changed signal values.
func (*Signal[V]) Throttle ¶ added in v0.0.52
Throttle adds a processor that emits at most one value per interval.
type SignalUpdatedPayload ¶ added in v0.0.52
type SignalUpdatedPayload struct {
Value any `json:"value"`
}
SignalUpdatedPayload is the payload for EventSignalUpdated.
type Source ¶ added in v0.0.52
Source produces values for a signal.
func FromChannel ¶ added in v0.0.52
FromChannel returns a source that emits values received from ch.
type SourceFunc ¶ added in v0.0.52
SourceFunc adapts a function into a Source.
type Spec ¶ added in v0.0.43
type Spec struct {
Kind string `json:"kind"`
Dependencies []string `json:"dependencies"`
Metadata map[string]string `json:"metadata"`
}
Spec represents a structured description of an inspectable component. It captures the component's kind, its dependency names, and arbitrary configuration metadata for visualization and debugging purposes.
type SubscriberSignal ¶ added in v0.0.40
type SubscriberSignal[V any] interface {
	Actor
	// Subscribe returns a channel that receives changed signal values.
	Subscribe(context.Context) <-chan V
}
SubscriberSignal is a signal that publishes changed values to subscribers.
type Supervisor ¶
type Supervisor struct {
*BaseActor
// contains filtered or unexported fields
}
Supervisor runs actors and restarts them according to its restart policy.
func NewSupervisor ¶ added in v0.0.7
func NewSupervisor(id string) *Supervisor
NewSupervisor creates a supervisor with the given id.
func (*Supervisor) Actor ¶ added in v0.0.52
func (s *Supervisor) Actor(actor Actor) *Supervisor
Actor adds an actor to be started when the supervisor runs.
func (*Supervisor) Actors ¶ added in v0.0.52
func (s *Supervisor) Actors(actors ...Actor) *Supervisor
Actors adds multiple actors to be started when the supervisor runs.
func (*Supervisor) Children ¶ added in v0.0.43
func (s *Supervisor) Children() []Actor
Children returns a copy of the configured child actors.
func (*Supervisor) Inspect ¶ added in v0.0.43
func (s *Supervisor) Inspect() Spec
Inspect returns the supervisor spec.
func (*Supervisor) Observer ¶ added in v0.0.52
func (s *Supervisor) Observer(observer *SupervisorObserver) *Supervisor
Observer adds a lifecycle observer to the supervisor.
func (*Supervisor) OnError ¶
func (s *Supervisor) OnError(handler func(actor Actor, err error)) *Supervisor
OnError sets a handler called when an actor exits with an error or panic.
func (*Supervisor) Policy ¶
func (s *Supervisor) Policy(policy RestartPolicy) *Supervisor
Policy sets the supervisor restart policy.
func (*Supervisor) RestartDelay ¶
func (s *Supervisor) RestartDelay(d time.Duration) *Supervisor
RestartDelay sets the delay between actor restart attempts.
func (*Supervisor) RestartLimit ¶ added in v0.0.52
func (s *Supervisor) RestartLimit(maxRestarts int, window time.Duration) *Supervisor
RestartLimit sets the maximum restarts allowed within a time window.
func (*Supervisor) Run ¶ added in v0.0.14
func (s *Supervisor) Run(ctx context.Context) error
Run starts configured actors and blocks until they stop, fail, or the context is canceled.
func (*Supervisor) Spawn ¶ added in v0.0.14
func (s *Supervisor) Spawn(ctx context.Context, actor Actor)
Spawn starts and supervises an actor immediately.
func (*Supervisor) Wait ¶
func (s *Supervisor) Wait()
Wait blocks until all spawned actors have stopped.
type SupervisorObserver ¶ added in v0.0.28
type SupervisorObserver struct {
OnActorRegistered func(supervisor *Supervisor, actor Actor)
OnActorStarted func(supervisor *Supervisor, actor Actor)
OnActorStopped func(supervisor *Supervisor, actor Actor, err error)
OnActorRestarting func(supervisor *Supervisor, actor Actor, restartCount int, lastErr error)
OnSupervisorTerminal func(supervisor *Supervisor, err error)
}
SupervisorObserver receives lifecycle callbacks from a supervisor.
type SupervisorTerminalPayload ¶ added in v0.0.52
type SupervisorTerminalPayload struct {
Error string `json:"error,omitempty"`
}
SupervisorTerminalPayload is the payload for EventSupervisorTerminal.
type WatcherSignal ¶ added in v0.0.40
type WatcherSignal interface {
Actor
// Watch returns a channel that receives a notification when the signal changes.
Watch(ctx context.Context) <-chan struct{}
}
WatcherSignal is a signal that notifies watchers when its value changes.
type WritableSignal ¶ added in v0.0.40
type WritableSignal[V any] interface {
	ReadableSignal[V]
	WriterSignal[V]
}
WritableSignal is a readable signal that can also be written to.