sup

v0.0.43
Published: May 14, 2026 License: MIT Imports: 10 Imported by: 0

sup is a high-performance, low-allocation Actor Model library for Go.

It provides a foundation for building highly concurrent, distributed, and fault-tolerant stateful applications. Typed inboxes and internal pooling keep allocations low: in the benchmarks below, both asynchronous messages (Cast) and synchronous requests (Call) report 0 allocs/op. It embraces standard Go idioms (select, channels, and context) rather than hiding them behind heavy frameworks.

Features

  • Idiomatic Go — Actors are just goroutines running a Run loop. No magic interfaces, no reflection, no global registries.
  • OTP-style supervision — Supervisor trees with Permanent, Transient, and Temporary restart policies.
  • Panic recovery — Panics are caught, wrapped with a stack trace, and reported via WithOnError. The actor is then restarted according to the policy.
  • Restart limits — Cap restarts within a sliding time window with WithRestartLimit.
  • Context-driven lifecycle — context.Context ensures actors shut down cleanly when the parent context is canceled.
  • Typed inboxes — CastInbox[T] and CallInbox[T, R] provide type-safe, efficient messaging.
  • Supervisor observers — Lightweight lifecycle hooks for metrics, logging, or diagnostics via SupervisorObserver.
  • Supervisor trees — Supervisors implement the Actor interface, so they compose naturally.
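
The restart limit mentioned in the list above can be pictured as a sliding-window counter. The following is a minimal, stdlib-only sketch of that idea, not sup's actual implementation; the names restartWindow, allow, and demo are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"time"
)

// restartWindow is a hypothetical helper that permits at most max restarts
// within any sliding interval of length window.
type restartWindow struct {
	max    int
	window time.Duration
	stamps []time.Time // restarts that are still inside the window
}

// allow records a restart at time now and reports whether it is permitted.
func (r *restartWindow) allow(now time.Time) bool {
	cutoff := now.Add(-r.window)
	kept := r.stamps[:0]
	for _, t := range r.stamps {
		if t.After(cutoff) {
			kept = append(kept, t)
		}
	}
	r.stamps = kept
	if len(r.stamps) >= r.max {
		return false
	}
	r.stamps = append(r.stamps, now)
	return true
}

// demo replays restarts at 0s, 1s, 2s, and 12s against a limit of
// 2 restarts per 10 seconds.
func demo() []bool {
	w := &restartWindow{max: 2, window: 10 * time.Second}
	start := time.Unix(0, 0)
	var out []bool
	for _, sec := range []int{0, 1, 2, 12} {
		out = append(out, w.allow(start.Add(time.Duration(sec)*time.Second)))
	}
	return out
}

func main() {
	fmt.Println(demo()) // [true true false true]
}
```

The third restart is refused because two restarts already fall inside the window; by 12s both have aged out, so restarts are allowed again.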

Installation

go get github.com/webermarci/sup

Quick Start

This example demonstrates a simple Counter actor using a CastInbox for fire-and-forget increments and a CallInbox for request/reply reads.

package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"time"

	"github.com/webermarci/sup"
)

// 1. Define internal messages
type incrementMsg struct{ amount int }
type getCountMsg struct{}

// 2. Define the actor with typed inboxes
type Counter struct {
	*sup.BaseActor
	Casts *sup.CastInbox[incrementMsg]
	Calls *sup.CallInbox[getCountMsg, int]
	count int
}

func NewCounter() *Counter {
	return &Counter{
		BaseActor: sup.NewBaseActor("counter"),
		Casts:     sup.NewCastInbox[incrementMsg](16),
		Calls:     sup.NewCallInbox[getCountMsg, int](8),
	}
}

// 3. Public API — callers never access the inbox directly
func (c *Counter) Increment(amount int) {
	_ = c.Casts.Cast(context.Background(), incrementMsg{amount: amount})
}

func (c *Counter) Get() (int, error) {
	return c.Calls.Call(context.Background(), getCountMsg{})
}

// 4. Actor run loop
func (c *Counter) Run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
			
		case inc, ok := <-c.Casts.Receive():
			if !ok {
				return nil
			}
			c.count += inc.amount
			
		case req, ok := <-c.Calls.Receive():
			if !ok {
				return nil
			}
			req.Reply(c.count, nil)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	counter := NewCounter()

	// Optional: create a logger for the supervisor (propagated to child actors)
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))

	supervisor := sup.NewSupervisor("root",
		sup.WithActor(counter),
		sup.WithPolicy(sup.Permanent),
		sup.WithRestartDelay(time.Second),
		sup.WithRestartLimit(5, 10*time.Second),
		sup.WithOnError(func(actor sup.Actor, err error) {
			fmt.Printf("Actor %s failed with error: %v\n", actor.Name(), err)
		}),
		sup.WithLogger(logger),
	)

	go supervisor.Run(ctx)

	counter.Increment(10)
	counter.Increment(32)

	count, err := counter.Get()
	if err != nil {
		panic(err)
	}

	fmt.Printf("Final count: %d\n", count)

	cancel()
	supervisor.Wait()
}

Restart Policies

Policy     Clean exit (nil)  Error or panic
Permanent  Restarts          Restarts
Transient  Stops             Restarts
Temporary  Stops             Stops
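
The policy table boils down to a small decision function. The sketch below restates it in plain Go as an illustration; the lowercase policy type, shouldRestart, and errBoom are hypothetical names, and the real supervisor may structure this logic differently.

```go
package main

import (
	"errors"
	"fmt"
)

// policy mirrors the three restart policies for the purpose of this sketch.
type policy uint8

const (
	permanent policy = iota
	transient
	temporary
)

// errBoom stands in for any error (or recovered panic) returned by an actor.
var errBoom = errors.New("boom")

// shouldRestart encodes the table: a nil err means the actor exited cleanly.
func shouldRestart(p policy, err error) bool {
	switch p {
	case permanent:
		return true
	case transient:
		return err != nil
	default: // temporary
		return false
	}
}

func main() {
	for _, p := range []policy{permanent, transient, temporary} {
		fmt.Println(shouldRestart(p, nil), shouldRestart(p, errBoom))
	}
	// Output:
	// true true
	// false true
	// false false
}
```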

Inboxes (CastInbox / CallInbox)

CastInbox[T] and CallInbox[T, R] are the type-safe building blocks for actor communication:

  • NewCastInbox[T](size int) — create a cast inbox for messages of type T.
  • NewCallInbox[T, R](size int) — create a call inbox that sends T and expects R.

CastInbox[T] API:

  • Cast(ctx context.Context, message T) error — enqueue a message (blocks until space or context expires).
  • TryCast(ctx context.Context, message T) error — non-blocking attempt; returns ErrCastInboxFull if full (or ctx.Err() if ctx is done).
  • Receive() <-chan T — read-only channel for the actor's run loop.
  • Close(), Len(), Cap().

CallInbox[T, R] API:

  • Call(ctx context.Context, message T) (R, error) — send a request and wait for reply (or context cancellation).
  • Receive() <-chan CallRequest[T, R] — incoming requests inside the actor; use req.Reply(value, err) to respond.
  • Close(), Len(), Cap().

Sending variants (summary)

Method                Behaviour on full inbox                                          Behaviour on closed inbox
(*CastInbox).Cast     Blocks until space or ctx done                                   Returns ErrCastInboxClosed
(*CastInbox).TryCast  Returns ErrCastInboxFull immediately (or ctx.Err() if ctx done)  Returns ErrCastInboxClosed
(*CallInbox).Call     Blocks until reply or ctx done                                   Returns ErrCallInboxClosed
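
The difference between the blocking and non-blocking variants maps onto two shapes of Go's select statement. The sketch below shows both on a plain buffered channel. It is an illustration under assumptions, not the library's code: send, trySend, and errFull are hypothetical stand-ins for Cast, TryCast, and ErrCastInboxFull, and the closed-inbox case is deliberately left out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errFull is a stand-in for ErrCastInboxFull in this sketch.
var errFull = errors.New("inbox is full")

// send blocks until there is room in ch or ctx is done (Cast-like).
func send[T any](ctx context.Context, ch chan<- T, msg T) error {
	select {
	case ch <- msg:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// trySend never blocks: it fails fast when ch is full (TryCast-like).
func trySend[T any](ctx context.Context, ch chan<- T, msg T) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	select {
	case ch <- msg:
		return nil
	default:
		return errFull
	}
}

// demo fills a one-slot channel and then exercises both failure paths.
func demo() (first, second, third error) {
	ch := make(chan int, 1)
	ctx, cancel := context.WithCancel(context.Background())
	first = trySend(ctx, ch, 1)  // there is room
	second = trySend(ctx, ch, 2) // the single slot is taken
	cancel()
	third = send(ctx, ch, 3) // full and canceled: returns ctx.Err()
	return first, second, third
}

func main() {
	first, second, third := demo()
	fmt.Println(first, second, errors.Is(third, context.Canceled))
}
```

Passing a context with a deadline to the blocking variant gives callers a bounded wait without a separate timeout API.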

Signals

Signals are actor-backed, reactive values in the main package. They provide a simple API to read the current value, subscribe to updates, and watch for change notifications. Signals implement the ReadableSignal / WritableSignal interfaces and are also sup.Actor instances — run them under a supervisor or as a goroutine.

Common methods:

  • Read() V — immediate snapshot of the current value
  • Subscribe(ctx) <-chan V — receive value updates
  • Watch(ctx) <-chan struct{} — receive change notifications (no value)
  • Write(ctx, v) error — update writable signals (e.g. PushedSignal)

Built-in signal types:

  • PeriodicSignal — NewPeriodicSignal(name, update func(context.Context) (V, error), interval)

    • Periodically calls update and broadcasts changes to subscribers.
  • ComputedSignal — NewComputedSignal(name, update func() V, deps ...WatcherSignal)

    • Recomputes when dependencies notify; supports coalescing via SetCoalesceWindow.
  • DebouncedSignal — NewDebouncedSignal(name, src ReadableSignal[V], wait time.Duration)

    • Debounces bursts of updates from a source; optionally configure SetMaxWait.
  • ThrottledSignal — NewThrottledSignal(name, src ReadableSignal[V], interval time.Duration)

    • Emits at most once per interval (trailing-edge), always sending the most recent value.
  • PushedSignal — NewPushedSignal(name, update func(context.Context, V) error)

    • External writers call Write(ctx, v); update can validate or persist before broadcasting.

Signals are small actors and should be supervised. Example:

// update once per second and broadcast the value to subscribers
timeSig := sup.NewPeriodicSignal("time", func(ctx context.Context) (time.Time, error) {
  return time.Now(), nil
}, time.Second)

root := sup.NewSupervisor("signals",
  sup.WithActor(timeSig),
  sup.WithPolicy(sup.Permanent),
)

go root.Run(ctx)
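
To make the read/subscribe model more concrete, here is a stripped-down sketch of a value holder that notifies subscribers only when the value actually changes. It is not sup's signal implementation: the signal type and its methods are hypothetical, context-based unsubscription is omitted, and dropping updates for slow subscribers is an assumption made to keep the writer non-blocking.

```go
package main

import (
	"fmt"
	"sync"
)

// signal is a hypothetical, minimal value holder with subscribers.
type signal[V comparable] struct {
	mu    sync.RWMutex
	value V
	subs  []chan V
}

// read returns a snapshot of the current value.
func (s *signal[V]) read() V {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.value
}

// subscribe registers a buffered channel that receives future updates.
func (s *signal[V]) subscribe(buffer int) <-chan V {
	ch := make(chan V, buffer)
	s.mu.Lock()
	s.subs = append(s.subs, ch)
	s.mu.Unlock()
	return ch
}

// write stores v and notifies subscribers only if the value changed.
func (s *signal[V]) write(v V) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if v == s.value {
		return
	}
	s.value = v
	for _, ch := range s.subs {
		select {
		case ch <- v:
		default: // subscriber is slow; drop rather than block the writer
		}
	}
}

// demo writes 1, 1, 2 and returns the final value and the number of
// updates queued for the single subscriber.
func demo() (int, int) {
	s := &signal[int]{}
	updates := s.subscribe(4)
	s.write(1)
	s.write(1) // unchanged, so no notification
	s.write(2)
	return s.read(), len(updates)
}

func main() {
	fmt.Println(demo()) // 2 2
}
```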

Supervisor Trees

Supervisors implement the Actor interface, so they can be nested inside one another. Child supervisors/actors inherit the supervisor's logger when spawned.

dbActor := NewDatabaseActor()
cacheActor := NewCacheActor()

// Child supervisor manages data-layer actors
dataSup := sup.NewSupervisor("data_supervisor",
	sup.WithActors(dbActor, cacheActor),
	sup.WithPolicy(sup.Permanent),
	sup.WithRestartDelay(500*time.Millisecond),
)

// Root supervisor treats the child supervisor as an actor
root := sup.NewSupervisor("root",
	sup.WithActor(dataSup),
	sup.WithPolicy(sup.Permanent),
)

go root.Run(ctx)

Stateless Actors

For actors that don't need a mailbox or internal state, use ActorFunc. Note the function receives both the context.Context and a *slog.Logger so you can log directly from the actor.

healthCheck := sup.ActorFunc("health", func(ctx context.Context, logger *slog.Logger) error {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := ping(); err != nil {
				logger.Error("health check failed", "err", err)
				return err // supervisor will restart based on policy
			}
		}
	}
})

sup.NewSupervisor("health_supervisor",
	sup.WithActor(healthCheck),
	sup.WithPolicy(sup.Transient),
).Run(ctx)

Dynamic Spawning

Use Spawn to start actors dynamically after the supervisor is already running:

supervisor := sup.NewSupervisor("job_supervisor",
	sup.WithPolicy(sup.Temporary),
)
go supervisor.Run(ctx)

// Later, spawn actors on demand
for _, job := range jobs {
	supervisor.Spawn(ctx, newJobActor(job))
}

supervisor.Wait()

Observability

sup exposes a minimal observer mechanism via SupervisorObserver and the WithObserver option. Observers receive small, optional callbacks for lifecycle events. Callbacks are invoked asynchronously and panics are recovered — observers cannot block or crash the supervisor.

  • OnActorRegistered(actor Actor)
  • OnActorStarted(actor Actor)
  • OnActorStopped(actor Actor, err error)
  • OnActorRestarting(actor Actor, restartCount int, lastErr error)
  • OnSupervisorTerminal(err error)

observer := &sup.SupervisorObserver{
	OnActorRegistered: func(a sup.Actor) {
		fmt.Printf("registered: %s\n", a.Name())
	},
	OnActorStarted: func(a sup.Actor) {
		fmt.Printf("started: %s\n", a.Name())
	},
	OnActorStopped: func(a sup.Actor, err error) {
		fmt.Printf("stopped: %s err=%v\n", a.Name(), err)
	},
	OnActorRestarting: func(a sup.Actor, count int, lastErr error) {
		fmt.Printf("restarting: %s count=%d lastErr=%v\n", a.Name(), count, lastErr)
	},
	OnSupervisorTerminal: func(err error) {
		fmt.Printf("supervisor terminal: err=%v\n", err)
	},
}

supervisor := sup.NewSupervisor("root",
	sup.WithObserver(observer),
)
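
The guarantee that observers cannot block or crash the supervisor can be obtained by running each callback in its own goroutine behind a recover. The sketch below shows that pattern in isolation; notify and demo are hypothetical names, and the WaitGroup exists only so the example can wait for the callbacks before returning.

```go
package main

import (
	"fmt"
	"sync"
)

// notify runs cb in its own goroutine and swallows any panic it raises.
// A nil callback is skipped, which is what makes callbacks optional.
func notify(wg *sync.WaitGroup, cb func()) {
	if cb == nil {
		return
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer func() { _ = recover() }() // a buggy observer must not crash the caller
		cb()
	}()
}

// demo fires three notifications and returns how many callbacks completed.
func demo() int {
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		calls int
	)
	notify(&wg, nil)                              // callback not set
	notify(&wg, func() { panic("observer bug") }) // panics, but is contained
	notify(&wg, func() {
		mu.Lock()
		calls++
		mu.Unlock()
	})
	wg.Wait()
	return calls
}

func main() {
	fmt.Println(demo()) // 1
}
```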

Router

Router[F any] is a small, generic utility for distributing work across a fixed set of routees. It provides low-overhead selection strategies and several helpers for broadcasting or fan-out execution.

Construction

  • NewRouter[F any](strategy RouterStrategy, routees ...F) *Router[F] — create a router with one of the built-in strategies.
  • Strategies: RoundRobin, Random.

API

  • (*Router[F]).Len() int — number of routees.
  • (*Router[F]).Next() F — pick the next routee according to the router strategy.
  • (*Router[F]).Sticky(key uint64) F — pick a routee deterministically from a key (useful for consistent hashing-like behaviour).
  • (*Router[F]).Broadcast(fn func(F)) — call fn synchronously for every routee.
  • (*Router[F]).FanOut(fn func(F)) — call fn in a separate goroutine for each routee.
  • (*Router[F]).FanOutWait(fn func(F)) — fan out and wait for all invocations to finish.
  • (*Router[F]).Retry(limit int, run func(F) error) error — try run on up to limit routees until one succeeds; returns the last error if all fail.

Example

workers := []*Worker{w1, w2, w3}
router := sup.NewRouter(sup.RoundRobin, workers...)

// get the next worker (round-robin) and hand it the task
w := router.Next()
_ = w.Process(task)

// broadcast to all workers
router.Broadcast(func(w *Worker) {
	w.Process(task)
})

// fan-out and wait for all workers to finish
router.FanOutWait(func(w *Worker) {
	w.Process(task)
})

// retry with up to 3 different workers
err := router.Retry(3, func(w *Worker) error {
  return w.Process(task)
})
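
For intuition about how round-robin and sticky selection can be made cheap, the sketch below uses an atomic counter and a modulo over a fixed slice. It is an assumption-laden illustration rather than the library's Router: the lowercase router type and newRouter are hypothetical, and the real Sticky may map keys differently.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// router is a hypothetical fixed-size router used only for this sketch.
type router[F any] struct {
	routees []F
	next    atomic.Uint64
}

// newRouter panics on an empty routee list, like the documented NewRouter.
func newRouter[F any](routees ...F) *router[F] {
	if len(routees) == 0 {
		panic("router: no routees")
	}
	return &router[F]{routees: routees}
}

// Next picks routees in round-robin order; it is safe for concurrent use.
func (r *router[F]) Next() F {
	n := r.next.Add(1) - 1
	return r.routees[n%uint64(len(r.routees))]
}

// Sticky maps the same key to the same routee every time.
func (r *router[F]) Sticky(key uint64) F {
	return r.routees[key%uint64(len(r.routees))]
}

// demo takes four round-robin picks followed by two sticky picks for key 7.
func demo() []string {
	r := newRouter("a", "b", "c")
	return []string{r.Next(), r.Next(), r.Next(), r.Next(), r.Sticky(7), r.Sticky(7)}
}

func main() {
	fmt.Println(demo()) // [a b c a b b]
}
```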

Packages

  • sup — Core supervisor and typed inbox implementations
  • sup/exec — Actor wrapper around os/exec for managing external processes as actors
  • sup/mesh — NATS-backed actors for pub/sub messaging with automatic connection management
  • sup/modbus — Actor wrapper around Modbus connections (TCP/RTU/ASCII) for thread-safe hardware access with automatic reconnection
  • sup/mqtt — Actor wrapper around MQTT clients (Paho) for publish/subscribe with automatic reconnects and subscription handling
  • sup/sse — Actor wrapper around Server-Sent Events (SSE) for consuming real-time event streams with automatic reconnection and last-event-id tracking
  • sup/ui — Real-time dashboard for visualizing and inspecting actors in your supervisor tree
  • sup/ws — Actor wrapper around WebSocket connections for thread-safe communication with automatic reconnection

Benchmark

goos: darwin
goarch: arm64
pkg: github.com/webermarci/sup
cpu: Apple M5
BenchmarkBroadcaster_Notify/1-10        123507973       9.6 ns/op     0 B/op    0 allocs/op
BenchmarkBroadcaster_Notify/10-10        11350057     108.0 ns/op     0 B/op    0 allocs/op
BenchmarkBroadcaster_Notify/100-10         105594   11447.0 ns/op     0 B/op    0 allocs/op
BenchmarkCallInbox_SingleWorker-10        3318741     346.1 ns/op     0 B/op    0 allocs/op
BenchmarkCallInbox_Contention-10          1000000     682.5 ns/op     0 B/op    0 allocs/op
BenchmarkCastInbox_SingleWorker-10       37592265      31.9 ns/op     0 B/op    0 allocs/op
BenchmarkCastInbox_Parallel-10           24781441      48.6 ns/op     0 B/op    0 allocs/op
BenchmarkCastInbox_TryCast-10           134381470       8.9 ns/op     0 B/op    0 allocs/op
BenchmarkPeriodicSignal_Update/1µs-10      456361    3542.0 ns/op     0 B/op    0 allocs/op
BenchmarkPeriodicSignal_Update/10µs-10     119919   10004.0 ns/op     0 B/op    0 allocs/op
BenchmarkPeriodicSignal_Update/100µs-10     12000   99993.0 ns/op     0 B/op    0 allocs/op
BenchmarkPushedSignal_Write-10          133784850       8.9 ns/op     0 B/op    0 allocs/op
BenchmarkOutbox_Emit/1-10               187332144       6.4 ns/op     0 B/op    0 allocs/op
BenchmarkOutbox_Emit/10-10               41823868      28.3 ns/op     0 B/op    0 allocs/op
BenchmarkOutbox_Emit/100-10               4646634     256.1 ns/op     0 B/op    0 allocs/op
BenchmarkOutbox_Subscribe-10            100000000      24.0 ns/op    49 B/op    0 allocs/op
BenchmarkOutbox_EmitFireAndForget-10    337680223       3.6 ns/op     0 B/op    0 allocs/op
BenchmarkRouter_Next_RoundRobin-10      714396448       1.7 ns/op     0 B/op    0 allocs/op
BenchmarkRouter_Next_Random-10          236811043       5.1 ns/op     0 B/op    0 allocs/op
BenchmarkRouter_Next_Parallel-10         30816442      39.7 ns/op     0 B/op    0 allocs/op
BenchmarkSupervisor_SpawnAndExit-10       1810195     661.4 ns/op   474 B/op   12 allocs/op
BenchmarkSupervisor_RestartCycle-10       1218945     980.3 ns/op   224 B/op    6 allocs/op
BenchmarkSupervisor_ParallelSpawn-10      1644014     757.4 ns/op   616 B/op   11 allocs/op

Documentation

Variables

var (
	ErrCallInboxFull   = errors.New("sup: call inbox is full")
	ErrCallInboxClosed = errors.New("sup: call inbox is closed")
)
var (
	ErrCastInboxFull   = errors.New("sup: cast inbox is full")
	ErrCastInboxClosed = errors.New("sup: cast inbox is closed")
)

Types

type Actor added in v0.0.13

type Actor interface {
	Name() string
	Run(context.Context) error
	// contains filtered or unexported methods
}

Actor represents a concurrent entity that can be supervised. It has a name and a Run method that executes its logic. The Run method should return an error if the actor needs to be restarted, or nil if it can exit cleanly. Panics will also trigger a restart. The setLogger method is used internally by the supervisor to inject a logger into the actor.
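
Turning a panic into an ordinary error is what lets a supervisor treat panics and returned errors uniformly. A minimal sketch of that conversion, using only the standard library, might look as follows; runSafely is a hypothetical name, and the exact error format sup produces is not specified here.

```go
package main

import (
	"errors"
	"fmt"
	"runtime/debug"
)

// runSafely calls run and converts a panic into an error that carries the
// panic value and the stack trace captured at recovery time.
func runSafely(run func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic: %v\n%s", r, debug.Stack())
		}
	}()
	return run()
}

func main() {
	// Clean exit: no error.
	fmt.Println(runSafely(func() error { return nil }) == nil)

	// Ordinary failure: the error is passed through unchanged.
	fmt.Println(runSafely(func() error { return errors.New("failed") }))

	// Panic: reported as a non-nil error instead of crashing the program.
	err := runSafely(func() error { panic("boom") })
	fmt.Println(err != nil)
}
```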

func ActorFunc added in v0.0.13

func ActorFunc(name string, fn func(ctx context.Context, logger *slog.Logger) error) Actor

ActorFunc creates a simple stateless actor from a function.

type BaseActor added in v0.0.20

type BaseActor struct {
	// contains filtered or unexported fields
}

BaseActor provides a simple implementation of the Actor interface with a name and a logger. It can be embedded in other structs to create more complex actors. The Name and Logger methods are safe to call from inside Run(), and the setLogger method is used internally by the supervisor to inject a logger into the actor.

func NewBaseActor added in v0.0.20

func NewBaseActor(name string) *BaseActor

NewBaseActor creates a new BaseActor with the given name. The logger is initialized to a no-op logger and will be set by the supervisor when the actor is spawned.

func (*BaseActor) Inspect added in v0.0.43

func (a *BaseActor) Inspect() Spec

Inspect returns the specification.

func (*BaseActor) Logger added in v0.0.33

func (a *BaseActor) Logger() *slog.Logger

Logger returns the actor's logger. It is safe to call from inside Run().

func (*BaseActor) Name added in v0.0.20

func (a *BaseActor) Name() string

Name returns the actor's name. It is safe to call from inside Run().

type BaseSignal added in v0.0.40

type BaseSignal[V any] struct {
	*BaseActor
	// contains filtered or unexported fields
}

BaseSignal centralizes broadcasting/subscription behavior for signals. It intentionally does NOT manage the value or its locking; concrete signals keep their own value + mutex and call the base's notify/subscribe helpers.

func NewBaseSignal added in v0.0.40

func NewBaseSignal[V any](name string) *BaseSignal[V]

NewBaseSignal creates a BaseSignal with sane defaults.

func (*BaseSignal[V]) Read added in v0.0.40

func (b *BaseSignal[V]) Read() V

Read returns the current stored value (thread-safe).

func (*BaseSignal[V]) SetBroadcasterBuffer added in v0.0.40

func (b *BaseSignal[V]) SetBroadcasterBuffer(buffer int)

SetBroadcasterBuffer configures the internal broadcaster buffer size. It acquires a lock to ensure thread-safe access to the broadcaster's buffer configuration.

func (*BaseSignal[V]) SetEqual added in v0.0.40

func (b *BaseSignal[V]) SetEqual(eq func(a, b V) bool)

SetEqual configures the equality function used to determine if a new value is different from the current value before notifying subscribers. This can help prevent unnecessary notifications when the value hasn't actually changed. It acquires a lock to ensure thread-safe access to the equal function.

func (*BaseSignal[V]) SetInitialNotify added in v0.0.40

func (b *BaseSignal[V]) SetInitialNotify(enabled bool)

SetInitialNotify configures whether to notify subscribers immediately with the current value upon subscription. It acquires a lock to ensure thread-safe access to the initialNotify flag.

func (*BaseSignal[V]) SetInitialValue added in v0.0.40

func (b *BaseSignal[V]) SetInitialValue(v V)

SetInitialValue sets the initial value of the Signal before any updates occur. It acquires a lock to ensure thread-safe access to the value.

func (*BaseSignal[V]) Subscribe added in v0.0.40

func (b *BaseSignal[V]) Subscribe(ctx context.Context) <-chan V

Subscribe returns a channel that receives updates whenever the Signal's value changes. If initialNotify is enabled, the current value is sent to the channel immediately upon subscription. It acquires a read lock to ensure thread-safe access to the current value when subscribing.

func (*BaseSignal[V]) Watch added in v0.0.40

func (b *BaseSignal[V]) Watch(ctx context.Context) <-chan struct{}

Watch allows clients to subscribe to notifications whenever the Signal's value is updated, without receiving the value itself. If initialNotify is enabled, a notification is sent to the channel immediately upon subscription.

type CallInbox added in v0.0.37

type CallInbox[T any, R any] struct {
	// contains filtered or unexported fields
}

CallInbox manages request-response communication.

func NewCallInbox added in v0.0.37

func NewCallInbox[T any, R any](size int) *CallInbox[T, R]

NewCallInbox creates a new call inbox for a specific request type T and reply type R.

func (*CallInbox[T, R]) Call added in v0.0.37

func (i *CallInbox[T, R]) Call(ctx context.Context, message T) (R, error)

Call sends a request and waits for the response.

func (*CallInbox[T, R]) Cap added in v0.0.37

func (i *CallInbox[T, R]) Cap() int

Cap returns the capacity of the inbox buffer.

func (*CallInbox[T, R]) Close added in v0.0.37

func (i *CallInbox[T, R]) Close()

Close safely shuts down the inbox.

func (*CallInbox[T, R]) Len added in v0.0.37

func (i *CallInbox[T, R]) Len() int

Len returns the number of messages currently in the inbox.

func (*CallInbox[T, R]) Receive added in v0.0.37

func (i *CallInbox[T, R]) Receive() <-chan CallRequest[T, R]

Receive returns the read-only channel for the actor's internal loop.

type CallRequest added in v0.0.8

type CallRequest[T any, R any] struct {
	// contains filtered or unexported fields
}

CallRequest wraps a payload with a reply channel for synchronous calls.

func (CallRequest[T, R]) Payload added in v0.0.8

func (r CallRequest[T, R]) Payload() T

Payload returns the request's payload.

func (CallRequest[T, R]) Reply added in v0.0.8

func (r CallRequest[T, R]) Reply(value R, err error)

Reply sends the response back to the caller. The actor should call this exactly once per request.
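
The request/reply pattern behind Call can be sketched with a per-request reply channel. The code below is an illustration only: request, reply, and call are hypothetical names, and unlike the library (which mentions internal pooling) this sketch allocates a fresh reply channel for every call.

```go
package main

import (
	"context"
	"fmt"
)

// reply carries the result of a single request.
type reply[R any] struct {
	value R
	err   error
}

// request pairs a payload with the channel its reply is delivered on.
type request[T, R any] struct {
	payload T
	replyTo chan reply[R] // capacity 1, so a single Reply never blocks
}

// Reply delivers the response; it must be called exactly once per request.
func (r request[T, R]) Reply(value R, err error) {
	r.replyTo <- reply[R]{value: value, err: err}
}

// call enqueues a request and waits for the reply or for ctx to be done.
func call[T, R any](ctx context.Context, inbox chan<- request[T, R], msg T) (R, error) {
	var zero R
	req := request[T, R]{payload: msg, replyTo: make(chan reply[R], 1)}
	select {
	case inbox <- req:
	case <-ctx.Done():
		return zero, ctx.Err()
	}
	select {
	case rep := <-req.replyTo:
		return rep.value, rep.err
	case <-ctx.Done():
		return zero, ctx.Err()
	}
}

// demo starts a tiny "actor" that answers one request with the payload length.
func demo() (int, error) {
	inbox := make(chan request[string, int], 1)
	go func() {
		req := <-inbox
		req.Reply(len(req.payload), nil)
	}()
	return call[string, int](context.Background(), inbox, "hello")
}

func main() {
	fmt.Println(demo()) // 5 <nil>
}
```

Because the reply channel is buffered, the actor can reply even if the caller has already given up on a canceled context.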

type CastInbox added in v0.0.37

type CastInbox[T any] struct {
	// contains filtered or unexported fields
}

CastInbox is a type-safe, write-only entry point for fire-and-forget messages.

func NewCastInbox added in v0.0.37

func NewCastInbox[T any](size int) *CastInbox[T]

NewCastInbox creates a new inbox for a specific message type.

func (*CastInbox[T]) Cap added in v0.0.37

func (i *CastInbox[T]) Cap() int

Cap returns the capacity of the inbox buffer.

func (*CastInbox[T]) Cast added in v0.0.37

func (i *CastInbox[T]) Cast(ctx context.Context, message T) error

Cast pushes a message into the inbox with context for cancellation. It blocks if the inbox is full, but returns ctx.Err() if the context expires before the message is enqueued.

func (*CastInbox[T]) Close added in v0.0.37

func (i *CastInbox[T]) Close()

Close safely shuts down the inbox.

func (*CastInbox[T]) Len added in v0.0.37

func (i *CastInbox[T]) Len() int

Len returns the number of messages currently in the inbox.

func (*CastInbox[T]) Receive added in v0.0.37

func (i *CastInbox[T]) Receive() <-chan T

Receive returns the read-only channel for the actor's internal loop.

func (*CastInbox[T]) TryCast added in v0.0.37

func (i *CastInbox[T]) TryCast(ctx context.Context, message T) error

TryCast attempts to push a message without blocking. It returns ErrCastInboxFull if the inbox is full, or ctx.Err() if ctx is done.

type CastRequest added in v0.0.8

type CastRequest[T any] struct {
	// contains filtered or unexported fields
}

CastRequest wraps a payload for asynchronous calls without expecting a reply.

func (CastRequest[T]) Payload added in v0.0.8

func (r CastRequest[T]) Payload() T

Payload returns the request's payload.

type ComputedSignal added in v0.0.40

type ComputedSignal[V any] struct {
	*BaseSignal[V]
	// contains filtered or unexported fields
}

ComputedSignal is a reactive value that updates itself based on its dependencies. Through the embedded BaseSignal it provides Read, Subscribe, and Watch, so it can be both subscribed to and watched.

func NewComputedSignal added in v0.0.40

func NewComputedSignal[V any](name string, update func() V, deps ...WatcherSignal) *ComputedSignal[V]

NewComputedSignal creates a new ComputedSignal with the given name, update function, and dependencies. The update function is called whenever any of the dependencies notify a change, and the result is broadcast to subscribers.

func (*ComputedSignal[V]) Inspect added in v0.0.43

func (s *ComputedSignal[V]) Inspect() Spec

Inspect returns the specification.

func (*ComputedSignal[V]) Run added in v0.0.40

func (s *ComputedSignal[V]) Run(ctx context.Context) error

Run is the main loop for the ComputedSignal actor. It subscribes to all dependencies and listens for notifications. Whenever any dependency notifies a change, it calls the update function to compute the new value, updates its internal state, and broadcasts the new value to subscribers. The loop continues until the context is canceled, at which point it cleans up and exits.

func (*ComputedSignal[V]) SetCoalesceWindow added in v0.0.40

func (s *ComputedSignal[V]) SetCoalesceWindow(window time.Duration)

SetCoalesceWindow configures the duration to wait after receiving a notification from any dependency before triggering an update. This allows for coalescing multiple rapid updates into a single update, improving efficiency. It acquires a lock to ensure thread-safe access to the coalesceWindow configuration.

type DebouncedSignal added in v0.0.40

type DebouncedSignal[V any] struct {
	*BaseSignal[V]
	// contains filtered or unexported fields
}

DebouncedSignal is a reactive value that delays broadcasting updates from its source until the source has stopped changing for a specified wait duration.

func NewDebouncedSignal added in v0.0.40

func NewDebouncedSignal[V any](name string, src ReadableSignal[V], wait time.Duration) *DebouncedSignal[V]

NewDebouncedSignal creates a new DebouncedSignal actor attached to a source provider.

func (*DebouncedSignal[V]) Inspect added in v0.0.43

func (s *DebouncedSignal[V]) Inspect() Spec

Inspect returns the specification.

func (*DebouncedSignal[V]) Run added in v0.0.40

func (s *DebouncedSignal[V]) Run(ctx context.Context) error

Run is the main actor loop. It subscribes to the source and manages the sliding window.

func (*DebouncedSignal[V]) SetMaxWait added in v0.0.40

func (s *DebouncedSignal[V]) SetMaxWait(maxWait time.Duration)

SetMaxWait configures the maximum wait of the debouncer. Presumably this caps how long a broadcast can be deferred while the source keeps changing, so that a continuous burst of updates still produces a value. It acquires a lock to ensure thread-safe access to the maxWait configuration.

type Inspectable added in v0.0.43

type Inspectable interface {
	Inspect() Spec
}

Inspectable is an interface for components that can provide a structured specification describing their type, dependencies, and configuration. Implement this interface to enable introspection, visualization, or debugging of actor system components.

type Outbox added in v0.0.37

type Outbox[T any] struct {
	// contains filtered or unexported fields
}

Outbox provides a type-safe asynchronous broadcast mechanism. T is the message type emitted to all subscribers.

func (*Outbox[T]) Emit added in v0.0.37

func (o *Outbox[T]) Emit(ctx context.Context, message T)

Emit sends a message to all registered handlers. It is the caller's responsibility to ensure that handlers do not block indefinitely, as this will affect the responsiveness of the system. Handlers should ideally use non-blocking calls or manage their own goroutines if they need to perform longer work.

func (*Outbox[T]) Subscribe added in v0.0.37

func (o *Outbox[T]) Subscribe(handler func(context.Context, T))

Subscribe registers a new handler. Handlers should ideally be non-blocking (like a CastInbox.TryCast) to keep the system responsive.
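
The advice about non-blocking handlers is easier to see with a small model. The sketch below assumes that Emit calls each handler synchronously in the emitting goroutine, which the wording above suggests but does not state outright; the lowercase outbox type is hypothetical and is not the library's Outbox.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// outbox is a hypothetical broadcaster that calls handlers synchronously.
type outbox[T any] struct {
	mu       sync.RWMutex
	handlers []func(context.Context, T)
}

// Subscribe registers a handler for all future messages.
func (o *outbox[T]) Subscribe(h func(context.Context, T)) {
	o.mu.Lock()
	o.handlers = append(o.handlers, h)
	o.mu.Unlock()
}

// Emit calls every handler in turn, so a slow handler delays the emitter.
func (o *outbox[T]) Emit(ctx context.Context, msg T) {
	o.mu.RLock()
	handlers := o.handlers
	o.mu.RUnlock()
	for _, h := range handlers {
		h(ctx, msg)
	}
}

// demo emits three messages to a handler backed by a two-slot queue and
// returns how many messages were queued.
func demo() int {
	var o outbox[string]
	queue := make(chan string, 2)
	o.Subscribe(func(_ context.Context, msg string) {
		select {
		case queue <- msg:
		default: // queue is full: drop instead of stalling the emitter
		}
	})
	for _, m := range []string{"a", "b", "c"} {
		o.Emit(context.Background(), m)
	}
	return len(queue)
}

func main() {
	fmt.Println(demo()) // 2
}
```

The handler's non-blocking send is what keeps Emit from stalling once the queue is full; the third message is dropped in this model.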

type PeriodicSignal added in v0.0.42

type PeriodicSignal[V any] struct {
	*BaseSignal[V]
	// contains filtered or unexported fields
}

PeriodicSignal represents a value that is periodically updated by a function and can be subscribed to for updates.

func NewPeriodicSignal added in v0.0.42

func NewPeriodicSignal[V any](name string, update func(context.Context) (V, error), interval time.Duration) *PeriodicSignal[V]

NewPeriodicSignal creates a new PeriodicSignal with the given name and update function.

func (*PeriodicSignal[V]) Inspect added in v0.0.43

func (s *PeriodicSignal[V]) Inspect() Spec

Inspect returns the specification.

func (*PeriodicSignal[V]) Run added in v0.0.42

func (s *PeriodicSignal[V]) Run(ctx context.Context) error

Run starts the Signal's update loop, which periodically calls the update function to refresh the Signal's value and notifies subscribers of any changes. The loop continues until the provided context is canceled, at which point it will clean up all subscriber channels.

func (*PeriodicSignal[V]) Trigger added in v0.0.42

func (s *PeriodicSignal[V]) Trigger()

Trigger requests an immediate refresh of the signal value.

type PushedSignal added in v0.0.40

type PushedSignal[V any] struct {
	*BaseSignal[V]
	// contains filtered or unexported fields
}

PushedSignal represents a value that can be updated by a function and subscribed to for updates.

func NewPushedSignal added in v0.0.40

func NewPushedSignal[V any](name string, update func(context.Context, V) error) *PushedSignal[V]

NewPushedSignal creates a new PushedSignal with the given name and update function.

func (*PushedSignal[V]) Inspect added in v0.0.43

func (s *PushedSignal[V]) Inspect() Spec

Inspect returns the specification.

func (*PushedSignal[V]) Run added in v0.0.40

func (s *PushedSignal[V]) Run(ctx context.Context) error

Run starts the PushedSignal's main loop, which waits for the context to be canceled. When the context is canceled, it cleans up all subscriber channels. This method should be run in a separate goroutine.

func (*PushedSignal[V]) Write added in v0.0.40

func (s *PushedSignal[V]) Write(ctx context.Context, value V) error

Write updates the PushedSignal's value by calling the update function with the provided value. If the update is successful, it notifies all subscribers of the new value. It acquires a lock to ensure thread-safe updates to the value.

type ReadableSignal added in v0.0.40

type ReadableSignal[V any] interface {
	ReaderSignal[V]
	SubscriberSignal[V]
	WatcherSignal
}

ReadableSignal represents a value that can be read and subscribed to for updates. It also supports watching for changes. ReadableSignal is a sup.Actor, so it can be run as a goroutine and can be stopped by canceling its context.

type ReaderSignal added in v0.0.40

type ReaderSignal[V any] interface {
	Signal
	Read() V
}

ReaderSignal represents a value that can be read. The Read method returns the current value.

type RepliableRequest added in v0.0.9

type RepliableRequest[R any] interface {
	Reply(value R, err error)
}

RepliableRequest represents a request that can be replied to.

type RestartPolicy

type RestartPolicy uint8

const (
	Permanent RestartPolicy = iota // Always restart, even on clean exits
	Transient                      // Restart on errors/panics, but not on clean exits (nil)
	Temporary                      // Never restart
)

type Router added in v0.0.39

type Router[F any] struct {
	// contains filtered or unexported fields
}

Router is a generic type that manages a set of routees and provides methods to select one based on the configured strategy. It also supports broadcasting messages to all routees and retrying operations with a limit.

func NewRouter added in v0.0.39

func NewRouter[F any](strategy RouterStrategy, routees ...F) *Router[F]

NewRouter creates a new Router with the specified routing strategy and routees. It panics if no routees are provided or if an unknown strategy is specified.

func (*Router[F]) Broadcast added in v0.0.39

func (r *Router[F]) Broadcast(fn func(F))

Broadcast applies the provided function to all routees sequentially. This is useful for sending the same message or performing the same action on all routees.

func (*Router[F]) FanOut added in v0.0.39

func (r *Router[F]) FanOut(fn func(F))

FanOut applies the provided function to all routees concurrently. This is useful for performing actions on all routees in parallel, but it does not wait for the operations to complete.

func (*Router[F]) FanOutWait added in v0.0.39

func (r *Router[F]) FanOutWait(fn func(F))

FanOutWait applies the provided function to all routees concurrently and waits for all operations to complete before returning. This is useful when you need to ensure that all routees have processed the message or action before proceeding.

func (*Router[F]) Len added in v0.0.39

func (r *Router[F]) Len() int

Len returns the number of routees managed by the router.

func (*Router[F]) Next added in v0.0.39

func (r *Router[F]) Next() F

Next returns the next routee based on the configured routing strategy.

func (*Router[F]) Retry added in v0.0.39

func (r *Router[F]) Retry(limit int, run func(F) error) error

Retry attempts to execute the provided function with a routee up to the specified limit. If the function returns an error, it will retry with the next routee until the limit is reached. It returns the last error encountered if all attempts fail.
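
The retry behaviour described above can be sketched as a bounded loop over routees. This is an illustration under assumptions: the free function retry and the errBusy value are hypothetical, the sketch always starts at the first routee, and it never tries more routees than exist. The real method may choose its starting routee through the configured strategy.

```go
package main

import (
	"errors"
	"fmt"
)

// errBusy is a made-up failure used by the demo workers.
var errBusy = errors.New("busy")

// retry runs fn on successive routees until one succeeds or limit attempts
// have been made, and returns the last error if every attempt failed.
func retry[F any](routees []F, limit int, fn func(F) error) error {
	var last error
	for i := 0; i < limit && i < len(routees); i++ {
		if last = fn(routees[i]); last == nil {
			return nil
		}
	}
	return last
}

// demo uses three workers of which only "w3" succeeds, and reports how many
// attempts were made together with the final error.
func demo(limit int) (attempts int, err error) {
	workers := []string{"w1", "w2", "w3"}
	err = retry(workers, limit, func(w string) error {
		attempts++
		if w != "w3" {
			return errBusy
		}
		return nil
	})
	return attempts, err
}

func main() {
	fmt.Println(demo(3)) // 3 <nil>
	fmt.Println(demo(2)) // 2 busy
}
```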

func (*Router[F]) Sticky added in v0.0.39

func (r *Router[F]) Sticky(key uint64) F

Sticky returns a routee based on the provided key, ensuring that the same key always maps to the same routee. This is useful for scenarios where you want to maintain affinity between certain messages and specific routees.
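Since Sticky takes a uint64, callers with string identifiers need to hash them first. The sketch below shows one way to do that with the standard library's FNV-1a hash. The `keyFor` helper is hypothetical, and the modulo mapping in `sticky` is an assumption used only to demonstrate key affinity; the documentation above guarantees only that the same key maps to the same routee.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// keyFor hashes an arbitrary string ID into a uint64 usable as a sticky key.
func keyFor(id string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(id)) // Write on a hash.Hash never returns an error
	return h.Sum64()
}

// sticky is a hypothetical key-to-routee mapping: the same key always
// selects the same index as long as the routee set does not change.
func sticky[F any](routees []F, key uint64) F {
	return routees[key%uint64(len(routees))]
}

func main() {
	workers := []string{"w0", "w1", "w2"}
	first := sticky(workers, keyFor("user-42"))
	second := sticky(workers, keyFor("user-42"))
	fmt.Println(first == second) // prints "true"
}
```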

type RouterStrategy added in v0.0.39

type RouterStrategy uint8

RouterStrategy selects how Router.Next chooses a routee from the set of routees (e.g., actors or workers). Two strategies are defined: RoundRobin and Random. Sticky routing by key, broadcasting to all routees, and retrying with a limit are provided by separate Router methods (Sticky, Broadcast, FanOut, FanOutWait, and Retry) rather than by a strategy.

const (
	RoundRobin RouterStrategy = iota
	Random
)

type Signal added in v0.0.43

type Signal interface {
	Actor
	Inspectable
}

Signal represents a stream of values that can be observed, transformed, debounced, or combined within the actor system.

Signals implement the Actor interface and serve as a foundational primitive for reactive data flows, enabling declarative composition of asynchronous value streams across distributed actors.

type Spec added in v0.0.43

type Spec struct {
	Kind         string            `json:"kind"`
	Dependencies []string          `json:"dependencies"`
	Metadata     map[string]string `json:"metadata"`
}

Spec represents a structured description of an inspectable component. It captures the component's kind, its dependency names, and arbitrary configuration metadata for visualization and debugging purposes.

type SubscriberSignal added in v0.0.40

type SubscriberSignal[V any] interface {
	Signal
	Subscribe(context.Context) <-chan V
}

SubscriberSignal represents a value that can be subscribed to for updates. The Subscribe method returns a channel that will receive the updated value whenever it changes. The channel will be closed when the context is canceled.

type Supervisor

type Supervisor struct {
	*BaseActor
	// contains filtered or unexported fields
}

Supervisor manages the lifecycle of actor Run loops.

func NewSupervisor added in v0.0.7

func NewSupervisor(name string, opts ...SupervisorOption) *Supervisor

NewSupervisor creates a new Supervisor with the given options. Panics if the provided options are invalid.

func (*Supervisor) Children added in v0.0.43

func (s *Supervisor) Children() []Actor

func (*Supervisor) Inspect added in v0.0.43

func (s *Supervisor) Inspect() Spec

Inspect returns the Spec describing the supervisor.

func (*Supervisor) Run added in v0.0.14

func (s *Supervisor) Run(ctx context.Context) error

Run starts all actors under supervision and blocks until the context is canceled or all actors have stopped.

func (*Supervisor) Spawn added in v0.0.14

func (s *Supervisor) Spawn(ctx context.Context, actor Actor)

Spawn starts the given actor under supervision. It will be restarted according to the supervisor's policy if it returns an error or panics.

func (*Supervisor) Wait

func (s *Supervisor) Wait()

Wait blocks until all supervised actors have stopped.

type SupervisorObserver added in v0.0.28

type SupervisorObserver struct {
	OnActorRegistered    func(actor Actor)
	OnActorStarted       func(actor Actor)
	OnActorStopped       func(actor Actor, err error)
	OnActorRestarting    func(actor Actor, restartCount int, lastErr error)
	OnSupervisorTerminal func(err error)
}

SupervisorObserver allows observing lifecycle events of supervised actors and the supervisor itself. This can be used for logging, monitoring, or triggering side effects based on actor behavior.

type SupervisorOption added in v0.0.7

type SupervisorOption func(*Supervisor)

SupervisorOption configures a Supervisor.

func WithActor added in v0.0.14

func WithActor(actor Actor) SupervisorOption

WithActor adds an actor to be supervised. Can be called multiple times to add multiple actors.

func WithActors added in v0.0.14

func WithActors(actors ...Actor) SupervisorOption

WithActors adds multiple actors to be supervised.

func WithLogger added in v0.0.33

func WithLogger(logger *slog.Logger) SupervisorOption

WithLogger sets a logger for the supervisor.

func WithObserver added in v0.0.28

func WithObserver(observer *SupervisorObserver) SupervisorOption

WithObserver sets a SupervisorObserver to receive lifecycle event notifications for supervised actors and the supervisor itself. This allows external monitoring of actor behavior and supervisor actions.

func WithOnError added in v0.0.7

func WithOnError(handler func(actor Actor, err error)) SupervisorOption

WithOnError sets a callback function that will be called whenever a supervised actor returns an error or panics. The callback receives the actor and the error as arguments.

func WithPolicy added in v0.0.7

func WithPolicy(policy RestartPolicy) SupervisorOption

WithPolicy sets the restart policy.

func WithRestartDelay added in v0.0.7

func WithRestartDelay(d time.Duration) SupervisorOption

WithRestartDelay sets the delay between restarts.

func WithRestartLimit added in v0.0.7

func WithRestartLimit(maxRestarts int, window time.Duration) SupervisorOption

WithRestartLimit sets the maximum number of restarts allowed within a window. Both maxRestarts and window must be positive; otherwise NewSupervisor panics.

type ThrottledSignal added in v0.0.40

type ThrottledSignal[V any] struct {
	*BaseSignal[V]
	// contains filtered or unexported fields
}

ThrottledSignal is a reactive value that limits the rate at which updates from its source are broadcast. It ensures that updates are sent at most once per interval, always emitting the most recent (trailing) value from that interval.

func NewThrottledSignal added in v0.0.40

func NewThrottledSignal[V any](name string, src ReadableSignal[V], interval time.Duration) *ThrottledSignal[V]

NewThrottledSignal creates a new ThrottledSignal actor that reads from the source signal src and broadcasts its updates at most once per interval.

func (*ThrottledSignal[V]) Inspect added in v0.0.43

func (s *ThrottledSignal[V]) Inspect() Spec

Inspect returns the Spec describing the throttled signal.

func (*ThrottledSignal[V]) Run added in v0.0.40

func (s *ThrottledSignal[V]) Run(ctx context.Context) error

Run is the main actor loop. It manages the trailing-edge rate limit.

type WatcherSignal added in v0.0.40

type WatcherSignal interface {
	Signal
	Watch(ctx context.Context) <-chan struct{}
}

WatcherSignal represents a value that can be watched for changes. The Watch method returns a channel that will receive a notification whenever the value changes. The channel will be closed when the context is canceled.

type WritableSignal added in v0.0.40

type WritableSignal[V any] interface {
	ReadableSignal[V]
	WriterSignal[V]
}

WritableSignal represents a value that can be read, updated, and subscribed to for updates. It also supports watching for changes. WritableSignal is a sup.Actor, so it can be run as a goroutine and can be stopped by canceling its context.

type WriterSignal added in v0.0.40

type WriterSignal[V any] interface {
	Signal
	Write(context.Context, V) error
}

WriterSignal represents a value that can be updated by writing to it. The Write method may return an error if the update is rejected.

Directories

Path Synopsis
examples
dashboard command
simple command
mesh module
modbus module
ui
ws module
