runtime

package
v0.0.0-...-23c868e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 27, 2026 License: MIT Imports: 15 Imported by: 4

Documentation

Overview

Package runtime provides runnable lifecycle management and in-process pub/sub wiring for DBSP producers, consumers, and processors.

Index

Constants

View Source
const (
	// EventBufferSize is the default buffer for runtime event channels.
	EventBufferSize = 128
)

Variables

View Source
var (
	// ErrChannelFull indicates that a non-blocking channel write could not proceed.
	ErrChannelFull = errors.New("runtime channel full")
	// ErrChannelClosed indicates that a channel was closed while publishing.
	ErrChannelClosed = errors.New("runtime channel closed")
)

Functions

func ApplyLogBuffer

func ApplyLogBuffer(eventType, componentType, component, direction, topic, logical, pk string, weight zset.Weight, extra ...any) []any

ApplyLogBuffer builds a structured consumer apply payload for a single row.

func EventDocsLogBuffer

func EventDocsLogBuffer(eventType, componentType, component, direction, topic, logical string, data zset.ZSet, extra ...any) []any

EventDocsLogBuffer builds a structured runtime flow payload with full docs.

func EventLogBuffer

func EventLogBuffer(eventType, componentType, component, direction, topic, logical string, data zset.ZSet, extra ...any) []any

EventLogBuffer builds a structured runtime flow log payload.

func LogFlowApply

func LogFlowApply(log logr.Logger, eventType, componentType, component, direction, topic, logical, pk string, weight zset.Weight, docSource any, extra ...any)

LogFlowApply emits a single exclusive apply log line: - V(2): producer/consumer "dbsp runtime apply doc" - V(1): producer/consumer "dbsp runtime apply" - V(0): consumer "dbsp runtime apply"

func LogFlowEvent

func LogFlowEvent(log logr.Logger, eventType, componentType, component, direction, topic, logical string, data zset.ZSet, docsOverride []string, extra ...any)

LogFlowEvent emits a single exclusive runtime log line: - V(2): producer/consumer/processor "dbsp runtime event docs" - V(1): producer/consumer/processor "dbsp runtime event" - V(0): consumer "dbsp runtime event"

func ZSetFullSummary

func ZSetFullSummary(z zset.ZSet) []string

ZSetFullSummary returns a stable full entry list with docs and weights.

func ZSetPKSummary

func ZSetPKSummary(z zset.ZSet) []string

ZSetPKSummary returns a stable primary-key summary (with weights) for logging.

Types

type BaseComponent

type BaseComponent struct {
	ErrorReporter
	logr.Logger
	// contains filtered or unexported fields
}

BaseComponent carries shared identity, logger, and error reporting.

func (*BaseComponent) HandleError

func (c *BaseComponent) HandleError(err error)

HandleError reports err as a non-critical error from this component.

func (*BaseComponent) Name

func (c *BaseComponent) Name() string

Name returns the component name.

type BaseConsumer

type BaseConsumer struct {
	*BaseComponent
	Subscriber
	// contains filtered or unexported fields
}

BaseConsumer provides a reusable subscriber event loop.

func NewBaseConsumer

func NewBaseConsumer(cfg BaseConsumerConfig) (*BaseConsumer, error)

NewBaseConsumer creates a BaseConsumer and subscribes it to all configured topics.

func (*BaseConsumer) MarshalJSON

func (c *BaseConsumer) MarshalJSON() ([]byte, error)

MarshalJSON provides a stable machine-readable representation.

func (*BaseConsumer) Run

Run receives events from the embedded subscriber and calls h.Consume. Handler errors are non-critical and are reported through ErrorReporter.

func (*BaseConsumer) String

func (c *BaseConsumer) String() string

String implements fmt.Stringer.

type BaseConsumerConfig

type BaseConsumerConfig struct {
	Name string

	Subscriber
	ErrorReporter
	logr.Logger

	Topics []string
}

BaseConsumerConfig configures a BaseConsumer.

type BaseProcessor

type BaseProcessor struct {
	*BaseComponent
	Publisher
	Subscriber
	// contains filtered or unexported fields
}

BaseProcessor provides a reusable processor event loop.

func NewBaseProcessor

