sup

package module
v0.0.52 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 31, 2026 License: MIT Imports: 15 Imported by: 0

README

sup

Go Reference Test License

sup is a small actor supervision and reactive signal toolkit for Go.

It provides typed inboxes for actor communication, OTP-style supervision with restart policies, reactive values that can be composed and observed, and an optional HTTP hub for inspecting actors, controls, signals, and events.

Features

  • Idiomatic actors — An actor is any value that implements ID(), Run(context.Context) error, and Inspect() Spec.
  • Supervisor trees — Supervisors are actors too, so they can supervise actors or other supervisors.
  • Restart policies — Permanent, Transient, and Temporary policies control when actors restart.
  • Panic recovery — Panics are recovered, wrapped with a stack trace, reported, and handled by the restart policy.
  • Typed inboxes — CastInbox[T] and CallInbox[T, R] provide type-safe asynchronous and request/reply messaging.
  • Reactive signals — Signal, Derived, and Effect model readable values, computed values, and side effects.
  • Signal processors — Built-in Map, Filter, Debounce, and Throttle processors transform or rate-limit signal updates.
  • Runtime inspection — Spec, controls, supervisor observers, and hub expose useful metadata for debugging and dashboards.

Installation

go get github.com/webermarci/sup

Quick start

This example defines a counter actor with a fire-and-forget increment inbox and a request/reply get inbox.

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/webermarci/sup"
)

// Message types accepted by the Counter actor's inboxes.
type (
	// GetMessage requests the current counter value; it carries no data.
	GetMessage struct{}

	// IncrementMessage asks the counter to add Amount to its state.
	IncrementMessage struct {
		Amount int
	}
)

// Counter is an example actor that keeps an integer total and exposes it
// through two typed inboxes.
type Counter struct {
	// Embedded base actor supplying the id and the default Inspect implementation.
	*sup.BaseActor
	// GetInbox carries request/reply messages; each reply is an int.
	GetInbox       *sup.CallInbox[GetMessage, int]
	// IncrementInbox carries fire-and-forget increment messages.
	IncrementInbox *sup.CastInbox[IncrementMessage]
	// State is the running total; Run is the only code in this example that
	// reads or writes it.
	State          int
}

// NewCounter builds a Counter identified by id. Both inboxes are created with
// a buffer size of 8, and State starts at its zero value.
func NewCounter(id string) *Counter {
	c := new(Counter)
	c.BaseActor = sup.NewBaseActor(id)
	c.GetInbox = sup.NewCallInbox[GetMessage, int](8)
	c.IncrementInbox = sup.NewCastInbox[IncrementMessage](8)
	return c
}

// Get sends a GetMessage through the call inbox and returns the reply
// together with any error reported by the call.
func (c *Counter) Get(ctx context.Context) (int, error) {
	value, err := c.GetInbox.Call(ctx, GetMessage{})
	return value, err
}

// Increment queues a request to add amount to the counter. It does not wait
// for the actor to apply it; the returned error comes from the cast inbox.
func (c *Counter) Increment(ctx context.Context, amount int) error {
	msg := IncrementMessage{Amount: amount}
	return c.IncrementInbox.Cast(ctx, msg)
}

// Run is the actor loop. It serves both inboxes until ctx is canceled and then
// returns nil, i.e. a clean exit.
//
// NOTE(review): Go's select chooses pseudo-randomly among the cases that are
// ready, so a Get request that becomes ready while increments are still queued
// may be answered before those increments are applied. Confirm this ordering
// is acceptable for callers that expect read-your-writes behavior.
func (c *Counter) Run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return nil

		case req := <-c.GetInbox.Receive():
			// Answer with the current total and no error.
			req.Reply(c.State, nil)

		case msg := <-c.IncrementInbox.Receive():
			c.State += msg.Amount
		}
	}
}

// main wires a Counter under a root supervisor, sends two increments, reads
// the value back, prints it, and shuts everything down.
func main() {
	// ctx is canceled on os.Interrupt or SIGTERM, or by the explicit cancel below.
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer cancel()

	counter := NewCounter("counter")

	// Permanent policy, a one-second restart delay, and a limit of 5 restarts
	// per 10-second window; actor failures are printed by the OnError handler.
	supervisor := sup.NewSupervisor("root").
		Policy(sup.Permanent).
		RestartDelay(time.Second).
		RestartLimit(5, 10*time.Second).
		OnError(func(actor sup.Actor, err error) {
			fmt.Printf("actor %s failed: %v\n", actor.ID(), err)
		}).
		Actor(counter)

	// Run the supervisor in the background. Its error is reported only when
	// ctx has not been canceled, so a normal shutdown prints nothing.
	go func() {
		if err := supervisor.Run(ctx); err != nil && ctx.Err() == nil {
			fmt.Println("supervisor stopped:", err)
		}
	}()

	// The returned errors are ignored here; real code should check them.
	_ = counter.Increment(ctx, 10)
	_ = counter.Increment(ctx, 32)

	count, err := counter.Get(ctx)
	if err != nil {
		panic(err)
	}

	fmt.Println("count:", count)

	// Cancel the context first, then wait for the supervised actors to stop.
	cancel()
	supervisor.Wait()
}

Actors

An actor implements the Actor interface:

// Actor represents a concurrent entity that can be supervised.
type Actor interface {
	// ID returns the actor id.
	ID() string
	// Run starts the actor and blocks until it stops or fails.
	Run(context.Context) error
	// Inspect returns structured metadata for the actor.
	Inspect() sup.Spec
}

Embed *sup.BaseActor when you only need a stable id and a default Inspect implementation:

// Worker is a minimal actor type that gets its id and default Inspect
// implementation from the embedded base actor.
type Worker struct {
	*sup.BaseActor
}

// NewWorker returns a Worker whose embedded base actor carries the given id.
func NewWorker(id string) *Worker {
	w := new(Worker)
	w.BaseActor = sup.NewBaseActor(id)
	return w
}

For stateless actors, use ActorFunc:

worker := sup.ActorFunc("health", func(ctx context.Context) error {
	<-ctx.Done()
	return nil
})

Supervisors

Supervisors run actors and restart them according to a restart policy.

supervisor := sup.NewSupervisor("root").
	Policy(sup.Transient).
	RestartDelay(500 * time.Millisecond).
	RestartLimit(3, time.Minute).
	Actors(actorA, actorB)

if err := supervisor.Run(ctx); err != nil {
	// Run returns ctx.Err() when ctx is canceled, or a terminal supervisor error.
}
Restart policies
| Policy    | Clean exit (nil) | Error or panic |
|-----------|------------------|----------------|
| Permanent | restart          | restart        |
| Transient | stop             | restart        |
| Temporary | stop             | stop           |
Dynamic spawning

Use Spawn to start an actor after the supervisor already exists:

supervisor := sup.NewSupervisor("jobs").Policy(sup.Temporary)
go supervisor.Run(ctx)

for _, job := range jobs {
	supervisor.Spawn(ctx, newJobActor(job))
}

supervisor.Wait()
Observers

SupervisorObserver receives asynchronous lifecycle callbacks. Parent supervisor observers are inherited by child supervisors.

observer := &sup.SupervisorObserver{
	OnActorRegistered: func(s *sup.Supervisor, a sup.Actor) {
		fmt.Println("registered", a.ID())
	},
	OnActorStarted: func(s *sup.Supervisor, a sup.Actor) {
		fmt.Println("started", a.ID())
	},
	OnActorStopped: func(s *sup.Supervisor, a sup.Actor, err error) {
		fmt.Println("stopped", a.ID(), err)
	},
	OnActorRestarting: func(s *sup.Supervisor, a sup.Actor, count int, lastErr error) {
		fmt.Println("restarting", a.ID(), count, lastErr)
	},
	OnSupervisorTerminal: func(s *sup.Supervisor, err error) {
		fmt.Println("terminal", s.ID(), err)
	},
}

root := sup.NewSupervisor("root").Observer(observer).Actor(worker)

Typed inboxes

CastInbox[T] is for asynchronous messages. CallInbox[T, R] is for request/reply interactions.

Cast inbox
inbox := sup.NewCastInbox[IncrementMessage](8)

err := inbox.Cast(ctx, IncrementMessage{Amount: 1})    // blocks until queued or ctx is done
err = inbox.TryCast(ctx, IncrementMessage{Amount: 1}) // returns ErrCastInboxFull if full

for msg := range inbox.Receive() {
	// process msg
}
Call inbox
inbox := sup.NewCallInbox[GetMessage, int](8)

value, err := inbox.Call(ctx, GetMessage{})

for req := range inbox.Receive() {
	req.Reply(42, nil)
}

Both inboxes expose Close, Closed, Len, and Cap.

Signal

A Signal[V] stores a value, publishes updates, and can run one or more sources.

count := sup.NewSignal("count", 0).
	InitialNotify().
	Equal(func(a, b int) bool { return a == b })

values := count.Subscribe(ctx)
updates := count.Watch(ctx)

go func() {
	for value := range values {
		fmt.Println(value)
	}
}()

_ = count.Write(ctx, 1)

Signals are actors. If a signal has sources, run it under a supervisor or in a goroutine:

random := sup.NewSignal("random", 0).
	Poll(200*time.Millisecond, func(ctx context.Context) (int, error) {
		return rand.IntN(100), nil
	}).
	Throttle(time.Second)

root := sup.NewSupervisor("signals").Actor(random)
go root.Run(ctx)
Sources

A source produces values for a signal:

  • Poll(interval, fn) calls fn on each interval and emits the result.
  • FromChannel(ch) emits values received from a channel.
  • SourceFunc adapts a function into a custom source.
ch := make(chan string)
status := sup.NewSignal("status", "offline").Source(sup.FromChannel(ch))
Processors

Processors transform, filter, or delay values before they are stored and broadcast:

processed := sup.NewSignal("processed", 0).
	Map(func(v int) int { return v * 2 }).
	Filter(func(v int) bool { return v >= 10 }).
	Debounce(100 * time.Millisecond)

Built-in processors:

  • Map(fn) transforms each value.
  • Filter(fn) drops values that do not match.
  • Debounce(wait) emits the latest value after a quiet period.
  • Throttle(interval) emits at most one value per interval.
Derived

Derived[V] computes a read-only signal from one or more watcher signals.

count := sup.NewSignal("count", 0)
doubled := sup.NewDerived("doubled", func() int {
	return count.Read() * 2
}, count).
	InitialNotify()

root := sup.NewSupervisor("root").Actors(count, doubled)
go root.Run(ctx)

BatchWindow controls how dependency updates are coalesced before recomputing.

Effect

Effect[V] runs side effects for values emitted by a signal or derived signal.

effect := count.Effect("log_count", func(ctx context.Context, value int) error {
	fmt.Println("count changed:", value)
	return nil
})

root := sup.NewSupervisor("root").Actors(count, effect)

Controls

Controls expose typed inboxes for dynamic dispatch, such as from the hub HTTP API.

// Controls lists the counter's dispatchable commands: an "increment" cast
// control backed by IncrementInbox and a "get" call control backed by GetInbox.
func (c *Counter) Controls() []sup.Control {
	increment := sup.NewCastControl("increment", c.IncrementInbox)
	get := sup.NewCallControl("get", c.GetInbox)
	return []sup.Control{increment, get}
}

NewCastControl[T] decodes JSON input and dispatches to a CastInbox[T]. NewCallControl[T, R] decodes JSON input, dispatches to a CallInbox[T, R], and returns the reply.

Input schemas are inferred from Go types and JSON tags.

Hub

The github.com/webermarci/sup/hub package exposes actors, controls, signals, and events over HTTP. It also serves the embedded debug UI at /debug.

registry := hub.New("registry",
	hub.WithActor(counter),
	hub.WithSignal(counter.StateSignal),
)

root := sup.NewSupervisor("root").
	Observer(registry.Observer()).
	Actors(counter, counter.StateSignal, registry)

go root.Run(ctx)
go http.ListenAndServe(":8080", registry.Handler())

Endpoints include:

  • GET /actors
  • GET /actors/{actorID}
  • GET /actors/{actorID}/controls
  • POST /actors/{actorID}/controls/{controlName}
  • GET /signals
  • GET /signals/{signalID}
  • GET /events
  • GET /events/stream
  • GET /debug

Packages

  • sup — Core actors, supervisors, typed inboxes, controls, signals, sources, processors, derived signals, and effects.
  • sup/hub — HTTP API and debug UI for actors, controls, signals, and events.
  • sup/exec — Actor wrapper around os/exec commands.
  • sup/mesh — NATS-backed actor for subscriptions.
  • sup/modbus — Modbus actor for TCP/RTU/ASCII clients.
  • sup/mqtt — MQTT actor for publish/subscribe clients.
  • sup/sse — Server-Sent Events client actor.
  • sup/ws — WebSocket client actor.

Documentation

Overview

Package sup provides actor supervision, reactive signals, and message inboxes for building concurrent systems.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrCallInboxFull is returned when a non-blocking call cannot be queued.
	ErrCallInboxFull = errors.New("sup: call inbox is full")

	// ErrCallInboxClosed is returned when a call is made after the inbox is closed.
	ErrCallInboxClosed = errors.New("sup: call inbox is closed")
)
View Source
var (
	// ErrCastInboxFull is returned when a non-blocking cast cannot be queued.
	ErrCastInboxFull = errors.New("sup: cast inbox is full")

	// ErrCastInboxClosed is returned when a cast is made after the inbox is closed.
	ErrCastInboxClosed = errors.New("sup: cast inbox is closed")
)

Functions

This section is empty.

Types

type Actor added in v0.0.13

type Actor interface {
	// ID returns the actor id.
	ID() string

	// Run starts the actor and blocks until it stops or fails.
	Run(context.Context) error

	// Inspect returns structured metadata for the actor.
	Inspect() Spec
}

Actor represents a concurrent entity that can be supervised.

The Run method should return an error if the actor needs to be restarted, or nil if it can exit cleanly. Panics will also trigger a restart.

func ActorFunc added in v0.0.13

func ActorFunc(id string, fn func(ctx context.Context) error) Actor

ActorFunc creates a simple stateless actor from a function.

type ActorRegisteredPayload added in v0.0.52

type ActorRegisteredPayload struct {
	SupervisorID string `json:"supervisor_id"`
}

ActorRegisteredPayload is the payload for EventActorRegistered.

type ActorRestartingPayload added in v0.0.52

type ActorRestartingPayload struct {
	SupervisorID string `json:"supervisor_id"`
	RestartCount int    `json:"restart_count"`
	LastError    string `json:"last_error,omitempty"`
}

ActorRestartingPayload is the payload for EventActorRestarting.

type ActorStartedPayload added in v0.0.52

type ActorStartedPayload struct {
	SupervisorID string `json:"supervisor_id"`
}

ActorStartedPayload is the payload for EventActorStarted.

type ActorStoppedPayload added in v0.0.52

type ActorStoppedPayload struct {
	SupervisorID string `json:"supervisor_id"`
	Error        string `json:"error,omitempty"`
}

ActorStoppedPayload is the payload for EventActorStopped.

type BaseActor added in v0.0.20

type BaseActor struct {
	// contains filtered or unexported fields
}

BaseActor provides a reusable actor identity and default inspection metadata.

func NewBaseActor added in v0.0.20

func NewBaseActor(id string) *BaseActor

NewBaseActor creates a base actor with the given id.

func (*BaseActor) ID added in v0.0.45

func (a *BaseActor) ID() string

ID returns the actor id.

func (*BaseActor) Inspect added in v0.0.43

func (a *BaseActor) Inspect() Spec

Inspect returns the default actor spec.

type CallControl added in v0.0.52

type CallControl interface {
	Control
	Dispatch(context.Context, json.RawMessage) (any, error)
}

CallControl dispatches JSON input to a call inbox and returns the reply.

func NewCallControl added in v0.0.52

func NewCallControl[T any, R any](name string, inbox *CallInbox[T, R]) CallControl

NewCallControl creates a call control for the given inbox.

type CallInbox added in v0.0.37

type CallInbox[T any, R any] struct {
	// contains filtered or unexported fields
}

CallInbox queues synchronous requests and delivers replies to callers.

func NewCallInbox added in v0.0.37

func NewCallInbox[T any, R any](size int) *CallInbox[T, R]

NewCallInbox creates a call inbox with the given buffer size.

func (*CallInbox[T, R]) Call added in v0.0.37

func (i *CallInbox[T, R]) Call(ctx context.Context, message T) (R, error)

Call sends a request and waits for a reply or context cancellation.

func (*CallInbox[T, R]) Cap added in v0.0.37

func (i *CallInbox[T, R]) Cap() int

Cap returns the capacity of the inbox buffer.

func (*CallInbox[T, R]) Close added in v0.0.37

func (i *CallInbox[T, R]) Close()

Close closes the inbox and prevents future calls.

func (*CallInbox[T, R]) Closed added in v0.0.52

func (i *CallInbox[T, R]) Closed() bool

Closed reports whether the inbox is closed.

func (*CallInbox[T, R]) Len added in v0.0.37

func (i *CallInbox[T, R]) Len() int

Len returns the number of messages currently in the inbox.

func (*CallInbox[T, R]) Receive added in v0.0.37

func (i *CallInbox[T, R]) Receive() <-chan CallRequest[T, R]

Receive returns the read-only request channel for the actor's internal loop.

type CallRequest added in v0.0.8

type CallRequest[T any, R any] struct {
	// contains filtered or unexported fields
}

CallRequest wraps a payload with a reply channel for synchronous calls.

func (CallRequest[T, R]) Payload added in v0.0.8

func (r CallRequest[T, R]) Payload() T

Payload returns the request's payload.

func (CallRequest[T, R]) Reply added in v0.0.8

func (r CallRequest[T, R]) Reply(value R, err error)

Reply sends the response back to the caller. The actor should call this exactly once per request.

type CastControl added in v0.0.52

type CastControl interface {
	Control
	Dispatch(context.Context, json.RawMessage) error
}

CastControl dispatches JSON input to a cast inbox.

func NewCastControl added in v0.0.52

func NewCastControl[T any](name string, inbox *CastInbox[T]) CastControl

NewCastControl creates a cast control for the given inbox.

type CastInbox added in v0.0.37

type CastInbox[T any] struct {
	// contains filtered or unexported fields
}

CastInbox queues asynchronous messages for an actor.

func NewCastInbox added in v0.0.37

func NewCastInbox[T any](size int) *CastInbox[T]

NewCastInbox creates a cast inbox with the given buffer size.

func (*CastInbox[T]) Cap added in v0.0.37

func (i *CastInbox[T]) Cap() int

Cap returns the capacity of the inbox buffer.

func (*CastInbox[T]) Cast added in v0.0.37

func (i *CastInbox[T]) Cast(ctx context.Context, message T) error

Cast sends a message, blocking until it is queued or the context is canceled.

func (*CastInbox[T]) Close added in v0.0.37

func (i *CastInbox[T]) Close()

Close closes the inbox and prevents future casts.

func (*CastInbox[T]) Closed added in v0.0.52

func (i *CastInbox[T]) Closed() bool

Closed reports whether the inbox is closed.

func (*CastInbox[T]) Len added in v0.0.37

func (i *CastInbox[T]) Len() int

Len returns the number of messages currently in the inbox.

func (*CastInbox[T]) Receive added in v0.0.37

func (i *CastInbox[T]) Receive() <-chan T

Receive returns the read-only channel for the actor's internal loop.

func (*CastInbox[T]) TryCast added in v0.0.37

func (i *CastInbox[T]) TryCast(ctx context.Context, message T) error

TryCast sends a message without blocking when the inbox is full.

type CastRequest added in v0.0.8

type CastRequest[T any] struct {
	// contains filtered or unexported fields
}

CastRequest wraps a payload for asynchronous calls without expecting a reply.

func (CastRequest[T]) Payload added in v0.0.8

func (r CastRequest[T]) Payload() T

Payload returns the request's payload.

type Control added in v0.0.52

type Control interface {
	Name() string
	Kind() ControlKind
	InputSchema() map[string]any
}

Control describes an actor command that can be dispatched dynamically.

type ControlKind added in v0.0.52

type ControlKind string

ControlKind identifies how a control dispatches input to an actor.

const (
	// ControlKindCall identifies a control that returns a reply.
	ControlKindCall ControlKind = "call"

	// ControlKindCast identifies a control that does not return a reply.
	ControlKindCast ControlKind = "cast"
)

type Controllable added in v0.0.52

type Controllable interface {
	Controls() []Control
}

Controllable represents an actor that exposes dispatchable controls.

type Derived added in v0.0.52

type Derived[V any] struct {
	// contains filtered or unexported fields
}

Derived is a read-only signal computed from one or more watcher signals.

func NewDerived added in v0.0.52

func NewDerived[V any](id string, compute func() V, deps ...WatcherSignal) *Derived[V]

NewDerived creates a derived signal with the initial computed value.

func (*Derived[V]) BatchWindow added in v0.0.52

func (d *Derived[V]) BatchWindow(window time.Duration) *Derived[V]

BatchWindow sets the coalescing window for dependency updates.

func (*Derived[V]) Effect added in v0.0.52

func (d *Derived[V]) Effect(
	id string,
	fn EffectFunc[V],
) *Effect[V]

Effect creates an effect that reacts to this derived signal.

func (*Derived[V]) Equal added in v0.0.52

func (d *Derived[V]) Equal(eq func(a, b V) bool) *Derived[V]

Equal sets the equality function used to suppress unchanged values.

func (*Derived[V]) InitialNotify added in v0.0.52

func (d *Derived[V]) InitialNotify() *Derived[V]

InitialNotify makes new subscribers receive the current value immediately.

func (*Derived[V]) Inspect added in v0.0.52

func (d *Derived[V]) Inspect() Spec

Inspect returns the derived signal spec.

func (Derived) Read added in v0.0.52

func (b Derived) Read() V

Read returns the current signal value.

func (*Derived[V]) Run added in v0.0.52

func (d *Derived[V]) Run(ctx context.Context) error

Run watches dependencies and recomputes the value until the context is canceled.

func (Derived) Subscribe added in v0.0.52

func (b Derived) Subscribe(ctx context.Context) <-chan V

Subscribe returns a channel that receives changed signal values.

func (Derived) Watch added in v0.0.52

func (b Derived) Watch(ctx context.Context) <-chan struct{}

Watch returns a channel that receives a notification when the signal changes.

type Effect added in v0.0.52

type Effect[V any] struct {
	*BaseActor
	// contains filtered or unexported fields
}

Effect is an actor that reacts to values from a ReadableSignal.

func NewEffect added in v0.0.52

func NewEffect[V any](
	id string,
	signal ReadableSignal[V],
	fn EffectFunc[V],
) *Effect[V]

NewEffect creates an effect that reacts to values from signal.

func (*Effect[V]) Inspect added in v0.0.52

func (e *Effect[V]) Inspect() Spec

Inspect returns the effect spec.

func (*Effect[V]) Run added in v0.0.52

func (e *Effect[V]) Run(ctx context.Context) error

Run subscribes to the signal and calls the effect function for each value.

type EffectFunc added in v0.0.52

type EffectFunc[V any] func(context.Context, V) error

EffectFunc handles a value emitted by a signal.

type EmitFunc added in v0.0.52

type EmitFunc[V any] func(context.Context, V) error

EmitFunc sends a value from a source into a signal.

type Event added in v0.0.52

type Event struct {
	ID        string    `json:"id"`
	Timestamp int64     `json:"timestamp"`
	Type      EventType `json:"type"`
	SourceID  string    `json:"source_id"`
	Payload   any       `json:"payload,omitempty"`
}

Event describes an occurrence emitted by an actor, supervisor, or signal.

func NewEvent added in v0.0.52

func NewEvent(eventType EventType, sourceID string, payload any) Event

NewEvent creates an event with a generated id and current timestamp.

type EventType added in v0.0.52

type EventType string

EventType identifies the kind of event that occurred.

const (
	// EventActorRegistered is emitted when an actor is registered with a supervisor.
	EventActorRegistered EventType = "actor:registered"

	// EventActorStarted is emitted when an actor starts running.
	EventActorStarted EventType = "actor:started"

	// EventActorStopped is emitted when an actor stops running.
	EventActorStopped EventType = "actor:stopped"

	// EventActorRestarting is emitted before a supervisor restarts an actor.
	EventActorRestarting EventType = "actor:restarting"

	// EventSupervisorTerminal is emitted when a supervisor reaches a terminal error.
	EventSupervisorTerminal EventType = "supervisor:terminal"

	// EventSignalUpdated is emitted when a signal value changes.
	EventSignalUpdated EventType = "signal:updated"
)

type Processor added in v0.0.52

type Processor[V any] interface {
	Process(ctx context.Context, value V, runtime ProcessorRuntime[V]) error
	OnTimer(ctx context.Context, timerID uint64, runtime ProcessorRuntime[V]) error
}

Processor transforms, filters, or delays values written to a signal.

func Debounce added in v0.0.52

func Debounce[V any](wait time.Duration) Processor[V]

Debounce returns a processor that emits the latest value after wait has elapsed.

func Filter added in v0.0.52

func Filter[V any](fn func(V) bool) Processor[V]

Filter returns a processor that emits only values accepted by fn.

func Map added in v0.0.52

func Map[V any](fn func(V) V) Processor[V]

Map returns a processor that transforms each value with fn.

func Throttle added in v0.0.52

func Throttle[V any](interval time.Duration) Processor[V]

Throttle returns a processor that emits at most one value per interval.

type ProcessorFunc added in v0.0.52

type ProcessorFunc[V any] func(context.Context, V, ProcessorRuntime[V]) error

ProcessorFunc adapts a function into a Processor.

func (ProcessorFunc[V]) OnTimer added in v0.0.52

func (f ProcessorFunc[V]) OnTimer(
	ctx context.Context,
	timerID uint64,
	runtime ProcessorRuntime[V],
) error

OnTimer ignores timer callbacks for function processors.

func (ProcessorFunc[V]) Process added in v0.0.52

func (f ProcessorFunc[V]) Process(
	ctx context.Context,
	value V,
	runtime ProcessorRuntime[V],
) error

Process calls f with the incoming value and runtime.

type ProcessorRuntime added in v0.0.52

type ProcessorRuntime[V any] interface {
	Emit(V)
	After(delay time.Duration, timerID uint64)
}

ProcessorRuntime lets processors emit values or schedule timers.

type ReadableSignal added in v0.0.40

type ReadableSignal[V any] interface {
	ReaderSignal[V]
	SubscriberSignal[V]
	WatcherSignal
}

ReadableSignal is a signal that can be read, watched, and subscribed to.

type ReaderSignal added in v0.0.40

type ReaderSignal[V any] interface {
	Actor

	// Read returns the current signal value.
	Read() V
}

ReaderSignal is a signal that exposes its current value.

type RepliableRequest added in v0.0.9

type RepliableRequest[R any] interface {
	Reply(value R, err error)
}

RepliableRequest represents a request that can be replied to.

type Resettable added in v0.0.52

type Resettable interface {
	Reset()
}

Resettable represents a processor that can reset its internal state.

type RestartPolicy

type RestartPolicy uint8

RestartPolicy controls when a supervisor restarts a stopped actor.

const (
	// Permanent always restarts actors, even after clean exits.
	Permanent RestartPolicy = iota

	// Transient restarts actors after errors or panics, but not after clean exits.
	Transient

	// Temporary never restarts actors.
	Temporary
)

type Signal added in v0.0.43

type Signal[V any] struct {
	// contains filtered or unexported fields
}

Signal stores a value and publishes updates to subscribers and watchers.

func NewSignal added in v0.0.52

func NewSignal[V any](
	id string,
	initial V,
) *Signal[V]

NewSignal creates a signal with the given id and initial value.

func (*Signal[V]) Buffer added in v0.0.52

func (s *Signal[V]) Buffer(size int) *Signal[V]

Buffer sets the subscriber channel buffer size.

func (*Signal[V]) Debounce added in v0.0.52

func (s *Signal[V]) Debounce(wait time.Duration) *Signal[V]

Debounce adds a processor that emits the latest value after wait has elapsed.

func (*Signal[V]) Effect added in v0.0.52

func (s *Signal[V]) Effect(
	id string,
	fn EffectFunc[V],
) *Effect[V]

Effect creates an effect that reacts to this signal.

func (*Signal[V]) Equal added in v0.0.52

func (s *Signal[V]) Equal(eq func(a, b V) bool) *Signal[V]

Equal sets the equality function used to suppress unchanged values.

func (*Signal[V]) Filter added in v0.0.52

func (s *Signal[V]) Filter(fn func(V) bool) *Signal[V]

Filter adds a processor that emits only values accepted by fn.

func (*Signal[V]) InitialNotify added in v0.0.52

func (s *Signal[V]) InitialNotify() *Signal[V]

InitialNotify makes new subscribers receive the current value immediately.

func (*Signal[V]) Inspect added in v0.0.52

func (s *Signal[V]) Inspect() Spec

Inspect returns the signal spec.

func (*Signal[V]) Map added in v0.0.52

func (s *Signal[V]) Map(fn func(V) V) *Signal[V]

Map adds a processor that transforms each value with fn.

func (*Signal[V]) Poll added in v0.0.52

func (s *Signal[V]) Poll(
	interval time.Duration,
	fn func(context.Context) (V, error),
) *Signal[V]

Poll adds a polling source to the signal.

func (*Signal[V]) Processor added in v0.0.52

func (s *Signal[V]) Processor(processor Processor[V]) *Signal[V]

Processor adds a processor to the signal pipeline.

func (Signal) Read added in v0.0.52

func (b Signal) Read() V

Read returns the current signal value.

func (*Signal[V]) Run added in v0.0.52

func (s *Signal[V]) Run(ctx context.Context) error

Run starts the signal sources and blocks until they stop, fail, or the context is canceled.

func (*Signal[V]) Source added in v0.0.52

func (s *Signal[V]) Source(source Source[V]) *Signal[V]

Source adds a source that writes values to the signal when it runs.

func (Signal) Subscribe added in v0.0.52

func (b Signal) Subscribe(ctx context.Context) <-chan V

Subscribe returns a channel that receives changed signal values.

func (*Signal[V]) Throttle added in v0.0.52

func (s *Signal[V]) Throttle(interval time.Duration) *Signal[V]

Throttle adds a processor that emits at most one value per interval.

func (Signal) Watch added in v0.0.52

func (b Signal) Watch(ctx context.Context) <-chan struct{}

Watch returns a channel that receives a notification when the signal changes.

func (*Signal[V]) Write added in v0.0.52

func (s *Signal[V]) Write(ctx context.Context, value V) error

Write updates the signal value after applying configured processors.

type SignalUpdatedPayload added in v0.0.52

type SignalUpdatedPayload struct {
	Value any `json:"value"`
}

SignalUpdatedPayload is the payload for EventSignalUpdated.

type Source added in v0.0.52

type Source[V any] interface {
	Run(ctx context.Context, emit EmitFunc[V]) error
}

Source produces values for a signal.

func FromChannel added in v0.0.52

func FromChannel[V any](ch <-chan V) Source[V]

FromChannel returns a source that emits values received from ch.

func Poll added in v0.0.52

func Poll[V any](
	interval time.Duration,
	fn func(context.Context) (V, error),
) Source[V]

Poll returns a source that calls fn on each interval and emits the result.

type SourceFunc added in v0.0.52

type SourceFunc[V any] func(context.Context, EmitFunc[V]) error

SourceFunc adapts a function into a Source.

func (SourceFunc[V]) Run added in v0.0.52

func (f SourceFunc[V]) Run(ctx context.Context, emit EmitFunc[V]) error

Run calls f with the context and emit function.

type Spec added in v0.0.43

type Spec struct {
	Kind         string            `json:"kind"`
	Dependencies []string          `json:"dependencies"`
	Metadata     map[string]string `json:"metadata"`
}

Spec represents a structured description of an inspectable component. It captures the component's kind, its dependency names, and arbitrary configuration metadata for visualization and debugging purposes.

type SubscriberSignal added in v0.0.40

type SubscriberSignal[V any] interface {
	Actor

	// Subscribe returns a channel that receives changed signal values.
	Subscribe(context.Context) <-chan V
}

SubscriberSignal is a signal that publishes changed values to subscribers.

type Supervisor

type Supervisor struct {
	*BaseActor
	// contains filtered or unexported fields
}

Supervisor runs actors and restarts them according to its restart policy.

func NewSupervisor added in v0.0.7

func NewSupervisor(id string) *Supervisor

NewSupervisor creates a supervisor with the given id.

func (*Supervisor) Actor added in v0.0.52

func (s *Supervisor) Actor(actor Actor) *Supervisor

Actor adds an actor to be started when the supervisor runs.

func (*Supervisor) Actors added in v0.0.52

func (s *Supervisor) Actors(actors ...Actor) *Supervisor

Actors adds multiple actors to be started when the supervisor runs.

func (*Supervisor) Children added in v0.0.43

func (s *Supervisor) Children() []Actor

Children returns a copy of the configured child actors.

func (*Supervisor) Inspect added in v0.0.43

func (s *Supervisor) Inspect() Spec

Inspect returns the supervisor spec.

func (*Supervisor) Observer added in v0.0.52

func (s *Supervisor) Observer(observer *SupervisorObserver) *Supervisor

Observer adds a lifecycle observer to the supervisor.

func (*Supervisor) OnError

func (s *Supervisor) OnError(handler func(actor Actor, err error)) *Supervisor

OnError sets a handler called when an actor exits with an error or panic.

func (*Supervisor) Policy

func (s *Supervisor) Policy(policy RestartPolicy) *Supervisor

Policy sets the supervisor restart policy.

func (*Supervisor) RestartDelay

func (s *Supervisor) RestartDelay(d time.Duration) *Supervisor

RestartDelay sets the delay between actor restart attempts.

func (*Supervisor) RestartLimit added in v0.0.52

func (s *Supervisor) RestartLimit(maxRestarts int, window time.Duration) *Supervisor

RestartLimit sets the maximum restarts allowed within a time window.

func (*Supervisor) Run added in v0.0.14

func (s *Supervisor) Run(ctx context.Context) error

Run starts configured actors and blocks until they stop, fail, or the context is canceled.

func (*Supervisor) Spawn added in v0.0.14

func (s *Supervisor) Spawn(ctx context.Context, actor Actor)

Spawn starts and supervises an actor immediately.

func (*Supervisor) Wait

func (s *Supervisor) Wait()

Wait blocks until all spawned actors have stopped.

type SupervisorObserver added in v0.0.28

type SupervisorObserver struct {
	OnActorRegistered    func(supervisor *Supervisor, actor Actor)
	OnActorStarted       func(supervisor *Supervisor, actor Actor)
	OnActorStopped       func(supervisor *Supervisor, actor Actor, err error)
	OnActorRestarting    func(supervisor *Supervisor, actor Actor, restartCount int, lastErr error)
	OnSupervisorTerminal func(supervisor *Supervisor, err error)
}

SupervisorObserver receives lifecycle callbacks from a supervisor.

type SupervisorTerminalPayload added in v0.0.52

type SupervisorTerminalPayload struct {
	Error string `json:"error,omitempty"`
}

SupervisorTerminalPayload is the payload for EventSupervisorTerminal.

type WatcherSignal added in v0.0.40

type WatcherSignal interface {
	Actor

	// Watch returns a channel that receives a notification when the signal changes.
	Watch(ctx context.Context) <-chan struct{}
}

WatcherSignal is a signal that notifies watchers when its value changes.

type WritableSignal added in v0.0.40

type WritableSignal[V any] interface {
	ReadableSignal[V]
	WriterSignal[V]
}

WritableSignal is a readable signal that can also be written to.

type WriterSignal added in v0.0.40

type WriterSignal[V any] interface {
	Actor

	// Write updates the signal value.
	Write(context.Context, V) error
}

WriterSignal is a signal that accepts new values.

Directories

Path Synopsis
examples
dashboard command
simple command
hub — Package hub exposes sup actors, controls, signals, and events over HTTP.
mesh module
modbus module
ws module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL