Documentation ¶
Index ¶
- Constants
- type Config
- type ConstantReconnectionPolicy
- type EventHandler
- type EventHandlerFunc
- type Eventbus
- type ExponentialReconnectionPolicy
- type InMemoryOffsetStore
- type LimitedExponentialReconnectionPolicy
- type LimitedReconnectionPolicy
- type Message
- type PartitionOffsets
- type ReconnectionPolicy
- type ReconnectionScheduler
- type RedisOffsetStore
Constants ¶
const (
	OffsetNewest int64 = -1
	OffsetOldest int64 = -2
)
These numbers come from https://github.com/Shopify/sarama/blob/master/client.go
const DefaultKeepAliveTimeout = time.Second * 30
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
Config records the fields that are used to identify the eventbus client to the eventbus-sub service.
type ConstantReconnectionPolicy ¶
type ConstantReconnectionPolicy struct {
// contains filtered or unexported fields
}
ConstantReconnectionPolicy reconnects after the same fixed duration each time, forever.
func NewConstantReconnectionPolicy ¶
func NewConstantReconnectionPolicy(t time.Duration) *ConstantReconnectionPolicy
NewConstantReconnectionPolicy creates a new ConstantReconnectionPolicy with the specified duration.
func (ConstantReconnectionPolicy) NewScheduler ¶
func (p ConstantReconnectionPolicy) NewScheduler() ReconnectionScheduler
NewScheduler implements the ReconnectionPolicy interface and returns a new constant reconnection scheduler.
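The split between a policy and the scheduler it creates can be sketched as follows. This is a minimal illustration, not the package's implementation: `constantPolicy` and `constantScheduler` are hypothetical stand-ins for the unexported internals, and only the two interfaces are copied from this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// ReconnectionScheduler and ReconnectionPolicy mirror the documented interfaces.
type ReconnectionScheduler interface {
	NextReconnectBackoff() (time.Duration, error)
}

type ReconnectionPolicy interface {
	NewScheduler() ReconnectionScheduler
}

// constantPolicy is a hypothetical stand-in for ConstantReconnectionPolicy.
type constantPolicy struct{ delay time.Duration }

// constantScheduler is a hypothetical stand-in for the scheduler it returns.
type constantScheduler struct{ delay time.Duration }

// NewScheduler hands out a scheduler configured with the policy's delay.
func (p constantPolicy) NewScheduler() ReconnectionScheduler {
	return &constantScheduler{delay: p.delay}
}

// NextReconnectBackoff always returns the same delay and never an error.
func (s *constantScheduler) NextReconnectBackoff() (time.Duration, error) {
	return s.delay, nil
}

func main() {
	var policy ReconnectionPolicy = constantPolicy{delay: 5 * time.Second}
	s := policy.NewScheduler()
	for i := 0; i < 3; i++ {
		d, err := s.NextReconnectBackoff()
		fmt.Println(d, err)
	}
}
```

Keeping the policy as plain configuration and the scheduler as the object that is asked for each delay means a policy value can be shared while each reconnection cycle gets its own scheduler.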
type EventHandler ¶
An EventHandler responds to an event. If the Handle call returns an error, then the offset will not be recorded as processed.
type EventHandlerFunc ¶
EventHandlerFunc is an adapter type that allows ordinary functions to be used as an EventHandler.
func (EventHandlerFunc) Handle ¶
func (e EventHandlerFunc) Handle(m Message) error
Handle implements EventHandler for the EventHandlerFunc adapter type.
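The adapter pattern described here can be sketched in isolation. The `Message` struct and the `Handle` signature are taken from this documentation; the `EventHandler` interface body is an assumption inferred from that signature, and `rejectEmpty` is a hypothetical handler used only for illustration.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Message mirrors the struct documented in this package.
type Message struct {
	Offset    int64           `json:"offset"`
	Partition int32           `json:"partition"`
	Body      json.RawMessage `json:"body"`
}

// EventHandler is assumed to be a single-method interface, inferred from
// the documented Handle signature.
type EventHandler interface {
	Handle(m Message) error
}

// EventHandlerFunc adapts an ordinary function to EventHandler.
type EventHandlerFunc func(m Message) error

// Handle calls the underlying function.
func (e EventHandlerFunc) Handle(m Message) error { return e(m) }

// rejectEmpty is a hypothetical handler that refuses messages with no body.
func rejectEmpty(m Message) error {
	if len(m.Body) == 0 {
		return errors.New("empty body")
	}
	return nil
}

func main() {
	var h EventHandler = EventHandlerFunc(rejectEmpty)
	fmt.Println(h.Handle(Message{Offset: 1, Body: json.RawMessage(`{"id":7}`)}))
	fmt.Println(h.Handle(Message{Offset: 2}))
}
```

Per the EventHandler description, a non-nil return from the handler means the offset is not recorded as processed, so a handler like this one would leave the empty message's offset unrecorded.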
type Eventbus ¶
type Eventbus struct {
	Reconnection     ReconnectionScheduler
	KeepAliveTimeout time.Duration
	// contains filtered or unexported fields
}
An Eventbus is the client for connecting to eventbus-sub.
func NewEventbus ¶
func NewEventbus(config Config, handler EventHandler, store offsetStore) *Eventbus
NewEventbus creates a new Eventbus client to handle events.
func (*Eventbus) Run ¶
Run starts the eventbus loop. When Run is called, the registered EventHandler will be called for each message in the stream. It returns a chan that the caller can wait on to receive errors during event streaming.
func (*Eventbus) SetErrorLogger ¶
SetErrorLogger allows configuration of the error logging mechanism.
func (*Eventbus) StartAtNewest ¶
func (eb *Eventbus) StartAtNewest()
StartAtNewest requests events starting from the most recent offsets, rather than from the start of the events recorded in the stream.
type ExponentialReconnectionPolicy ¶
type ExponentialReconnectionPolicy struct {
// contains filtered or unexported fields
}
ExponentialReconnectionPolicy reconnects with an exponentially increasing delay that is capped at the maximum delay, and keeps retrying forever.
func NewExponentialReconnectionPolicy ¶
func NewExponentialReconnectionPolicy(base, max time.Duration) *ExponentialReconnectionPolicy
NewExponentialReconnectionPolicy creates a new ExponentialReconnectionPolicy with the base and max durations.
func (ExponentialReconnectionPolicy) NewScheduler ¶
func (p ExponentialReconnectionPolicy) NewScheduler() ReconnectionScheduler
NewScheduler implements the ReconnectionPolicy interface and returns a new exponential reconnection scheduler.
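A capped exponential scheduler could look roughly like the sketch below. The doubling factor and the `exponentialScheduler` type are assumptions made for illustration; this documentation does not state the growth factor or show the real scheduler.

```go
package main

import (
	"fmt"
	"time"
)

// exponentialScheduler is a hypothetical stand-in for the unexported scheduler.
type exponentialScheduler struct {
	next time.Duration // delay to hand out on the next call
	max  time.Duration // upper bound on any returned delay
}

// NextReconnectBackoff returns the current delay, clamped to max, and never
// returns an error, so the client keeps retrying indefinitely.
func (s *exponentialScheduler) NextReconnectBackoff() (time.Duration, error) {
	d := s.next
	if d > s.max {
		d = s.max
	}
	// Assumed growth factor of 2. Doubling stops once the cap is reached,
	// so the stored value cannot overflow.
	if s.next < s.max {
		s.next *= 2
	}
	return d, nil
}

func main() {
	s := &exponentialScheduler{next: time.Second, max: 32 * time.Second}
	for i := 0; i < 8; i++ {
		d, _ := s.NextReconnectBackoff()
		fmt.Print(d, " ")
	}
	fmt.Println()
}
```

With a base of one second and a maximum of 32 seconds, this sketch climbs through the powers of two and then stays at the cap.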
type InMemoryOffsetStore ¶
type InMemoryOffsetStore struct {
// contains filtered or unexported fields
}
InMemoryOffsetStore is mostly for testing purposes.
func NewInMemoryOffsetStore ¶
func NewInMemoryOffsetStore() *InMemoryOffsetStore
NewInMemoryOffsetStore creates a new InMemoryOffsetStore.
func (InMemoryOffsetStore) GetOffsets ¶
func (os InMemoryOffsetStore) GetOffsets() (*PartitionOffsets, error)
GetOffsets returns nil, nil if no offsets have been recorded; otherwise it returns the current set of recorded offsets and a nil error.
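The "nil, nil when empty" contract can be illustrated with a small in-memory store. Everything here is a sketch under assumptions: `PartitionOffsets` is modelled as a map from partition to offset (its real definition is not shown in this documentation), and `memoryStore` and its `record` method are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// PartitionOffsets is assumed to map partition number to offset.
type PartitionOffsets map[int32]int64

// memoryStore is a hypothetical stand-in for InMemoryOffsetStore.
type memoryStore struct {
	mu      sync.Mutex
	offsets PartitionOffsets
}

// record is a hypothetical helper; the real store's write path is not
// documented here.
func (s *memoryStore) record(partition int32, offset int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.offsets == nil {
		s.offsets = PartitionOffsets{}
	}
	s.offsets[partition] = offset
}

// GetOffsets returns nil, nil until something has been recorded.
func (s *memoryStore) GetOffsets() (*PartitionOffsets, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.offsets == nil {
		return nil, nil
	}
	// Return a copy so callers cannot mutate the store's state.
	out := make(PartitionOffsets, len(s.offsets))
	for p, o := range s.offsets {
		out[p] = o
	}
	return &out, nil
}

func main() {
	s := &memoryStore{}
	po, err := s.GetOffsets()
	fmt.Println(po == nil, err)

	s.record(0, 42)
	po, _ = s.GetOffsets()
	fmt.Println((*po)[0])
}
```

Callers need to check for a nil pointer before dereferencing, since a nil error alone does not mean offsets are available.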
type LimitedExponentialReconnectionPolicy ¶
type LimitedExponentialReconnectionPolicy struct {
// contains filtered or unexported fields
}
LimitedExponentialReconnectionPolicy reconnects with an exponential backoff until the backoff is greater than the maximum delay.
func NewLimitedExponentialReconnectionPolicy ¶
func NewLimitedExponentialReconnectionPolicy(base, max time.Duration) *LimitedExponentialReconnectionPolicy
NewLimitedExponentialReconnectionPolicy creates a new LimitedExponentialReconnectionPolicy with the base and max durations.
func (LimitedExponentialReconnectionPolicy) NewScheduler ¶
func (p LimitedExponentialReconnectionPolicy) NewScheduler() ReconnectionScheduler
NewScheduler implements the ReconnectionPolicy interface and returns a new limited exponential reconnection scheduler.
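The difference from the unlimited exponential policy is that the scheduler gives up instead of clamping. A minimal sketch, assuming a doubling factor and assuming the error returned is ErrReconnectsExhausted (the documentation confirms that error only for LimitedReconnectionPolicy); `limitedExponentialScheduler` is a hypothetical type:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrReconnectsExhausted mirrors the documented package variable.
var ErrReconnectsExhausted = errors.New("reconnects exhausted")

// limitedExponentialScheduler is a hypothetical stand-in for the
// unexported scheduler.
type limitedExponentialScheduler struct {
	next time.Duration // delay to hand out on the next call
	max  time.Duration // largest delay that is still allowed
}

// NextReconnectBackoff doubles the delay on each call and returns an error
// once the delay would exceed max.
func (s *limitedExponentialScheduler) NextReconnectBackoff() (time.Duration, error) {
	if s.next > s.max {
		return 0, ErrReconnectsExhausted
	}
	d := s.next
	s.next *= 2 // assumed growth factor
	return d, nil
}

func main() {
	s := &limitedExponentialScheduler{next: time.Second, max: 4 * time.Second}
	for {
		d, err := s.NextReconnectBackoff()
		if err != nil {
			fmt.Println("stop:", err)
			return
		}
		fmt.Println("wait", d)
	}
}
```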
type LimitedReconnectionPolicy ¶
type LimitedReconnectionPolicy struct {
// contains filtered or unexported fields
}
LimitedReconnectionPolicy reconnects with a fixed delay for a limited number of attempts, and then returns ErrReconnectsExhausted.
func NewLimitedReconnectionPolicy ¶
func NewLimitedReconnectionPolicy(attempts int32, t time.Duration) *LimitedReconnectionPolicy
NewLimitedReconnectionPolicy creates a new LimitedReconnectionPolicy.
func (LimitedReconnectionPolicy) NewScheduler ¶
func (p LimitedReconnectionPolicy) NewScheduler() ReconnectionScheduler
NewScheduler implements the ReconnectionPolicy interface and returns a new limited reconnection scheduler.
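An attempt-counting scheduler of this kind can be sketched as below. `limitedScheduler` is a hypothetical type; only the error value and the `NextReconnectBackoff` signature come from this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrReconnectsExhausted mirrors the documented package variable.
var ErrReconnectsExhausted = errors.New("reconnects exhausted")

// limitedScheduler is a hypothetical stand-in for the unexported scheduler.
type limitedScheduler struct {
	remaining int32         // attempts still allowed
	delay     time.Duration // fixed delay between attempts
}

// NextReconnectBackoff returns the fixed delay until the attempts run out,
// then ErrReconnectsExhausted on every later call.
func (s *limitedScheduler) NextReconnectBackoff() (time.Duration, error) {
	if s.remaining <= 0 {
		return 0, ErrReconnectsExhausted
	}
	s.remaining--
	return s.delay, nil
}

func main() {
	s := &limitedScheduler{remaining: 2, delay: 3 * time.Second}
	for i := 0; i < 3; i++ {
		d, err := s.NextReconnectBackoff()
		fmt.Println(d, err)
	}
}
```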
type Message ¶
type Message struct {
	Offset    int64           `json:"offset"`
	Partition int32           `json:"partition"`
	Body      json.RawMessage `json:"body"`
}
type PartitionOffsets ¶
PartitionOffsets represents the offsets for each partition.
func (PartitionOffsets) MarshalJSON ¶
func (po PartitionOffsets) MarshalJSON() ([]byte, error)
MarshalJSON formats the numeric data as strings because eventbus-sub expects it.
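Emitting numbers as JSON strings from a custom marshaler can be sketched as follows. The shape of `PartitionOffsets` (a map from partition to offset) and the resulting wire format are assumptions for illustration; the format eventbus-sub actually expects is not shown in this documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// PartitionOffsets is assumed to map partition number to offset.
type PartitionOffsets map[int32]int64

// MarshalJSON writes both partitions and offsets as decimal strings
// rather than JSON numbers.
func (po PartitionOffsets) MarshalJSON() ([]byte, error) {
	out := make(map[string]string, len(po))
	for partition, offset := range po {
		key := strconv.FormatInt(int64(partition), 10)
		out[key] = strconv.FormatInt(offset, 10)
	}
	return json.Marshal(out)
}

func main() {
	b, err := json.Marshal(PartitionOffsets{0: 42, 1: -1})
	fmt.Println(string(b), err)
}
```

Because the method has a value receiver, `json.Marshal` picks it up for both `PartitionOffsets` values and `*PartitionOffsets` pointers.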
type ReconnectionPolicy ¶
type ReconnectionPolicy interface {
NewScheduler() ReconnectionScheduler
}
ReconnectionPolicy returns a ReconnectionScheduler to be used when attempting to reconnect.
var (
	// DefaultPolicy is used by the Eventbus constructor.
	DefaultPolicy ReconnectionPolicy = NewExponentialReconnectionPolicy(1*time.Second, 32*time.Second)
	// ErrReconnectsExhausted is returned as an error when the reconnection policy
	// has run out of attempts.
	ErrReconnectsExhausted = errors.New("reconnects exhausted")
)
type ReconnectionScheduler ¶
type ReconnectionScheduler interface {
	// Should return an error to indicate that the client should not reconnect
	NextReconnectBackoff() (time.Duration, error)
}
ReconnectionScheduler returns the time to wait before the next reconnection attempt should be made.
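How a client might consume a scheduler is sketched below. The `reconnect` helper, the `fixedAttempts` scheduler, and `errGiveUp` are all hypothetical; the Eventbus client's actual reconnection loop is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ReconnectionScheduler mirrors the documented interface.
type ReconnectionScheduler interface {
	NextReconnectBackoff() (time.Duration, error)
}

// errGiveUp is a hypothetical error used by the example scheduler.
var errGiveUp = errors.New("no more attempts")

// fixedAttempts is a hypothetical scheduler: n delays, then an error.
type fixedAttempts struct {
	remaining int
	delay     time.Duration
}

func (s *fixedAttempts) NextReconnectBackoff() (time.Duration, error) {
	if s.remaining <= 0 {
		return 0, errGiveUp
	}
	s.remaining--
	return s.delay, nil
}

// reconnect is a hypothetical helper. It retries connect, sleeping for the
// scheduler's backoff between attempts, until connect succeeds or the
// scheduler returns an error.
func reconnect(s ReconnectionScheduler, connect func() error) error {
	for {
		if err := connect(); err == nil {
			return nil
		}
		d, err := s.NextReconnectBackoff()
		if err != nil {
			return err // the scheduler says: do not reconnect
		}
		time.Sleep(d)
	}
}

func main() {
	calls := 0
	err := reconnect(&fixedAttempts{remaining: 3, delay: time.Millisecond}, func() error {
		calls++
		if calls < 3 {
			return errors.New("connection refused")
		}
		return nil
	})
	fmt.Println(calls, err)
}
```

The loop treats any error from the scheduler as a signal to stop, which matches the comment on `NextReconnectBackoff` in the interface.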
type RedisOffsetStore ¶
type RedisOffsetStore struct {
// contains filtered or unexported fields
}
RedisOffsetStore uses a connection pool to record the offsets and partitions.
func NewRedisOffsetStore ¶
func NewRedisOffsetStore(prefix string, p *redis.Pool) *RedisOffsetStore
NewRedisOffsetStore creates a new RedisOffsetStore.
func (RedisOffsetStore) GetOffsets ¶
func (rs RedisOffsetStore) GetOffsets() (*PartitionOffsets, error)
GetOffsets returns the current offsets stored in Redis and possibly an error.