Documentation ¶
Index ¶
- func EventStreamSampleGroup(factory func() EventStream)
- type Bracket
- type Event
- type EventBuilder
- type EventHandler
- type EventStream
- type EventStreamWrapper
- func (s *EventStreamWrapper) After(duration time.Duration) EventBuilder
- func (s *EventStreamWrapper) Agg(a ...string) EventBuilder
- func (s *EventStreamWrapper) DefAgg() EventBuilder
- func (s *EventStreamWrapper) Emit(builders ...EventBuilder) (*Event, error)
- func (s *EventStreamWrapper) IncrBy(duration time.Duration) EventBuilder
- func (s *EventStreamWrapper) Stream() EventStream
- func (s *EventStreamWrapper) Type(t string) EventBuilder
- type SelectOption
- type Selector
- type SequenceStore
- type Subscription
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func EventStreamSampleGroup ¶
func EventStreamSampleGroup(factory func() EventStream)
Types ¶
type Event ¶
type Event struct { Sequence int64 `json:"sequence,omitempty" yaml:"sequence,omitempty"` Aggregate []string `json:"aggregate,omitempty" yaml:"aggregate,omitempty"` Type string `json:"type,omitempty" yaml:"type,omitempty"` OccurredAt time.Time `json:"occurred_at,omitempty" yaml:"occurred_at,omitempty"` Payload map[string]interface{} `json:"payload,omitempty" yaml:"payload,omitempty"` }
type EventBuilder ¶
type EventBuilder = func(e *Event)
type EventHandler ¶
type EventStream ¶
type EventStream interface { Store(event *Event) (int64, error) LastSequence() int64 Get(sequence int64) (*Event, error) Stream(ctx context.Context, sel Selector, bracket Bracket, handler EventHandler) error Subscribe(ctx context.Context, persistentClientID string, sel Selector, handler EventHandler) (Subscription, error) // Returns all currently known Subscriptions. Subscriptions() []Subscription }
type EventStreamWrapper ¶
type EventStreamWrapper struct {
// contains filtered or unexported fields
}
func NewWrapper ¶
func NewWrapper(stream EventStream) *EventStreamWrapper
func NewWrapperWithStartTime ¶
func NewWrapperWithStartTime(stream EventStream, startTime time.Time) *EventStreamWrapper
func (*EventStreamWrapper) After ¶
func (s *EventStreamWrapper) After(duration time.Duration) EventBuilder
func (*EventStreamWrapper) Agg ¶
func (s *EventStreamWrapper) Agg(a ...string) EventBuilder
func (*EventStreamWrapper) DefAgg ¶
func (s *EventStreamWrapper) DefAgg() EventBuilder
func (*EventStreamWrapper) Emit ¶
func (s *EventStreamWrapper) Emit(builders ...EventBuilder) (*Event, error)
func (*EventStreamWrapper) IncrBy ¶
func (s *EventStreamWrapper) IncrBy(duration time.Duration) EventBuilder
func (*EventStreamWrapper) Stream ¶
func (s *EventStreamWrapper) Stream() EventStream
func (*EventStreamWrapper) Type ¶
func (s *EventStreamWrapper) Type(t string) EventBuilder
type SelectOption ¶
type SelectOption func(s *Selector)
func SelectAggregate ¶
func SelectAggregate(agg ...string) SelectOption
func SelectType ¶
func SelectType(t string) SelectOption
type Selector ¶
func ParseSelector ¶
func Select ¶
func Select(options ...SelectOption) Selector
func (*Selector) IsComplete ¶
type SequenceStore ¶
type Subscription ¶
type Subscription interface { PersistentID() string // Returns the currently active Selector. ActiveSelector() Selector LastAcknowledgedSequence() (int64, error) Acknowledge(sequence int64) error // Returns whether this Subscription is currently active. Active() bool // Returns the time this Subscription last became inactive. InactiveSince() time.Time // Wait for the Subscription to become inactive (disconnected) Wait() error // Returns how often this Subscription has dropped out of the live stream. DropOuts() int // Closes this Subscription and removes all associated state. A Subscription can not be resumed after this call. Shutdown() }
Click to show internal directories.
Click to hide internal directories.