eventbus

package module
v0.0.0-...-f75852d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2019 License: Apache-2.0 Imports: 12 Imported by: 0

README

go-eventbus

EventBus client in go

Documentation

Index

Constants

View Source
const (
	OffsetNewest int64 = -1
	OffsetOldest int64 = -2
)

These numbers come from https://github.com/Shopify/sarama/blob/master/client.go

View Source
const DefaultKeepAliveTimeout = time.Second * 30

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Endpoint  string
	AuthToken string
	Stream    string
	Client    string
	Version   string
}

Config records the fields that are use to identify the eventbus client to the eventbus-sub service.

type ConstantReconnectionPolicy

type ConstantReconnectionPolicy struct {
	// contains filtered or unexported fields
}

ConstantReconnectionPolicy reconnects every duration forever.

func NewConstantReconnectionPolicy

func NewConstantReconnectionPolicy(t time.Duration) *ConstantReconnectionPolicy

NewConstantReconnectionPolicy creates a new ConstantReconnectionPolicy with the specified duration.

func (ConstantReconnectionPolicy) NewScheduler

NewScheduler implements the ReconnectionPolicy interface and returns a new constant reconnection scheduler.

type EventHandler

type EventHandler interface {
	Handle(Message) error
}

An EventHandler responds to an event. If the Handle call returns an error, then the offset will not be recorded as processed.

type EventHandlerFunc

type EventHandlerFunc func(Message) error

EventHandlerFunc is an adapter type to allow the use of ordinary functions as an EventHandler

func (EventHandlerFunc) Handle

func (e EventHandlerFunc) Handle(m Message) error

Handle implements EventHandler for the EventHandlerFunc adapter type.

type Eventbus

type Eventbus struct {
	Reconnection ReconnectionScheduler

	KeepAliveTimeout time.Duration
	// contains filtered or unexported fields
}

An Eventbus is the client for connecting to eventbus-sub.

func NewEventbus

func NewEventbus(config Config, handler EventHandler, store offsetStore) *Eventbus

NewEventbus creates a new Eventbus client to handle events.

func (*Eventbus) Run

func (eb *Eventbus) Run() chan error

Run starts the eventbus loop. When Run is called, the registered EventHandler will be called for each message in the stream. It returns a chan that the caller can wait on to receive errors during event streaming.

func (*Eventbus) SetErrorLogger

func (eb *Eventbus) SetErrorLogger(el func(e error))

SetErrorLogger allows configuration of the error logging mechanism.

func (*Eventbus) StartAtNewest

func (eb *Eventbus) StartAtNewest()

StartAtNewest sets the offset to request from the most recent offsets, rather than from the start of the events recorded in the stream.

type ExponentialReconnectionPolicy

type ExponentialReconnectionPolicy struct {
	// contains filtered or unexported fields
}

ExponentialReconnectionPolicy reconnects with an exponential delay, up to maxDelay forever.

func NewExponentialReconnectionPolicy

func NewExponentialReconnectionPolicy(base, max time.Duration) *ExponentialReconnectionPolicy

NewExponentialReconnectionPolicy creates a new ExponentialReconnectionPolicy with the base and max durations.

func (ExponentialReconnectionPolicy) NewScheduler

NewScheduler implements the ReconnectionPolicy interface and returns a new exponential reconnection scheduler.

type InMemoryOffsetStore

type InMemoryOffsetStore struct {
	// contains filtered or unexported fields
}

InMemoryOffsetStore is mostly for testing purposes.

func NewInMemoryOffsetStore

func NewInMemoryOffsetStore() *InMemoryOffsetStore

NewInMemoryOffsetStore creates a new InMemoryOffsetStore.

func (InMemoryOffsetStore) GetOffsets

func (os InMemoryOffsetStore) GetOffsets() (*PartitionOffsets, error)

GetOffsets returns either nil, nil if we have no offsets, or the current set of recorded offsets and no error.

func (*InMemoryOffsetStore) SetOffset

func (os *InMemoryOffsetStore) SetOffset(partition int32, offset int64) error

SetOffset stores the offset against the partition and always returns a nil error.

type LimitedExponentialReconnectionPolicy

type LimitedExponentialReconnectionPolicy struct {
	// contains filtered or unexported fields
}

LimitedExponentialReconnectionPolicy reconnects with an exponential backoff until the backoff is greater than the maximum delay.

func NewLimitedExponentialReconnectionPolicy

func NewLimitedExponentialReconnectionPolicy(base, max time.Duration) *LimitedExponentialReconnectionPolicy

NewLimitedExponentialReconnectionPolicy creates a new LimitedReconnectionPolicy.

func (LimitedExponentialReconnectionPolicy) NewScheduler

NewScheduler implements the ReconnectionPolicy interface and returns a new limited exponential reconnection scheduler.

type LimitedReconnectionPolicy

type LimitedReconnectionPolicy struct {
	// contains filtered or unexported fields
}

LimitedReconnectionPolicy reconnects with an fixed delay for a limited number of attempts, and then returns ErrReconnectsExhausted.

func NewLimitedReconnectionPolicy

func NewLimitedReconnectionPolicy(attempts int32, t time.Duration) *LimitedReconnectionPolicy

NewLimitedReconnectionPolicy creates a new LimitedReconnectionPolicy.

func (LimitedReconnectionPolicy) NewScheduler

NewScheduler implements the ReconnectionPolicy interface and returns a new limited reconnection scheduler.

type Message

type Message struct {
	Offset    int64           `json:"offset"`
	Partition int32           `json:"partition"`
	Body      json.RawMessage `json:"body"`
}

type PartitionOffsets

type PartitionOffsets map[int32]int64

PartitionOffsets represents the offsets for each partition.

func (PartitionOffsets) MarshalJSON

func (po PartitionOffsets) MarshalJSON() ([]byte, error)

MarshalJSON formats the numeric data as strings because eventbus-sub expects it.

type ReconnectionPolicy

type ReconnectionPolicy interface {
	NewScheduler() ReconnectionScheduler
}

ReconnectionPolicy returns a ReconnectionScheduler to be used when attempting to reconnect.

var (
	// DefaultPolicy is used by the Eventbus constructor.
	DefaultPolicy ReconnectionPolicy = NewExponentialReconnectionPolicy(1*time.Second, 32*time.Second)
	// ErrReconnectsExhausted is returned as an error when the reconnection policy
	// has run out of attempts.
	ErrReconnectsExhausted = errors.New("reconnects exhausted")
)

type ReconnectionScheduler

type ReconnectionScheduler interface {
	// Should return an error to indicate that the client should not reconnect
	NextReconnectBackoff() (time.Duration, error)
}

ReconnectionScheduler should return the time before the next reconnection should be made.

type RedisOffsetStore

type RedisOffsetStore struct {
	// contains filtered or unexported fields
}

RedisOffsetStore uses a connection pool to record the offsets and partitions.

func NewRedisOffsetStore

func NewRedisOffsetStore(prefix string, p *redis.Pool) *RedisOffsetStore

NewRedisOffsetStore creates a new RedisOffsetStore.

func (RedisOffsetStore) GetOffsets

func (rs RedisOffsetStore) GetOffsets() (*PartitionOffsets, error)

GetOffsets returns the current offsets stored in Redis and possibly an error.

func (RedisOffsetStore) SetOffset

func (rs RedisOffsetStore) SetOffset(partition int32, offset int64) error

SetOffset stores the offset against the partition and returns errors returned from Redis.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL