stream

package module
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2019 License: MIT Imports: 2 Imported by: 1

README

Stream abstraction for Go

Common stream abstraction and in-memory implementation for streaming data.

Importing into a project with go get

go get github.com/artyomturkin/go-stream

Import into a project with go mod support by adding to go.mod file

require github.com/artyomturkin/go-stream v1.1.2

Abstractions

Stream

Common structure and configuration for creating streams from providers, consumer and producer interfaces and message structure.

// Stream used to build a Consumer object
type Stream interface {
	GetConsumer(ctx context.Context, group string) Consumer
	GetProducer(ctx context.Context, group string) Producer
}
// Message wraps context and data from stream
type Message struct {
	Context context.Context
	Data    interface{}
}
// Consumer provides read access to a message stream
type Consumer interface {
	Messages() <-chan Message
	Ack(context.Context) error
	Nack(context.Context) error
	Close() error
	Errors() <-chan error
	Done() <-chan struct{}
}
// Producer provides publish access to a message stream
type Producer interface {
	Publish(context.Context, interface{}) error
	Close() error
	Errors() <-chan error
	Done() <-chan struct{}
}
// Config common configuration for streams
type Config struct {
	Endpoints           []string
	Topic               string
	MaxInflightMessages int
	Custom              interface{}
}
Helper funcs

Functions to set and get tracking information from context

// SetTrackers adds message trackers to context
func SetTrackers(ctx context.Context, tracker ...interface{}) context.Context
// GetTrackers returns an array of trackers
func GetTrackers(ctx context.Context) []interface{}

Documentation

Index

Constants

View Source
const TrackedMessagesContextKey = streamContextKey("TRACKED_MESSAGES")

TrackedMessagesContextKey context key to get tracked messages from context. Tracked messages are stored in []interface{}

Variables

This section is empty.

Functions

func GetTrackers added in v1.0.0

func GetTrackers(ctx context.Context) []interface{}

GetTrackers returns an array of trackers

func SetTrackers added in v1.0.0

func SetTrackers(ctx context.Context, tracker ...interface{}) context.Context

SetTrackers adds message trackers to context

Types

type Config

type Config struct {
	Endpoints           []string
	Topic               string
	MaxInflightMessages int

	Custom interface{}
}

Config common configuration for streams

type Consumer

type Consumer interface {
	Messages() <-chan Message
	Ack(context.Context) error
	Nack(context.Context) error
	Close() error
	Errors() <-chan error
	Done() <-chan struct{}
}

Consumer provides read access to a message stream

type InmemStream

type InmemStream struct {
	Messages []interface{}

	Acks  []context.Context
	Nacks []context.Context
}

InmemStream create inmemory stream. Created consumer will output elements defined in Message, Acks and Nacks will be stored in Acks and Nacks fields. Created producer will store published messages in Message.

func (*InmemStream) GetConsumer

func (i *InmemStream) GetConsumer(ctx context.Context, _ string) Consumer

GetConsumer create new consumer from inmemory stream

func (*InmemStream) GetProducer

func (i *InmemStream) GetProducer(ctx context.Context, _ string) Producer

GetProducer create new producer from inmemory stream

type Message

type Message struct {
	Context context.Context
	Data    interface{}
}

Message wraps context and data from stream

type Producer

type Producer interface {
	Publish(context.Context, interface{}) error
	Close() error
	Errors() <-chan error
	Done() <-chan struct{}
}

Producer provides publish access to a message stream

type Stream

type Stream interface {
	GetConsumer(ctx context.Context, group string) Consumer
	GetProducer(ctx context.Context, group string) Producer
}

Stream used to build a Consumer object

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL