batch

package
v3.57.0
Warning

This package is not in the latest version of its module.
Published: Oct 14, 2021 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package batch contains internal utilities for interacting with message batches.

Constants

This section is empty.

Variables

This section is empty.

Functions

func CollapsedCount added in v3.34.0

func CollapsedCount(p types.Part) int

CollapsedCount attempts to extract the actual number of messages that were collapsed into the resulting message part. This value could be greater than 1 when users configure processors that archive batched message parts.

func MessageCollapsedCount added in v3.34.0

func MessageCollapsedCount(m types.Message) int

MessageCollapsedCount attempts to extract the actual number of messages that were combined into the resulting batched message parts. This value could differ from message.Len() when users configure processors that archive batched message parts.

func WithCollapsedCount added in v3.34.0

func WithCollapsedCount(p types.Part, count int) types.Part

WithCollapsedCount returns a message part with a context indicating that this message is the result of collapsing a number of messages. This allows downstream components to know how many total messages were combined.
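
The three functions above can be illustrated with a minimal, self-contained sketch. It is not the package's implementation: the part type, the newPart helper, the context key, the default of 1 for parts with no recorded count, and the summing in messageCollapsedCount are all assumptions made for illustration, since types.Part and types.Message are not reproduced here.

```go
package main

import (
	"context"
	"fmt"
)

// part is a hypothetical stand-in for types.Part; it only carries a context
// and a payload.
type part struct {
	ctx  context.Context
	data []byte
}

// newPart is a hypothetical helper that builds a part with an empty context.
func newPart(data string) part {
	return part{ctx: context.Background(), data: []byte(data)}
}

// countKey is an unexported key type, so no other package can collide with it.
type countKey struct{}

// withCollapsedCount returns a copy of p whose context records count.
func withCollapsedCount(p part, count int) part {
	p.ctx = context.WithValue(p.ctx, countKey{}, count)
	return p
}

// collapsedCount reads the recorded count. Assumption: a part with no
// recorded count represents exactly one message.
func collapsedCount(p part) int {
	if v, ok := p.ctx.Value(countKey{}).(int); ok {
		return v
	}
	return 1
}

// messageCollapsedCount sums the counts of every part in a batch.
// Assumption: the batch total is the sum of the per-part counts.
func messageCollapsedCount(parts []part) int {
	total := 0
	for _, p := range parts {
		total += collapsedCount(p)
	}
	return total
}

func main() {
	plain := newPart("a")
	archived := withCollapsedCount(newPart("b,c,d"), 3)

	fmt.Println(collapsedCount(plain), collapsedCount(archived)) // 1 3
	fmt.Println(messageCollapsedCount([]part{plain, archived}))  // 4
}
```

In this sketch the batch holds two parts but accounts for four original messages, which is the kind of difference from message.Len() that the description of MessageCollapsedCount refers to.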

Types

type AckFunc added in v3.53.0

type AckFunc func(context.Context, error) error

AckFunc is a common function signature for acknowledging receipt of messages.

type CombinedAcker added in v3.53.0

type CombinedAcker struct {
	// contains filtered or unexported fields
}

CombinedAcker wraps a single root ack func and aggregates one or more derived ack funcs, such that the root ack func triggers only once every derived func has been called. If any derived func receives an error, the root ack func is called with the first non-nil error received.

func NewCombinedAcker added in v3.53.0

func NewCombinedAcker(aFn AckFunc) *CombinedAcker

NewCombinedAcker creates an aggregator that derives one or more ack funcs; once all of them have been called, the provided root ack func is called.

func (*CombinedAcker) Derive added in v3.53.0

func (c *CombinedAcker) Derive() AckFunc

Derive creates a new ack func that must be called before the origin ack func will be called. It is invalid to derive an ack func after any other previously derived funcs have been called.
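
The behaviour described for CombinedAcker can be sketched as follows. This is a hypothetical re-implementation of the pattern, not the package's code: the lowercase names, the mutex-based counter, and the choice to ignore repeated calls of the same derived func are assumptions. As in the description of Derive, all funcs are derived before any of them is called.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// ackFunc mirrors the AckFunc signature.
type ackFunc func(context.Context, error) error

// combinedAcker is a hypothetical stand-in for CombinedAcker.
type combinedAcker struct {
	mu       sync.Mutex
	pending  int   // derived funcs that have not been called yet
	firstErr error // first non-nil error seen by any derived func
	root     ackFunc
}

func newCombinedAcker(root ackFunc) *combinedAcker {
	return &combinedAcker{root: root}
}

// derive registers one more ack that must arrive before root is called.
func (c *combinedAcker) derive() ackFunc {
	c.mu.Lock()
	c.pending++
	c.mu.Unlock()

	var once sync.Once // assumption: repeated calls of one derived func are ignored
	return func(ctx context.Context, err error) error {
		var result error
		once.Do(func() {
			c.mu.Lock()
			if err != nil && c.firstErr == nil {
				c.firstErr = err
			}
			c.pending--
			done, first := c.pending == 0, c.firstErr
			c.mu.Unlock()
			if done {
				// Every derived func has now been called.
				result = c.root(ctx, first)
			}
		})
		return result
	}
}

// demo derives three acks, fails the second and third, and reports how often
// the root ack ran and which error it received.
func demo() (rootCalls int, rootErr error) {
	acker := newCombinedAcker(func(_ context.Context, err error) error {
		rootCalls++
		rootErr = err
		return nil
	})
	a, b, c := acker.derive(), acker.derive(), acker.derive()

	ctx := context.Background()
	_ = a(ctx, nil)
	_ = b(ctx, errors.New("second failed"))
	_ = c(ctx, errors.New("third failed")) // last call triggers the root ack
	return rootCalls, rootErr
}

func main() {
	calls, err := demo()
	fmt.Println(calls, err) // 1 second failed
}
```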

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error is an error type that also allows storing granular errors for each message of a batch.

func NewError

func NewError(msg types.Message, err error) *Error

NewError creates a new batch-wide error, where it's possible to add granular errors for individual messages of the batch.

func (*Error) Error

func (e *Error) Error() string

Error implements the common error interface.

func (*Error) Failed

func (e *Error) Failed(i int, err error) *Error

Failed stores an error state for a particular message of a batch. It returns a pointer to the underlying error, allowing the method to be chained.

If Failed is not called then all messages are assumed to have failed. If it is called at least once then all message indexes that aren't explicitly failed are assumed to have been processed successfully.

func (*Error) IndexedErrors

func (e *Error) IndexedErrors() int

IndexedErrors returns the number of indexed errors that have been registered for the batch.

func (*Error) Unwrap

func (e *Error) Unwrap() error

Unwrap returns the underlying common error.

func (*Error) WalkParts

func (e *Error) WalkParts(fn func(int, types.Part, error) bool)

WalkParts applies a closure to each message that was part of the request that caused this error. The closure is provided the message part index, a pointer to the part, and its individual error, which may be nil if the message itself was processed successfully. The closure returns a bool which indicates whether the iteration should be continued.
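
The interaction between Failed and WalkParts can be sketched with a simplified stand-in. This is an illustration under stated assumptions rather than the package's implementation: batchError, its lowercase methods, the use of plain strings in place of types.Part, and the choice to report the batch-wide error for every part when nothing was indexed are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// batchError is a hypothetical stand-in for Error; parts are plain strings.
type batchError struct {
	err      error
	parts    []string
	partErrs map[int]error
}

func newBatchError(parts []string, err error) *batchError {
	return &batchError{err: err, parts: parts}
}

func (e *batchError) Error() string { return e.err.Error() }
func (e *batchError) Unwrap() error { return e.err }

// failed records an error for part i and returns e so calls can be chained.
func (e *batchError) failed(i int, err error) *batchError {
	if e.partErrs == nil {
		e.partErrs = map[int]error{}
	}
	e.partErrs[i] = err
	return e
}

func (e *batchError) indexedErrors() int { return len(e.partErrs) }

// walkParts visits each part with its error and stops when fn returns false.
func (e *batchError) walkParts(fn func(int, string, error) bool) {
	for i, p := range e.parts {
		var perr error
		if e.partErrs == nil {
			perr = e.err // nothing indexed: every part is treated as failed
		} else {
			perr = e.partErrs[i] // nil for parts that were not marked failed
		}
		if !fn(i, p, perr) {
			return
		}
	}
}

// failedIndexes collects the indexes whose per-part error is non-nil.
func failedIndexes(e *batchError) []int {
	var out []int
	e.walkParts(func(i int, _ string, err error) bool {
		if err != nil {
			out = append(out, i)
		}
		return true
	})
	return out
}

// demoFailedIndexes returns the failed indexes for a batch-wide failure and
// for a batch where only index 1 was marked as failed.
func demoFailedIndexes() (whole, partial []int) {
	parts := []string{"a", "b", "c"}
	w := newBatchError(parts, errors.New("write failed"))
	p := newBatchError(parts, errors.New("write failed")).
		failed(1, errors.New("bad payload"))
	return failedIndexes(w), failedIndexes(p)
}

func main() {
	whole, partial := demoFailedIndexes()
	fmt.Println(whole)   // [0 1 2]
	fmt.Println(partial) // [1]
}
```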

type WalkableError added in v3.41.0

type WalkableError interface {
	WalkParts(fn func(int, types.Part, error) bool)
	IndexedErrors() int
	error
}

WalkableError is an interface implemented by batch errors that allows you to walk the messages of the batch and dig into the individual errors.
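
On the consuming side, an interface of this shape lets a caller narrow a retry to the parts that failed. The sketch below is hypothetical: walkable mirrors the method set of WalkableError but uses strings in place of types.Part, and partialErr and retryable are illustrative names that do not exist in the package.

```go
package main

import (
	"errors"
	"fmt"
)

// walkable mirrors the method set of WalkableError, with string parts.
type walkable interface {
	WalkParts(fn func(int, string, error) bool)
	IndexedErrors() int
	error
}

// partialErr is a hypothetical error that implements walkable.
type partialErr struct {
	parts []string
	errs  map[int]error
}

func (p *partialErr) Error() string      { return "batch partially failed" }
func (p *partialErr) IndexedErrors() int { return len(p.errs) }
func (p *partialErr) WalkParts(fn func(int, string, error) bool) {
	for i, part := range p.parts {
		if !fn(i, part, p.errs[i]) {
			return
		}
	}
}

// retryable returns the parts that should be resent after err.
func retryable(all []string, err error) []string {
	var w walkable
	if !errors.As(err, &w) || w.IndexedErrors() == 0 {
		return all // no per-part detail: resend everything
	}
	var out []string
	w.WalkParts(func(_ int, part string, perr error) bool {
		if perr != nil {
			out = append(out, part)
		}
		return true
	})
	return out
}

// demoRetry runs retryable against a plain error and a wrapped walkable one.
func demoRetry() (plain, wrapped []string) {
	all := []string{"a", "b", "c"}
	plain = retryable(all, errors.New("connection lost"))

	cause := &partialErr{parts: all, errs: map[int]error{2: errors.New("rejected")}}
	wrapped = retryable(all, fmt.Errorf("output failed: %w", cause))
	return plain, wrapped
}

func main() {
	plain, wrapped := demoRetry()
	fmt.Println(plain, wrapped) // [a b c] [c]
}
```

Using errors.As rather than a direct type assertion means the check still succeeds when the batch error has been wrapped by an intermediate layer.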