func NewBaseProcessor(cfg BaseProcessorConfig) (*BaseProcessor, error)

NewBaseProcessor creates a BaseProcessor and subscribes it to all configured topics.

func (*BaseProcessor) MarshalJSON

func (p *BaseProcessor) MarshalJSON() ([]byte, error)

MarshalJSON provides a stable machine-readable representation.

func (*BaseProcessor) Run

Run receives events from the embedded subscriber and calls h.Consume. Handler errors are non-critical and are reported through ErrorReporter.

func (*BaseProcessor) String

func (p *BaseProcessor) String() string

String implements fmt.Stringer.

type BaseProcessorConfig

type BaseProcessorConfig struct {
	Name string

	Publisher
	Subscriber
	ErrorReporter
	logr.Logger

	Topics []string
}

BaseProcessorConfig configures a BaseProcessor.

type BaseProducer

type BaseProducer struct {
	*BaseComponent
	Publisher
	// contains filtered or unexported fields
}

BaseProducer provides shared identity and error reporting for producers.

func NewBaseProducer

func NewBaseProducer(cfg BaseProducerConfig) (*BaseProducer, error)

NewBaseProducer creates a BaseProducer.

func (*BaseProducer) MarshalJSON

func (p *BaseProducer) MarshalJSON() ([]byte, error)

MarshalJSON provides a stable machine-readable representation.

func (*BaseProducer) String

func (p *BaseProducer) String() string

String implements fmt.Stringer.

type BaseProducerConfig

type BaseProducerConfig struct {
	Name string

	Publisher
	ErrorReporter
	logr.Logger

	Topics []string
}

BaseProducerConfig configures a BaseProducer.

type Circuit

type Circuit struct {
	Publisher
	Subscriber
	// contains filtered or unexported fields
}

Circuit is a runtime processor that subscribes to all query inputs and publishes all query outputs.

func NewCircuit

func NewCircuit(name string, rt *Runtime, q *compiler.Query, logger logr.Logger) (*Circuit, error)

NewCircuit builds a runtime processor from a compiled query. name is a unique identifier for this circuit within the runtime; it is used as the origin field in any ComponentErrors reported by the circuit. Name uniqueness is enforced when the circuit is passed to Runtime.Add.

func (*Circuit) Execute

func (c *Circuit) Execute(in Event) ([]Event, error)

Execute applies one runtime event to the compiled circuit.

func (*Circuit) MarshalJSON

func (c *Circuit) MarshalJSON() ([]byte, error)

MarshalJSON provides a stable machine-readable representation.

func (*Circuit) Name

func (c *Circuit) Name() string

Name returns the circuit's unique component name.

func (*Circuit) Reset

func (c *Circuit) Reset()

Reset clears executor and cached snapshot input state.

func (*Circuit) SetDocsFormatter

func (c *Circuit) SetDocsFormatter(f func(Event) []string)

SetDocsFormatter overrides full-doc flow logging payloads.

func (*Circuit) SetObserver

func (c *Circuit) SetObserver(observer executor.ObserverFunc)

SetObserver installs an optional executor observer callback.

func (*Circuit) Start

func (c *Circuit) Start(ctx context.Context) error

Start subscribes to all query inputs and forwards outputs via Publisher. Execute and publish errors are non-critical: they are reported via the runtime error channel and the circuit continues processing subsequent events. Start only returns a non-nil error on context cancellation-related issues.

func (*Circuit) String

func (c *Circuit) String() string

String implements fmt.Stringer.

type ConsumeHandler

type ConsumeHandler interface {
	Consume(ctx context.Context, event Event) error
}

ConsumeHandler handles one runtime event.

type Consumer

type Consumer interface {
	Runnable
	Subscriber
	fmt.Stringer
	json.Marshaler
}

Consumer is a runnable event sink.

Implementations typically embed a Subscriber created from Runtime.NewSubscriber. A Subscriber has one channel and can subscribe that channel to multiple topics.

type Error

type Error struct {
	// Origin identifies the component that reported the error, e.g.
	// "circuit/Foo→Bar" or "kubernetes-consumer/my-svc".
	Origin string
	Err    error
}

Error is a non-critical runtime error with component origin context. It is sent on the Runtime error channel and allows callers to observe transient errors from individual components without stopping the runtime.

func (Error) Error

func (e Error) Error() string

func (Error) Unwrap

func (e Error) Unwrap() error

type ErrorReporter

type ErrorReporter interface {
	ReportError(origin string, err error)
}

ErrorReporter is the shared non-critical error reporting contract. Runtime implements this interface.

type ErrorReporterFunc

type ErrorReporterFunc func(origin string, err error)

ErrorReporterFunc adapts a function to ErrorReporter.

func (ErrorReporterFunc) ReportError

func (f ErrorReporterFunc) ReportError(origin string, err error)

ReportError calls f(origin, err).

type Event

type Event struct {
	Name string
	Data zset.ZSet
}

Event is a named payload sent through runtime endpoints.

type Manager

type Manager interface {
	Add(Runnable) error
	Stop(Runnable)
	Start(ctx context.Context) error
}

Manager controls runnable lifecycles, including dynamic add/remove.

func NewManager

func NewManager() Manager

NewManager creates a lifecycle manager that supports dynamic Add and Stop.

type Processor

type Processor interface {
	Producer
	Consumer
}

Processor is both a Producer and a Consumer.

type Producer

type Producer interface {
	Runnable
	Publisher
	fmt.Stringer
	json.Marshaler
}

Producer is a runnable event source.

Implementations typically embed a Publisher created from Runtime.NewPublisher.

type PubSub

type PubSub struct {
	// contains filtered or unexported fields
}

PubSub is a topic-indexed subscription registry.

func NewPubSub

func NewPubSub() *PubSub

func (*PubSub) NewPublisher

func (ps *PubSub) NewPublisher() *publisher

NewPublisher creates a publisher bound to this PubSub.

func (*PubSub) NewSubscriber

func (ps *PubSub) NewSubscriber() *subscriber

NewSubscriber creates a single-channel subscriber bound to this PubSub.

type PublishFunc

type PublishFunc func(Event) error

PublishFunc adapts a function to Publisher.

func (PublishFunc) Publish

func (f PublishFunc) Publish(event Event) error

Publish calls f(event).

type Publisher

type Publisher interface {
	Publish(event Event) error
}

Publisher emits runtime events.

type Runnable

type Runnable interface {
	Name() string
	Start(ctx context.Context) error
}

Runnable has a context-managed lifecycle.

type Runtime

type Runtime struct {
	*PubSub
	Manager
	// contains filtered or unexported fields
}

Runtime combines a shared PubSub and a runnable Manager.

func NewRuntime

func NewRuntime(log logr.Logger) *Runtime

NewRuntime creates a Runtime. log is used as a fallback sink for non-critical errors when no error channel has been set via SetErrorChannel.

func (*Runtime) Add

func (rt *Runtime) Add(r Runnable) error

Add registers r with the manager and starts it if the runtime is already running. If r implements Named, its name is registered for uniqueness enforcement; a duplicate or empty name causes Add to return an error and the component is not added.

func (*Runtime) ReportError

func (rt *Runtime) ReportError(name string, err error)

ReportError reports a non-critical error from the named component. If an error channel has been set, the error is sent non-blocking; a dropped send is itself logged. If no channel is set, the error is logged via rt.log.

func (*Runtime) SetCircuitObserver

func (rt *Runtime) SetCircuitObserver(name string, observer executor.ObserverFunc) bool

SetCircuitObserver installs or clears an observer on a runtime circuit by name. It returns true if a runnable with the given name exists and supports observers.

func (*Runtime) SetErrorChannel

func (rt *Runtime) SetErrorChannel(ch chan<- Error)

SetErrorChannel wires a channel that receives non-critical ComponentErrors from all components registered with this runtime. Must be called before Start. The channel should be buffered; a full channel causes the error to be dropped and logged instead.

func (*Runtime) Stop

func (rt *Runtime) Stop(r Runnable)

Stop unregisters and stops a previously added runnable.

type Subscriber

type Subscriber interface {
	Subscribe(topic string)
	Unsubscribe(topic string)
	GetChannel() <-chan Event
}

Subscriber can consume events from topic channels.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL