Documentation ¶
Overview ¶
Package runtime provides runnable lifecycle management and in-process pub/sub wiring for DBSP producers, consumers, and processors.
Index ¶
- Constants
- Variables
- func ApplyLogBuffer(eventType, componentType, component, direction, topic, logical, pk string, ...) []any
- func EventDocsLogBuffer(eventType, componentType, component, direction, topic, logical string, ...) []any
- func EventLogBuffer(eventType, componentType, component, direction, topic, logical string, ...) []any
- func LogFlowApply(log logr.Logger, ...)
- func LogFlowEvent(log logr.Logger, ...)
- func ZSetFullSummary(z zset.ZSet) []string
- func ZSetPKSummary(z zset.ZSet) []string
- type BaseComponent
- type BaseConsumer
- type BaseConsumerConfig
- type BaseProcessor
- type BaseProcessorConfig
- type BaseProducer
- type BaseProducerConfig
- type Circuit
- func (c *Circuit) Execute(in Event) ([]Event, error)
- func (c *Circuit) MarshalJSON() ([]byte, error)
- func (c *Circuit) Name() string
- func (c *Circuit) Reset()
- func (c *Circuit) SetDocsFormatter(f func(Event) []string)
- func (c *Circuit) SetObserver(observer executor.ObserverFunc)
- func (c *Circuit) Start(ctx context.Context) error
- func (c *Circuit) String() string
- type ConsumeHandler
- type Consumer
- type Error
- type ErrorReporter
- type ErrorReporterFunc
- type Event
- type Manager
- type Processor
- type Producer
- type PubSub
- type PublishFunc
- type Publisher
- type Runnable
- type Runtime
- type Subscriber
Constants ¶
const (
// EventBufferSize is the default buffer for runtime event channels.
EventBufferSize = 128
)
Variables ¶
var (
// ErrChannelFull indicates that a non-blocking channel write could not proceed.
ErrChannelFull = errors.New("runtime channel full")
// ErrChannelClosed indicates that a channel was closed while publishing.
ErrChannelClosed = errors.New("runtime channel closed")
)
Functions ¶
func ApplyLogBuffer ¶
func ApplyLogBuffer(eventType, componentType, component, direction, topic, logical, pk string, weight zset.Weight, extra ...any) []any
ApplyLogBuffer builds a structured consumer apply payload for a single row.
func EventDocsLogBuffer ¶
func EventDocsLogBuffer(eventType, componentType, component, direction, topic, logical string, data zset.ZSet, extra ...any) []any
EventDocsLogBuffer builds a structured runtime flow payload with full docs.
func EventLogBuffer ¶
func EventLogBuffer(eventType, componentType, component, direction, topic, logical string, data zset.ZSet, extra ...any) []any
EventLogBuffer builds a structured runtime flow log payload.
func LogFlowApply ¶
func LogFlowApply(log logr.Logger, eventType, componentType, component, direction, topic, logical, pk string, weight zset.Weight, docSource any, extra ...any)
LogFlowApply emits a single exclusive apply log line:
- V(2): producer/consumer "dbsp runtime apply doc"
- V(1): producer/consumer "dbsp runtime apply"
- V(0): consumer "dbsp runtime apply"
func LogFlowEvent ¶
func LogFlowEvent(log logr.Logger, eventType, componentType, component, direction, topic, logical string, data zset.ZSet, docsOverride []string, extra ...any)
LogFlowEvent emits a single exclusive runtime log line:
- V(2): producer/consumer/processor "dbsp runtime event docs"
- V(1): producer/consumer/processor "dbsp runtime event"
- V(0): consumer "dbsp runtime event"
func ZSetFullSummary ¶
ZSetFullSummary returns a stable full entry list with docs and weights.
func ZSetPKSummary ¶
ZSetPKSummary returns a stable primary-key summary (with weights) for logging.
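One plausible reading of "stable" here is that the summary does not depend on iteration order. The sketch below shows that idea with sorted keys. Because zset.ZSet is not described in this documentation, a plain map from primary key to weight is assumed as a stand-in, and pkSummary and its output format are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// pkSummary is a hypothetical stand-in for a primary-key summary. A map from
// primary key to weight is assumed in place of the real zset.ZSet type.
func pkSummary(entries map[string]int64) []string {
	keys := make([]string, 0, len(entries))
	for k := range entries {
		keys = append(keys, k)
	}
	sort.Strings(keys) // sorting makes the output independent of map order

	out := make([]string, 0, len(keys))
	for _, k := range keys {
		// %+d always prints the sign, so insertions and deletions stand out.
		out = append(out, fmt.Sprintf("%s:%+d", k, entries[k]))
	}
	return out
}

func main() {
	fmt.Println(pkSummary(map[string]int64{"pod/b": -1, "pod/a": 1}))
}
```

A deterministic order keeps log lines comparable across runs, which matters when diffing output from two executions of the same pipeline.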
Types ¶
type BaseComponent ¶
type BaseComponent struct {
ErrorReporter
logr.Logger
// contains filtered or unexported fields
}
BaseComponent carries shared identity, logger, and error reporting.
func (*BaseComponent) HandleError ¶
func (c *BaseComponent) HandleError(err error)
HandleError reports err as a non-critical error from this component.
type BaseConsumer ¶
type BaseConsumer struct {
*BaseComponent
Subscriber
// contains filtered or unexported fields
}
BaseConsumer provides a reusable subscriber event loop.
func NewBaseConsumer ¶
func NewBaseConsumer(cfg BaseConsumerConfig) (*BaseConsumer, error)
NewBaseConsumer creates a BaseConsumer and subscribes it to all configured topics.
func (*BaseConsumer) MarshalJSON ¶
func (c *BaseConsumer) MarshalJSON() ([]byte, error)
MarshalJSON provides a stable machine-readable representation.
func (*BaseConsumer) Run ¶
func (c *BaseConsumer) Run(ctx context.Context, h ConsumeHandler) error
Run receives events from the embedded subscriber and calls h.Consume. Handler errors are non-critical and are reported through ErrorReporter.
func (*BaseConsumer) String ¶
func (c *BaseConsumer) String() string
String implements fmt.Stringer.
type BaseConsumerConfig ¶
type BaseConsumerConfig struct {
Name string
Subscriber
ErrorReporter
logr.Logger
Topics []string
}
BaseConsumerConfig configures a BaseConsumer.
type BaseProcessor ¶
type BaseProcessor struct {
*BaseComponent
Publisher
Subscriber
// contains filtered or unexported fields
}
BaseProcessor provides a reusable processor event loop.
func NewBaseProcessor ¶
func NewBaseProcessor(cfg BaseProcessorConfig) (*BaseProcessor, error)
NewBaseProcessor creates a BaseProcessor and subscribes it to all configured topics.
func (*BaseProcessor) MarshalJSON ¶
func (p *BaseProcessor) MarshalJSON() ([]byte, error)
MarshalJSON provides a stable machine-readable representation.
func (*BaseProcessor) Run ¶
func (p *BaseProcessor) Run(ctx context.Context, h ConsumeHandler) error
Run receives events from the embedded subscriber and calls h.Consume. Handler errors are non-critical and are reported through ErrorReporter.
func (*BaseProcessor) String ¶
func (p *BaseProcessor) String() string
String implements fmt.Stringer.
type BaseProcessorConfig ¶
type BaseProcessorConfig struct {
Name string
Publisher
Subscriber
ErrorReporter
logr.Logger
Topics []string
}
BaseProcessorConfig configures a BaseProcessor.
type BaseProducer ¶
type BaseProducer struct {
*BaseComponent
Publisher
// contains filtered or unexported fields
}
BaseProducer provides shared identity and error reporting for producers.
func NewBaseProducer ¶
func NewBaseProducer(cfg BaseProducerConfig) (*BaseProducer, error)
NewBaseProducer creates a BaseProducer.
func (*BaseProducer) MarshalJSON ¶
func (p *BaseProducer) MarshalJSON() ([]byte, error)
MarshalJSON provides a stable machine-readable representation.
func (*BaseProducer) String ¶
func (p *BaseProducer) String() string
String implements fmt.Stringer.
type BaseProducerConfig ¶
BaseProducerConfig configures a BaseProducer.
type Circuit ¶
type Circuit struct {
Publisher
Subscriber
// contains filtered or unexported fields
}
Circuit is a runtime processor that subscribes to all query inputs and publishes all query outputs.
func NewCircuit ¶
NewCircuit builds a runtime processor from a compiled query. name is a unique identifier for this circuit within the runtime; it is used as the Origin field of any component errors (Error values) reported by the circuit. Name uniqueness is enforced when the circuit is passed to Runtime.Add.
func (*Circuit) MarshalJSON ¶
MarshalJSON provides a stable machine-readable representation.
func (*Circuit) Reset ¶
func (c *Circuit) Reset()
Reset clears executor and cached snapshot input state.
func (*Circuit) SetDocsFormatter ¶
SetDocsFormatter overrides full-doc flow logging payloads.
func (*Circuit) SetObserver ¶
func (c *Circuit) SetObserver(observer executor.ObserverFunc)
SetObserver installs an optional executor observer callback.
func (*Circuit) Start ¶
Start subscribes to all query inputs and forwards outputs via Publisher. Execute and publish errors are non-critical: they are reported via the runtime error channel, and the circuit continues processing subsequent events. Start returns a non-nil error only for context cancellation-related issues.
type ConsumeHandler ¶
ConsumeHandler handles one runtime event.
type Consumer ¶
Consumer is a runnable event sink.
Implementations typically embed a Subscriber created from Runtime.NewSubscriber. A Subscriber has one channel and can subscribe that channel to multiple topics.
type Error ¶
type Error struct {
// Origin identifies the component that reported the error, e.g.
// "circuit/Foo→Bar" or "kubernetes-consumer/my-svc".
Origin string
Err error
}
Error is a non-critical runtime error with component origin context. It is sent on the Runtime error channel and allows callers to observe transient errors from individual components without stopping the runtime.
type ErrorReporter ¶
ErrorReporter is the shared non-critical error reporting contract. Runtime implements this interface.
type ErrorReporterFunc ¶
ErrorReporterFunc adapts a function to ErrorReporter.
func (ErrorReporterFunc) ReportError ¶
func (f ErrorReporterFunc) ReportError(origin string, err error)
ReportError calls f(origin, err).
type Manager ¶
Manager controls runnable lifecycles, including dynamic add/remove.
func NewManager ¶
func NewManager() Manager
NewManager creates a lifecycle manager that supports dynamic Add and Stop.
type Producer ¶
Producer is a runnable event source.
Implementations typically embed a Publisher created from Runtime.NewPublisher.
type PubSub ¶
type PubSub struct {
// contains filtered or unexported fields
}
PubSub is a topic-indexed subscription registry.
func (*PubSub) NewPublisher ¶
func (ps *PubSub) NewPublisher() *publisher
NewPublisher creates a publisher bound to this PubSub.
func (*PubSub) NewSubscriber ¶
func (ps *PubSub) NewSubscriber() *subscriber
NewSubscriber creates a single-channel subscriber bound to this PubSub.
type PublishFunc ¶
PublishFunc adapts a function to Publisher.
func (PublishFunc) Publish ¶
func (f PublishFunc) Publish(event Event) error
Publish calls f(event).
type Runtime ¶
Runtime combines a shared PubSub and a runnable Manager.
func NewRuntime ¶
NewRuntime creates a Runtime. log is used as a fallback sink for non-critical errors when no error channel has been set via SetErrorChannel.
func (*Runtime) Add ¶
Add registers r with the manager and starts it if the runtime is already running. If r implements Named, its name is registered for uniqueness enforcement; a duplicate or empty name causes Add to return an error and the component is not added.
func (*Runtime) ReportError ¶
ReportError reports a non-critical error from the named component. If an error channel has been set, the error is sent without blocking; if the send is dropped, the drop itself is logged. If no channel is set, the error is logged via rt.log.
func (*Runtime) SetCircuitObserver ¶
func (rt *Runtime) SetCircuitObserver(name string, observer executor.ObserverFunc) bool
SetCircuitObserver installs or clears an observer on a runtime circuit by name. It returns true if a runnable with the given name exists and supports observers.
func (*Runtime) SetErrorChannel ¶
SetErrorChannel wires a channel that receives non-critical component errors (Error values) from all components registered with this runtime. It must be called before Start. The channel should be buffered; a full channel causes the error to be dropped and logged instead.
type Subscriber ¶
type Subscriber interface {
Subscribe(topic string)
Unsubscribe(topic string)
GetChannel() <-chan Event
}
Subscriber can consume events from topic channels.
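To make the single-channel, multi-topic model concrete, here is a minimal sketch of a topic-indexed registry whose subscribers satisfy the three methods above. Everything except the method names is an assumption: the Event fields, the registry layout, the blocking delivery, and the absence of locking are simplifications for illustration and do not describe the package's PubSub.

```go
package main

import "fmt"

// Event is a stand-in; the Topic and Payload fields are assumptions.
type Event struct {
	Topic   string
	Payload string
}

// registry is a hypothetical topic-indexed registry. It has no locking and
// is meant for single-goroutine use only, to keep the sketch short.
type registry struct {
	topics map[string]map[*sub]struct{}
}

// sub owns one channel that may be attached to any number of topics.
type sub struct {
	reg *registry
	ch  chan Event
}

func newRegistry() *registry {
	return &registry{topics: map[string]map[*sub]struct{}{}}
}

func (r *registry) newSubscriber(buf int) *sub {
	return &sub{reg: r, ch: make(chan Event, buf)}
}

func (s *sub) Subscribe(topic string) {
	if s.reg.topics[topic] == nil {
		s.reg.topics[topic] = map[*sub]struct{}{}
	}
	s.reg.topics[topic][s] = struct{}{}
}

func (s *sub) Unsubscribe(topic string) { delete(s.reg.topics[topic], s) }

func (s *sub) GetChannel() <-chan Event { return s.ch }

// publish delivers ev to every subscriber of ev.Topic and returns the
// number of deliveries. The send blocks if a subscriber's buffer is full.
func (r *registry) publish(ev Event) int {
	n := 0
	for s := range r.topics[ev.Topic] {
		s.ch <- ev
		n++
	}
	return n
}

func main() {
	reg := newRegistry()
	s := reg.newSubscriber(4)
	s.Subscribe("pods")
	s.Subscribe("services")

	reg.publish(Event{Topic: "pods", Payload: "p1"})
	reg.publish(Event{Topic: "services", Payload: "s1"})
	s.Unsubscribe("pods")
	reg.publish(Event{Topic: "pods", Payload: "p2"}) // no longer delivered

	close(s.ch)
	for ev := range s.GetChannel() {
		fmt.Println(ev.Topic, ev.Payload)
	}
}
```

Events from both topics arrive on the same channel in publish order, so a consumer needs only one receive loop regardless of how many topics it follows.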