Documentation
¶
Overview ¶
Package processor coordinates command routing and execution lifecycle for Asynx.
Processor[T] routes incoming commands to shards via consistent hashing, manages graceful shutdown, and exposes Send/SendWait/Shutdown interfaces.
- router — FNV-1a hash-based consistent shard selection
- pool — Shard-based worker pool for concurrent command execution
- executor — Passed to pool; executes Load->Validate->Write->Dispatch pipeline
All command execution is non-blocking via channels. Send and SendWait block until either the command completes, context cancels, or the queue is full. Send dispatches events asynchronously; SendWait dispatches synchronously. Shutdown drains in-flight work, closes the dispatcher, then closes the bus.
Index ¶
- type Processor
- func (p *Processor[T]) Send(ctx context.Context, cmd asynxmd.Command[T]) (asynxmd.Event[T], error)
- func (p *Processor[T]) SendWait(ctx context.Context, cmd asynxmd.Command[T]) (asynxmd.Event[T], error)
- func (p *Processor[T]) SetOnSendPending(fn func())
- func (p *Processor[T]) Shutdown(ctx context.Context) error
- func (p *Processor[T]) WaitPublish()
- type ProcessorOpt
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Processor ¶
type Processor[T any] struct { // contains filtered or unexported fields }
func New ¶
func New[T any]( es *eventstore.EventStore[T], bus asynxmd.Bus[T], opts ...ProcessorOpt[T], ) *Processor[T]
func (*Processor[T]) SetOnSendPending ¶
func (p *Processor[T]) SetOnSendPending(fn func())
ForTesting: SetOnSendPending sets a callback invoked after a command is enqueued but before Send or SendWait blocks waiting for its result. Do not call in production code.
func (*Processor[T]) WaitPublish ¶
func (p *Processor[T]) WaitPublish()
ForTesting: WaitPublish blocks until all dispatched events have been delivered. Do not call in production code.
type ProcessorOpt ¶
type ProcessorOpt[T any] func(*processorConfig[T])
func WithPublishErrorHandler ¶ added in v0.3.1
func WithPublishErrorHandler[T any](fn asynxmd.PublishErrorHandler[T]) ProcessorOpt[T]
WithPublishErrorHandler sets a callback invoked when Bus.PublishSync returns a non-nil error inside the dispatcher. When not set, publish errors are silently dropped.
func WithQueueDepth ¶
func WithQueueDepth[T any](depth int) ProcessorOpt[T]
func WithShards ¶
func WithShards[T any](count int) ProcessorOpt[T]
func WithWorkersPerShard ¶
func WithWorkersPerShard[T any](count int) ProcessorOpt[T]
Directories
¶
| Path | Synopsis |
|---|---|
|
Package exec implements the command execution pipeline for Asynx.
|
Package exec implements the command execution pipeline for Asynx. |
|
Package models defines data structures shared across processor sub-packages.
|
Package models defines data structures shared across processor sub-packages. |
|
Package pool implements shard-based concurrent command execution.
|
Package pool implements shard-based concurrent command execution. |
|
Package queue implements consistent shard routing for Asynx commands.
|
Package queue implements consistent shard routing for Asynx commands. |