base

package
v0.0.0-...-8d90134 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2021 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func EventStreamSampleGroup

func EventStreamSampleGroup(factory func() EventStream)

Types

type Bracket

type Bracket struct {
	NextSequence int64
	LastSequence int64
}

func All

func All() Bracket

func From

func From(next int64) Bracket

func Range

func Range(next, last int64) Bracket

type Event

type Event struct {
	Sequence   int64                  `json:"sequence,omitempty" yaml:"sequence,omitempty"`
	Aggregate  []string               `json:"aggregate,omitempty" yaml:"aggregate,omitempty"`
	Type       string                 `json:"type,omitempty" yaml:"type,omitempty"`
	OccurredAt time.Time              `json:"occurred_at,omitempty" yaml:"occurred_at,omitempty"`
	Payload    map[string]interface{} `json:"payload,omitempty" yaml:"payload,omitempty"`
}

type EventBuilder

type EventBuilder = func(e *Event)

type EventHandler

type EventHandler func(e *Event) error

type EventStream

type EventStream interface {
	Store(event *Event) (int64, error)
	LastSequence() int64
	Get(sequence int64) (*Event, error)
	Stream(ctx context.Context, sel Selector, bracket Bracket, handler EventHandler) error
	Subscribe(ctx context.Context, persistentClientID string, sel Selector, handler EventHandler) (Subscription, error)
	// Returns all currently known Subscriptions.
	Subscriptions() []Subscription
}

type EventStreamWrapper

type EventStreamWrapper struct {
	// contains filtered or unexported fields
}

func NewWrapper

func NewWrapper(stream EventStream) *EventStreamWrapper

func NewWrapperWithStartTime

func NewWrapperWithStartTime(stream EventStream, startTime time.Time) *EventStreamWrapper

func (*EventStreamWrapper) After

func (s *EventStreamWrapper) After(duration time.Duration) EventBuilder

func (*EventStreamWrapper) Agg

func (s *EventStreamWrapper) Agg(a ...string) EventBuilder

func (*EventStreamWrapper) DefAgg

func (s *EventStreamWrapper) DefAgg() EventBuilder

func (*EventStreamWrapper) Emit

func (s *EventStreamWrapper) Emit(builders ...EventBuilder) (*Event, error)

func (*EventStreamWrapper) IncrBy

func (s *EventStreamWrapper) IncrBy(duration time.Duration) EventBuilder

func (*EventStreamWrapper) Stream

func (s *EventStreamWrapper) Stream() EventStream

func (*EventStreamWrapper) Type

type SelectOption

type SelectOption func(s *Selector)

func SelectAggregate

func SelectAggregate(agg ...string) SelectOption

func SelectType

func SelectType(t string) SelectOption

type Selector

type Selector struct {
	Aggregate []string
	Type      string
}

func ParseSelector

func ParseSelector(s string) (*Selector, error)

func Select

func Select(options ...SelectOption) Selector

func (*Selector) IsComplete

func (s *Selector) IsComplete() bool

func (*Selector) Matches

func (s *Selector) Matches(event *Event) bool

type SequenceStore

type SequenceStore interface {
	Get(persistentClientID string) (int64, error)
	Store(persistentClientID string, sequence int64) error
}

type Subscription

type Subscription interface {
	PersistentID() string
	// Returns the currently active Selector.
	ActiveSelector() Selector
	LastAcknowledgedSequence() (int64, error)
	Acknowledge(sequence int64) error
	// Returns whether this Subscription is currently active.
	Active() bool
	// Returns the time this Subscription last became inactive.
	InactiveSince() time.Time
	// Wait for the Subscription to become inactive (disconnected)
	Wait() error
	// Returns how often this Subscription has dropped out of the live stream.
	DropOuts() int
	// Closes this Subscription and removes all associated state. A Subscription can not be resumed after this call.
	Shutdown()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL