eventstream

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 26, 2024 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package eventstream provides abstractions for consuming ordered streams of event messages.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrCursorClosed is returned by Cursor.Next() and Close() if the
	// stream is closed.
	ErrCursorClosed = errors.New("stream cursor is closed")

	// ErrTruncated indicates that a cursor can not be opened because the requested
	// offset is on a portion of the event stream that has been truncated.
	ErrTruncated = errors.New("can not open cursor, stream is truncated")
)

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// Stream is the event stream to consume.
	Stream Stream

	// EventTypes is the set of event types that the handler consumes.
	EventTypes message.TypeCollection

	// Handler is the target for the events from the stream.
	Handler Handler

	// Semaphore is used to limit the number of messages being handled
	// concurrently.
	Semaphore *semaphore.Weighted

	// BackoffStrategy is the strategy used to delay restarting the consumer
	// after a failure. If it is nil, backoff.DefaultStrategy is used.
	BackoffStrategy backoff.Strategy

	// Logger is the target for log messages from the consumer.
	// If it is nil, logging.DefaultLogger is used.
	Logger logging.Logger
	// contains filtered or unexported fields
}

Consumer reads events from a stream in order to handle them.

func (*Consumer) Run

func (c *Consumer) Run(ctx context.Context) error

Run handles events from the stream until ctx is canceled or no more relevant events will occur.

type Cursor

type Cursor interface {
	// Next returns the next event in the stream that matches the filter.
	//
	// If the end of the stream is reached it blocks until a relevant event is
	// appended to the stream or ctx is canceled.
	//
	// If the stream is closed before or during a call to Next(), it returns
	// ErrCursorClosed.
	//
	// It returns ErrTruncated if the next event can not be obtained because it
	// occupies a portion of the stream that has been truncated.
	Next(ctx context.Context) (Event, error)

	// Close discards the cursor.
	//
	// It returns ErrCursorClosed if the cursor is already closed.
	// Any current or future calls to Next() return ErrCursorClosed.
	Close() error
}

A Cursor reads events from a stream.

Cursors are not safe for concurrent use.

type Event

type Event struct {
	// Offset is the 0-based index of the event on the stream.
	Offset uint64

	// Parcel contains the event from the stream.
	Parcel parcel.Parcel
}

Event is a container for an envelope and event stream specific meta-data.

func (Event) ID

func (e Event) ID() string

ID returns the ID of the message.

type Handler

type Handler interface {
	// NextOffset returns the offset of the next event to be consumed from a
	// specific application's event stream.
	//
	// id is the identity of the source application.
	NextOffset(ctx context.Context, id configkit.Identity) (uint64, error)

	// HandleEvent handles an event obtained from the event stream.
	//
	// o must be the offset that would be returned by NextOffset(). On success,
	// the next call to NextOffset() will return ev.Offset + 1.
	HandleEvent(ctx context.Context, o uint64, ev Event) error
}

Handler handles events consumed from a stream.

type Stream

type Stream interface {
	// Application returns the identity of the application that owns the stream.
	Application() configkit.Identity

	// EventTypes returns the set of event types that may appear on the stream.
	EventTypes(ctx context.Context) (message.TypeCollection, error)

	// Open returns a cursor that reads events from the stream.
	//
	// o is the offset of the first event to read. The first event on a stream
	// is always at offset 0.
	//
	// f is the set of "filter" event types to be returned by Cursor.Next(). Any
	// other event types are ignored.
	//
	// It returns an error if any of the event types in f are not supported, as
	// indicated by EventTypes().
	Open(ctx context.Context, o uint64, f message.TypeCollection) (Cursor, error)
}

A Stream is an ordered sequence of event messages.

Directories

Path Synopsis
internal
streamtest
Package streamtest contains a common test suite for eventstream.Stream implementations.
Package streamtest contains a common test suite for eventstream.Stream implementations.
Package memorystream provides an in-memory implementation of eventstream.Stream.
Package memorystream provides an in-memory implementation of eventstream.Stream.
Package networkstream presents a persistence.EventRepository as an eventstream.Stream via the dogma.messaging.v1 EventStream service.
Package networkstream presents a persistence.EventRepository as an eventstream.Stream via the dogma.messaging.v1 EventStream service.
Package persistedstream provides an implementation of eventstream.Stream that streams persisted events from an event repository.
Package persistedstream provides an implementation of eventstream.Stream that streams persisted events from an event repository.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL