processor

package
v0.6.2
Warning

This package is not in the latest version of its module.

Published: May 1, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package processor coordinates command routing and execution lifecycle for Asynx.

Processor[T] routes incoming commands to shards via consistent hashing, manages graceful shutdown, and exposes the Send, SendWait, and Shutdown methods.

  • router — FNV-1a hash-based consistent shard selection
  • pool — Shard-based worker pool for concurrent command execution
  • executor — Passed to pool; executes Load->Validate->Write->Dispatch pipeline
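The router's shard selection can be illustrated with the standard library's FNV-1a implementation. This is a minimal sketch only: the helper name `shardFor`, the 32-bit hash width, and the plain modulo step are assumptions for illustration, and the package's actual router may differ (for example by using a hash ring).

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFor maps a routing key to a shard index in [0, shards).
// Hypothetical helper: the name, the 32-bit width, and the modulo step
// are assumptions, not the package's actual router. shards must be > 0.
func shardFor(key string, shards int) int {
	h := fnv.New32a()    // 32-bit FNV-1a from the standard library
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(shards))
}

func main() {
	// The same key always lands on the same shard.
	for _, key := range []string{"order-1", "order-2", "order-1"} {
		fmt.Printf("%s -> shard %d\n", key, shardFor(key, 8))
	}
}
```

Because the mapping depends only on the key and the shard count, commands that share a key are handled by the same shard for as long as the shard count stays fixed.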

Commands are handed to shard workers over channels. Send and SendWait block until the command completes or the context is cancelled, and return early when the queue is full. Send dispatches events asynchronously; SendWait dispatches synchronously. Shutdown drains in-flight work, closes the dispatcher, then closes the bus.
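The blocking behaviour of Send and SendWait can be sketched with a buffered channel and two `select` statements. Everything here is hypothetical (the `job` type, `send`, `worker`, and `errQueueFull` are illustrative names, not part of the package), and it assumes that a full queue is reported as an immediate error.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errQueueFull is a hypothetical error value for a full queue.
var errQueueFull = errors.New("queue full")

// job carries a command payload plus a channel for its result.
type job struct {
	payload string
	result  chan string
}

// send enqueues without blocking, then waits for the result or for ctx.
func send(ctx context.Context, queue chan<- job, payload string) (string, error) {
	// Buffer of 1 lets the worker reply even if the caller has given up.
	j := job{payload: payload, result: make(chan string, 1)}
	select {
	case queue <- j:
	default:
		return "", errQueueFull // assumed behaviour when the queue is full
	}
	select {
	case r := <-j.result:
		return r, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// worker drains the queue and replies on each job's result channel.
func worker(queue <-chan job) {
	for j := range queue {
		j.result <- "done:" + j.payload
	}
}

// demo wires one worker to a small queue and sends a single command.
func demo() (string, error) {
	queue := make(chan job, 1)
	defer close(queue) // lets the worker goroutine exit
	go worker(queue)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	return send(ctx, queue, "create-order")
}

func main() {
	fmt.Println(demo())
}
```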

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Processor

type Processor[T any] struct {
	// contains filtered or unexported fields
}

func New

func New[T any](
	es *eventstore.EventStore[T],
	bus asynxmd.Bus[T],
	opts ...ProcessorOpt[T],
) *Processor[T]

func (*Processor[T]) Send

func (p *Processor[T]) Send(
	ctx context.Context,
	cmd asynxmd.Command[T],
) (asynxmd.Event[T], error)

func (*Processor[T]) SendWait added in v0.3.0

func (p *Processor[T]) SendWait(
	ctx context.Context,
	cmd asynxmd.Command[T],
) (asynxmd.Event[T], error)

func (*Processor[T]) SetOnSendPending

func (p *Processor[T]) SetOnSendPending(fn func())

ForTesting: SetOnSendPending sets a callback invoked after a command is enqueued but before Send or SendWait blocks waiting for its result. Do not call in production code.

func (*Processor[T]) Shutdown

func (p *Processor[T]) Shutdown(ctx context.Context) error
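The overview states that Shutdown drains in-flight work before closing the dispatcher and the bus. One common way to bound such a drain by a context is sketched below; `drain` and `runDemo` are hypothetical names, and the package's internals are not shown in this documentation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// drain waits for all work tracked by wg, or gives up when ctx ends.
// Hypothetical helper, not the package's implementation.
func drain(ctx context.Context, wg *sync.WaitGroup) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runDemo starts one simulated in-flight command and drains it.
func runDemo() error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(10 * time.Millisecond) // simulated in-flight command
	}()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	return drain(ctx, &wg)
}

func main() {
	fmt.Println("drain error:", runDemo())
}
```

Passing a context with a deadline to Shutdown gives the caller an upper bound on how long the drain may take.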

func (*Processor[T]) WaitPublish

func (p *Processor[T]) WaitPublish()

ForTesting: WaitPublish blocks until all dispatched events have been delivered. Do not call in production code.

type ProcessorOpt

type ProcessorOpt[T any] func(*processorConfig[T])

func WithPublishErrorHandler added in v0.3.1

func WithPublishErrorHandler[T any](fn asynxmd.PublishErrorHandler[T]) ProcessorOpt[T]

WithPublishErrorHandler sets a callback invoked when Bus.PublishSync returns a non-nil error inside the dispatcher. When not set, publish errors are silently dropped.
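The handler behaviour described above can be sketched as follows. The signature of `asynxmd.PublishErrorHandler[T]` is not shown in this documentation, so `publishErrorHandler`, `dispatch`, and `failingPublish` below are hypothetical stand-ins that use a plain string as the event.

```go
package main

import (
	"errors"
	"fmt"
)

// publishErrorHandler is a hypothetical stand-in for
// asynxmd.PublishErrorHandler[T]; the real signature may differ.
type publishErrorHandler func(event string, err error)

// failingPublish simulates a bus whose publish call always fails.
func failingPublish(string) error {
	return errors.New("bus unavailable")
}

// dispatch publishes one event and reports a failure to onErr if set.
// With a nil handler the error is dropped, matching the documented default.
func dispatch(event string, publish func(string) error, onErr publishErrorHandler) {
	if err := publish(event); err != nil && onErr != nil {
		onErr(event, err)
	}
}

func main() {
	var seen []string
	dispatch("order-created", failingPublish, func(ev string, err error) {
		seen = append(seen, ev+": "+err.Error())
	})
	dispatch("order-created", failingPublish, nil) // error dropped silently
	fmt.Println(len(seen), "publish error(s) reported")
}
```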

func WithQueueDepth

func WithQueueDepth[T any](depth int) ProcessorOpt[T]

func WithShards

func WithShards[T any](count int) ProcessorOpt[T]

func WithWorkersPerShard

func WithWorkersPerShard[T any](count int) ProcessorOpt[T]
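ProcessorOpt follows the functional-options pattern: each option is a function that mutates an unexported config before the Processor is built. The sketch below shows the pattern in isolation; the `config` fields, the default values, and the lower-case function names are assumptions for illustration and are not taken from the package.

```go
package main

import "fmt"

// config stands in for the unexported processorConfig.
// Field names and defaults are hypothetical.
type config struct {
	shards          int
	workersPerShard int
	queueDepth      int
}

// option mirrors the shape of ProcessorOpt: a function that edits config.
type option func(*config)

func withShards(n int) option          { return func(c *config) { c.shards = n } }
func withWorkersPerShard(n int) option { return func(c *config) { c.workersPerShard = n } }
func withQueueDepth(n int) option      { return func(c *config) { c.queueDepth = n } }

// newConfig starts from assumed defaults and applies options in order.
func newConfig(opts ...option) config {
	c := config{shards: 4, workersPerShard: 1, queueDepth: 64}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := newConfig(withShards(8), withQueueDepth(256))
	fmt.Printf("%+v\n", c)
}
```

Options not supplied by the caller keep their defaults, which is what lets New accept a variadic `opts ...ProcessorOpt[T]` without breaking existing call sites when new options are added.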

Directories

Path Synopsis
Package exec implements the command execution pipeline for Asynx.
Package models defines data structures shared across processor sub-packages.
Package pool implements shard-based concurrent command execution.
Package queue implements consistent shard routing for Asynx commands.
