handler

package
v0.2.0
Published: Mar 26, 2024 License: MIT Imports: 13 Imported by: 0

Documentation

Types

type Acknowledger

type Acknowledger interface {
	// Ack acknowledges the message, ensuring it is not handled again.
	//
	// b is the batch from the unit-of-work.
	Ack(ctx context.Context, b persistence.Batch) (persistence.Result, error)

	// Nack negatively-acknowledges the message, causing it to be retried.
	Nack(ctx context.Context, cause error) error
}

Acknowledger is an interface for acknowledging handled messages.

type DeferFunc

type DeferFunc func(Result, error)

DeferFunc is a function that can be deferred until a unit-of-work is completed.

type EntryPoint

type EntryPoint struct {
	// QueueEvents is the set of event types that must be added to the queue.
	QueueEvents message.TypeCollection

	// Handler is the handler implementation that populates the unit-of-work.
	Handler Handler

	// OnSuccess is called for each unit-of-work that is persisted successfully.
	//
	// It is invoked before the unit-of-work's deferred functions are executed.
	OnSuccess func(Result)
}

EntryPoint sets up a unit-of-work for each message to be handled, dispatches to a handler, and persists the result.

func (*EntryPoint) HandleMessage

func (ep *EntryPoint) HandleMessage(
	ctx context.Context,
	a Acknowledger,
	p parcel.Parcel,
) error

HandleMessage handles the message in p using ep.Handler and persists the result of its unit-of-work.

The batch of persistence operations from the unit-of-work is passed to a.Ack, which acknowledges the message.
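The flow described for EntryPoint can be sketched roughly as follows. This is a minimal illustration, not the package's implementation: every type here (parcel, batch, result, unitOfWork, logAck) is a simplified, hypothetical stand-in for the real parcel, persistence and handler types, and the error path (calling Nack on handler failure, running deferred functions on both success and failure) is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Simplified stand-ins for illustration only; these are NOT the real
// parcel, persistence or handler types.
type (
	parcel  struct{ Name string }
	batch   []string // stands in for persistence.Batch
	result  struct{ Events []string }
	handler func(ctx context.Context, w *unitOfWork, p parcel) error
)

// unitOfWork collects the changes and deferred functions for one message.
type unitOfWork struct {
	batch    batch
	deferred []func(result, error)
}

func (w *unitOfWork) RecordEvent(name string)      { w.batch = append(w.batch, name) }
func (w *unitOfWork) Defer(fn func(result, error)) { w.deferred = append(w.deferred, fn) }

type acknowledger interface {
	Ack(ctx context.Context, b batch) (result, error)
	Nack(ctx context.Context, cause error) error
}

// logAck is a hypothetical acknowledger that only records which method ran.
type logAck struct{ log *[]string }

func (a logAck) Ack(_ context.Context, b batch) (result, error) {
	*a.log = append(*a.log, "ack")
	return result{Events: b}, nil
}

func (a logAck) Nack(_ context.Context, cause error) error {
	*a.log = append(*a.log, "nack: "+cause.Error())
	return nil
}

// handleMessage builds a unit-of-work, dispatches to h, then acks or nacks.
func handleMessage(ctx context.Context, a acknowledger, h handler, onSuccess func(result), p parcel) (err error) {
	w := &unitOfWork{}
	var res result

	// Assumption: deferred functions run last, in reverse registration
	// order, whether or not the unit-of-work succeeded.
	defer func() {
		for i := len(w.deferred) - 1; i >= 0; i-- {
			w.deferred[i](res, err)
		}
	}()

	if err = h(ctx, w, p); err != nil {
		// Assumption: a handler failure is reported via Nack for a retry.
		return errors.Join(err, a.Nack(ctx, err))
	}
	if res, err = a.Ack(ctx, w.batch); err != nil {
		return err
	}
	if onSuccess != nil {
		onSuccess(res) // invoked before the deferred functions
	}
	return nil
}

// demo runs one message through the sketch and returns the call order.
func demo(fail bool) []string {
	var log []string
	h := func(_ context.Context, w *unitOfWork, p parcel) error {
		if fail {
			return errors.New("boom")
		}
		w.RecordEvent(p.Name + "-handled")
		w.Defer(func(result, error) { log = append(log, "deferred") })
		return nil
	}
	onSuccess := func(r result) { log = append(log, "success: "+r.Events[0]) }
	_ = handleMessage(context.Background(), logAck{&log}, h, onSuccess, parcel{Name: "order"})
	return log
}

func main() {
	fmt.Println(demo(false))
	fmt.Println(demo(true))
}
```

In the success case the sketch acknowledges first, then calls the success hook, then runs the deferred function, which matches the ordering documented for OnSuccess.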

type Handler

type Handler interface {
	// HandleMessage handles the message in p.
	HandleMessage(ctx context.Context, w UnitOfWork, p parcel.Parcel) error
}

Handler is an interface for handling messages.

type QueueConsumer

type QueueConsumer struct {
	// Queue is the message queue to consume.
	Queue *queue.Queue

	// EntryPoint is the handler entry-point used to dispatch the message.
	EntryPoint *EntryPoint

	// Persister is the persister used to persist the units-of-work produced
	// by the entry-point, and to update the queued messages.
	Persister persistence.Persister

	// BackoffStrategy is the strategy used to delay retrying a message after a
	// failure. If it is nil, backoff.DefaultStrategy is used.
	BackoffStrategy backoff.Strategy

	// Semaphore is used to limit the number of messages being handled
	// concurrently.
	Semaphore *semaphore.Weighted

	// Logger is the target for log messages about the consumed messages.
	// If it is nil, logging.DefaultLogger is used.
	Logger logging.Logger
}

QueueConsumer consumes messages from a queue and handles them.

func (*QueueConsumer) Run

func (c *QueueConsumer) Run(ctx context.Context) error

Run consumes messages from the queue.
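As a rough illustration of how a semaphore can bound the number of messages handled concurrently, the sketch below uses a buffered channel from the standard library in place of semaphore.Weighted. The consume function and its parameters are hypothetical; this is not how QueueConsumer.Run is implemented, and it omits the queue, persister, backoff and logging entirely.

```go
package main

import (
	"fmt"
	"sync"
)

// consume handles every message with at most limit handlers running at once.
// It returns the number of messages handled and the peak concurrency seen.
// A buffered channel acts as a stand-in for a weighted semaphore.
func consume(messages []string, limit int, handle func(string)) (handled, peak int) {
	sem := make(chan struct{}, limit)
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		running int
	)

	for _, m := range messages {
		sem <- struct{}{} // acquire a slot; blocks while limit handlers are busy
		wg.Add(1)

		go func(m string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			handle(m)

			mu.Lock()
			running--
			handled++
			mu.Unlock()
		}(m)
	}

	wg.Wait()
	return handled, peak
}

func main() {
	msgs := []string{"a", "b", "c", "d", "e"}
	handled, peak := consume(msgs, 2, func(string) {})
	fmt.Println("handled:", handled, "peak within limit:", peak <= 2)
}
```

Acquiring the slot before starting the goroutine means the loop itself stalls when the limit is reached, so the peak concurrency can never exceed limit.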

type Result

type Result struct {
	// Queued is the set of messages that were placed on the message queue,
	// which may include events.
	Queued []queue.Message

	// Events is the set of events that were recorded in the unit-of-work.
	Events []eventstream.Event
}

Result is the result of a successful unit-of-work.

type Router

type Router map[message.Type][]Handler

Router is a handler that dispatches to other handlers based on message type.

func (Router) HandleMessage

func (r Router) HandleMessage(
	ctx context.Context,
	w UnitOfWork,
	p parcel.Parcel,
) error

HandleMessage handles the message in p.
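A minimal sketch of dispatching by message type with a map of handler slices is shown below. The messageType, parcel, handler and handlerFunc types are hypothetical stand-ins; the context and unit-of-work parameters are left out for brevity. Stopping at the first handler error and silently ignoring unrouted types are assumptions, not documented behavior of Router.

```go
package main

import "fmt"

// Simplified stand-ins for illustration only.
type messageType string // stands in for message.Type

type parcel struct {
	Type messageType
	Body string
}

type handler interface {
	HandleMessage(p parcel) error
}

// handlerFunc adapts a plain function to the handler interface.
type handlerFunc func(p parcel) error

func (f handlerFunc) HandleMessage(p parcel) error { return f(p) }

// router maps each message type to the handlers that should receive it.
type router map[messageType][]handler

// HandleMessage passes p to every handler registered for its type.
func (r router) HandleMessage(p parcel) error {
	// Assumption: a type with no entry yields a nil slice, so nothing runs.
	for _, h := range r[p.Type] {
		// Assumption: the first failure aborts the remaining handlers.
		if err := h.HandleMessage(p); err != nil {
			return err
		}
	}
	return nil
}

// demo routes three messages and returns which handlers saw them.
func demo() []string {
	var log []string
	record := func(name string) handler {
		return handlerFunc(func(p parcel) error {
			log = append(log, name+" <- "+p.Body)
			return nil
		})
	}

	r := router{
		"OrderPlaced":  {record("billing"), record("shipping")},
		"OrderShipped": {record("notifier")},
	}

	_ = r.HandleMessage(parcel{Type: "OrderPlaced", Body: "order-1"})
	_ = r.HandleMessage(parcel{Type: "OrderShipped", Body: "order-1"})
	_ = r.HandleMessage(parcel{Type: "Unknown", Body: "ignored"})
	return log
}

func main() {
	for _, line := range demo() {
		fmt.Println(line)
	}
}
```

Because the router satisfies the same interface as the handlers it contains, it can be used anywhere a single handler is expected.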

type UnitOfWork

type UnitOfWork interface {
	// ExecuteCommand updates the unit-of-work to execute the command in p.
	ExecuteCommand(p parcel.Parcel)

	// ScheduleTimeout updates the unit-of-work to schedule the timeout in p.
	ScheduleTimeout(p parcel.Parcel)

	// RecordEvent updates the unit-of-work to record the event in p.
	RecordEvent(p parcel.Parcel)

	// Do updates the unit-of-work to include op in the persistence batch.
	Do(op persistence.Operation)

	// Defer registers fn to be called when the unit-of-work is complete.
	//
	// Like Go's defer keyword, deferred functions are guaranteed to be invoked
	// in the reverse of the order in which they are registered.
	Defer(fn DeferFunc)
}

UnitOfWork encapsulates the state changes made by one or more handlers in the process of handling a single message.
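The reverse-order guarantee on Defer can be illustrated with a small stand-in. The unitOfWork type and its complete method below are hypothetical and exist only for this sketch; the real interface has no such method, and how the package completes a unit-of-work is not shown in this documentation.

```go
package main

import "fmt"

// Simplified stand-ins for illustration only.
type result struct{ Events []string }

type deferFunc func(result, error)

// unitOfWork accumulates recorded events and deferred functions.
type unitOfWork struct {
	events   []string
	deferred []deferFunc
}

func (w *unitOfWork) RecordEvent(name string) { w.events = append(w.events, name) }
func (w *unitOfWork) Defer(fn deferFunc)      { w.deferred = append(w.deferred, fn) }

// complete is a hypothetical helper that finishes the unit-of-work and runs
// the deferred functions last-registered first, like Go's defer keyword.
func (w *unitOfWork) complete(err error) {
	var res result
	if err == nil {
		res.Events = w.events
	}
	for i := len(w.deferred) - 1; i >= 0; i-- {
		w.deferred[i](res, err)
	}
}

// demo registers three deferred functions and returns their call order.
func demo() []string {
	var order []string
	w := &unitOfWork{}

	w.RecordEvent("OrderPlaced")
	w.Defer(func(result, error) { order = append(order, "first") })
	w.Defer(func(result, error) { order = append(order, "second") })
	w.Defer(func(result, error) { order = append(order, "third") })

	w.complete(nil)
	return order
}

func main() {
	fmt.Println(demo()) // prints "[third second first]"
}
```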

Directories

Path Synopsis
Package aggregate provides an adaptor that exposes Dogma aggregate message handlers as Verity handlers.
Package cache provides an in-memory cache for aggregate and process instances.
Package integration provides an adaptor that exposes Dogma integration message handlers as Verity handlers.
Package process provides an adaptor that exposes Dogma process message handlers as Verity handlers.
Package projection provides an adaptor that exposes Dogma projection message handlers as Verity event stream handlers.
resource
Package resource contains utilities for performing low-level manipulations of projection resource versions.
