drain

package
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 4, 2023 License: MIT, Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Package drain implements a composable message distribution package for Go.

This package is a forked and altered version of the original package from the Docker project, which can be found at: https://github.com/docker/go-events

Index

Constants

This section is empty.

Variables

View Source
var (
	// JSONMarshaller a simple JSON marshaller function that uses the standard
	// library `encoding/json` package to marshal events.Event messages.
	JSONMarshaller = func(m events.Event) ([]byte, error) {
		return json.Marshal(m)
	}

	// ProtoMarshaller assumes the input message as `proto.Message`, and marshall
	// using `proto.Marshal()`.
	ProtoMarshaller = func(m events.Event) ([]byte, error) {
		if pb, ok := m.(proto.Message); ok {
			return proto.Marshal(pb)
		}

		return nil, fmt.Errorf("could not marshal (proto) message of type `%T`, not a proto message", m)
	}

	// AnyPBMarshaller assumes the input message as a `proto.Message`, and
	// marshall using `anypb` package. That is, it wraps the original proto
	// message in an `Any` message, and then marshall the `Any` message.
	AnyPBMarshaller = func(m events.Event) ([]byte, error) {
		if pb, ok := m.(proto.Message); ok {
			anyMsg, err := anypb.New(pb)
			if err != nil {
				return nil, fmt.Errorf("%w: could not marshal (proto+anypb) message", err)
			}

			return proto.Marshal(anyMsg)
		}

		return nil, fmt.Errorf("could not marshal (proto+anypb) message of type `%T`, not a proto message", m)
	}

	// ProtoJSONMarshaller similar to "AnyPBMarshaller". Assumes that the message
	// is a `proto.Message` instance, and marshall it using `protojson` package.
	ProtoJSONMarshaller = func(m events.Event) ([]byte, error) {
		if pb, ok := m.(proto.Message); ok {
			return protojson.Marshal(pb)
		}

		return nil, fmt.Errorf("could not marshal (proto+json) message of type `%T`, not a proto message", m)
	}
)

List of commonly used marshallers.

View Source
var DefaultExponentialBackoffConfig = ExponentialBackoffConfig{
	Base:   time.Second,
	Factor: time.Second,
	Max:    20 * time.Second,
}

DefaultExponentialBackoffConfig provides a default configuration for exponential backoff.

View Source
var (
	// ErrSinkClosed is returned if Writer.Write call is issued to a sink that
	// has been closed. If encountered, the error should be considered terminal
	// and retries will not be successful.
	ErrSinkClosed = fmt.Errorf("sink closed")
)

Functions

This section is empty.

Types

type BroadcasterSink

type BroadcasterSink[M any] interface {
	Sink[M]

	// Add adds the sink to the broadcaster.
	// The provided sink must be comparable with equality. Typically, this just
	// works with a regular pointer type.
	Add(sink Sink[M]) error

	// Remove the provided sink.
	Remove(sink Sink[M]) error
}

BroadcasterSink sends messages to multiple, reliable Sinks. The goal of this component is to dispatch messages to configured endpoints. Reliability can be provided by wrapping incoming sinks.

func NewBroadcaster

func NewBroadcaster[M any](wErrHandler WriteErrorFn[M], to ...Sink[M]) BroadcasterSink[M]

NewBroadcaster appends one or more sinks to the list of sinks. The broadcaster behavior will be affected by the properties of the sink. Generally, the sink should accept all messages and deal with reliability on its own. Use of QueueSink and RetryingSink should be used here.

type ChannelSink

type ChannelSink[M any] interface {
	Sink[M]

	// Done returns a channel that will always proceed once the sink is closed.
	Done() <-chan struct{}

	// Wait returns a channel that unblocks when a new message arrives.
	// Must be called in a separate goroutine from the writer.
	Wait() <-chan M
}

ChannelSink defines a sink that can be listened on. The writer and channel listener must operate in separate goroutines.

Consumers should listen on Channel.C until Closed is closed.

func NewChannel

func NewChannel[M any](buffer int) ChannelSink[M]

NewChannel returns a channel. If buffer is zero, the channel is unbuffered.

type ExponentialBackoffConfig

type ExponentialBackoffConfig struct {
	// Base is the minimum bound for backing off after failure.
	Base time.Duration

	// Factor sets the amount of time by which the backoff grows with each
	// failure.
	Factor time.Duration

	// Max is the absolute maxiumum bound for a single backoff.
	Max time.Duration
}

ExponentialBackoffConfig configures backoff parameters.

Note that these parameters operate on the upper bound for choosing a random value. For example, at Base=1s, a random value in [0,1s) will be chosen for the backoff value.

type FilterFn

type FilterFn[M any] func(M) bool

FilterFn defines a function filters out messages. If the function returns true, the message will be passed to the underlying sink. Otherwise, the message will be silently dropped.

type KinesisAPI

type KinesisAPI interface {
	PutRecord(ctx context.Context, params *kinesis.PutRecordInput, optFns ...func(*kinesis.Options)) (*kinesis.PutRecordOutput, error)
}

KinesisAPI represents a Kinesis client for sending messages.

type MapperFn

type MapperFn[M any] func(M) (M, error)

MapperFn defines a function maps an input event into another.

type Marshaller

type Marshaller[M any] func(M) ([]byte, error)

Marshaller converts an input message into a byte stream.

type RetrySinkStrategy

type RetrySinkStrategy[M any] interface {
	// Proceed is called before every message send. If proceed returns a
	// positive, non-zero integer, the retryer will back off by the provided
	// duration.
	//
	// A message is provided, by may be ignored.
	Proceed(M) time.Duration

	// Failure reports a failure to the strategy. If this method returns true,
	// the message should be dropped.
	Failure(M, error) bool

	// Success should be called when a message is sent successfully.
	Success(M)
}

RetrySinkStrategy defines a strategy for retrying message sink writes.

All methods should be goroutine safe.

func NewBreakerStrategy

func NewBreakerStrategy[M any](threshold int, backoff time.Duration) RetrySinkStrategy[M]

NewBreakerStrategy returns a breaker that will backoff after the threshold has been tripped. A Breaker is thread safe and may be shared by many goroutines.

func NewExponentialBackoff

func NewExponentialBackoff[M any](config ExponentialBackoffConfig) RetrySinkStrategy[M]

NewExponentialBackoff returns an exponential backoff strategy with the desired config. If config is nil, the default is returned.

type Sink

type Sink[M any] interface {
	Writer[M]
	io.Closer
}

Sink accepts and sends messages. A sink once closed, will not accept any more messages.

func NewFilter

func NewFilter[M any](dst Sink[M], matcher FilterFn[M]) Sink[M]

NewFilter returns a new filter that will send to messages to dst that return true for FilterFn.

func NewIOWriter

func NewIOWriter[M any](iow io.Writer, marshaller Marshaller[M]) Sink[M]

NewIOWriter builds a sink that writes messages into the provided io.Writer.

func NewKinesisSink

func NewKinesisSink[M any](
	streamName string,
	api KinesisAPI,
	marshaller Marshaller[M],
	timeout time.Duration,
	onError WriteErrorFn[M],
) (Sink[M], error)

NewKinesisSink builds a new sink that sends messages to a Kinesis Stream.

func NewMapper

func NewMapper[M any](dst Sink[M], mapper MapperFn[M]) Sink[M]

NewMapper builds sink that passes to dst mapped messages.

func NewNop

func NewNop[M any]() Sink[M]

NewNop builds a sink that does nothing.

func NewQueue

func NewQueue[M any](dst Sink[M], throughput int, dropHandling WriteErrorFn[M]) Sink[M]

NewQueue returns a queue Sink with a given throughput to the provided Sink dst. nil dropHandling will set a noop handler.

func NewRetrying

func NewRetrying[M any](sink Sink[M], strategy RetrySinkStrategy[M], dropHandling WriteErrorFn[M]) Sink[M]

NewRetrying returns a sink that will retry writes to a sink, backing off on failure. Parameters threshold and backoff adjust the behavior of the circuit breaker.

type WriteErrorFn

type WriteErrorFn[M any] func(M, error)

WriteErrorFn defines a function that is invoked each time a message fails to be written to the underlying sink.

type Writer

type Writer[M any] interface {
	// Write writes a message. If no error is returned, the caller can assume
	// that the message have been committed. If an error is received, the caller
	// may retry sending the message.
	Write(M) error
}

Writer defines a component where messages can be written to.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL