eventplayer

package
v0.0.0-...-da8027b Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 4, 2020 | License: MIT | Imports: 14 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	ReplayAllStreamsCommand        = stream.CommandType("ReplayAllStreams")
	ReplayStreamCommand            = stream.CommandType("ReplayPlayerStream")
	ReplayStreamFromVersionCommand = stream.CommandType("ReplayStreamFromVersion")
)

Variables

View Source
var (
	ErrUnknownCommandType   = errors.New("unknown command type")
	ErrStateNoFound         = errors.New("state not found")
	ErrVersionInconsistency = errors.New("err version inconsistency")
)

Functions

func ChannelBuilder

func ChannelBuilder(st stream.Type) string

func NewCodec

func NewCodec(st stream.Type) codec.Codec

func NewReplayAllStreamsCommand

func NewReplayAllStreamsCommand(st stream.Type) stream.Command

func NewReplayStreamCommand

func NewReplayStreamCommand(st stream.Type, sid stream.ID) stream.Command

func NewReplayStreamFromVersionCommand

func NewReplayStreamFromVersionCommand(st stream.Type, sid stream.ID, from stream.Version) stream.Command

func SID

func SID(st stream.Type, sid stream.ID, ct stream.CommandType) string

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

func NewBus

func NewBus(s Sender, r Receiver) *Bus

func (*Bus) Close

func (b *Bus) Close() (err error)

func (*Bus) Receive

func (b *Bus) Receive(ctx context.Context, st stream.Type, h Handler) error

func (*Bus) ReplayStream

func (b *Bus) ReplayStream(ctx context.Context, sid stream.ID, st stream.Type) error

func (*Bus) ReplayStreamFromVersion

func (b *Bus) ReplayStreamFromVersion(ctx context.Context, sid stream.ID, st stream.Type, from stream.Version) error

func (*Bus) ReplayStreams

func (b *Bus) ReplayStreams(ctx context.Context, st stream.Type) error

func (*Bus) Send

func (b *Bus) Send(ctx context.Context, c stream.Command) error

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

func NewController

func NewController(r *EventPlayer, s Store, o ...Option) *Controller

func (*Controller) Receive

func (c *Controller) Receive(ctx context.Context, cmd stream.Command) (err error)

type EventPlayer

type EventPlayer struct {
	// contains filtered or unexported fields
}

func New

func New(
	eventStore eventstore.Store,
	publisher eventbus.Publisher,
) *EventPlayer

func (*EventPlayer) ReplayStream

func (r *EventPlayer) ReplayStream(ctx context.Context, sid stream.ID, st stream.Type, withDelay time.Duration) error

func (*EventPlayer) ReplayStreamFrom

func (r *EventPlayer) ReplayStreamFrom(ctx context.Context, sid stream.ID, st stream.Type, fromEvents stream.Version, withDelay time.Duration) error

func (*EventPlayer) ReplayStreams

func (r *EventPlayer) ReplayStreams(ctx context.Context, st stream.Type, withDelay time.Duration) error

type Handler

type Handler interface {
	Receive(context.Context, stream.Command) error
}

type InMemStore

type InMemStore struct {
	// contains filtered or unexported fields
}

func NewInMemStore

func NewInMemStore() *InMemStore

func (*InMemStore) Load

func (s *InMemStore) Load(_ context.Context, st stream.Type, sid stream.ID, ct stream.CommandType) (*State, error)

func (*InMemStore) Remove

func (s *InMemStore) Remove(_ context.Context, state *State) error

func (*InMemStore) Save

func (s *InMemStore) Save(_ context.Context, state *State, v Version) error

type Option

type Option func(*Controller)

func SetDelay

func SetDelay(dur time.Duration) Option

type Receiver

type Receiver interface {
	Receive(context.Context, stream.Type, Handler) error
	Close() error
}

func NewReceiver

func NewReceiver(
	r messaging.Receiver,
	e *commandbus.Envelope,
	o ...ReceiverOption,
) Receiver

type ReceiverOption

type ReceiverOption func(*receiver)

func ReceiverErrorHandler

func ReceiverErrorHandler(h messaging.ErrorHandler) ReceiverOption

type Sender

type Sender interface {
	Send(context.Context, stream.Command) error
	Close() error
}

func NewSender

func NewSender(s messaging.Sender, e *commandbus.Envelope, o ...SenderOption) Sender

type SenderOption

type SenderOption func(*sender)

func SenderChannelBuilder

func SenderChannelBuilder(cb messaging.ChannelBuilderFunc) SenderOption

type State

type State struct {
	// contains filtered or unexported fields
}

func NewState

func NewState(st stream.Type, sid stream.ID, ct stream.CommandType) *State

func (*State) ID

func (s *State) ID() string

func (*State) IsProcessing

func (s *State) IsProcessing() bool

func (*State) MarshalJSON

func (s *State) MarshalJSON() ([]byte, error)

func (*State) Processing

func (s *State) Processing()

func (*State) UnmarshalJSON

func (s *State) UnmarshalJSON(b []byte) error

func (*State) Version

func (s *State) Version() Version

type Status

type Status int
const (
	Unknown    Status = 0
	Processing Status = 2
)

func (Status) String

func (s Status) String() string

type Store

type Store interface {
	Load(context.Context, stream.Type, stream.ID, stream.CommandType) (*State, error)
	Remove(context.Context, *State) error
	Save(context.Context, *State, Version) error
}

type Version

type Version int

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL