stream

package
v0.0.0-...-da8027b Latest
Warning

This package is not in the latest version of its module.

Published: Dec 4, 2020 License: MIT Imports: 4 Imported by: 0

Documentation

Constants

const (
	ErrorReplyType   = ReplyType("Error")
	SuccessReplyType = ReplyType("Success")
	UnknownReplyType = ReplyType("Unknown")
)

const (
	EventTypeIDKey   = "_eventType"
	StreamTypeIDKey  = "_streamType"
	CorrelationIDKey = "_correlationID"
	ReplyToKey       = "_replyTo"
)

Variables

This section is empty.

Functions

func Emit

func Emit(es Stream, et EventType, payload interface{})

Emit stores a new event on the stream and then applies it to the stream with the default context.

func EmitWithContext

func EmitWithContext(es Stream, et EventType, payload interface{}, eventContext Context)

EmitWithContext stores a new event on the stream and then applies it to the stream, using the given context.
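
The store-then-apply order described above can be sketched as follows. This is only an illustration under stated assumptions: the event and stream types below are simplified local stand-ins (not the package's Event and Stream), and treating an empty map as the "default context" is a guess, since the source does not say what the default is.

```go
package main

import "fmt"

// event and stream are hypothetical stand-ins for illustration only;
// they are not the package's Event and Stream types.
type event struct {
	Type    string
	Payload interface{}
	Context map[string]string
}

type stream struct {
	changes []event  // pending, not yet persisted events
	applied []string // event types that have been applied to state
}

// addChange records the event as a pending change.
func (s *stream) addChange(e event) { s.changes = append(s.changes, e) }

// apply mutates in-memory state; here it only records the event type.
func (s *stream) apply(e event) { s.applied = append(s.applied, e.Type) }

// emitWithContext follows the documented order: store first, then apply.
func emitWithContext(s *stream, et string, payload interface{}, ctx map[string]string) {
	e := event{Type: et, Payload: payload, Context: ctx}
	s.addChange(e)
	s.apply(e)
}

// emit delegates with an empty context (assumed to be the default).
func emit(s *stream, et string, payload interface{}) {
	emitWithContext(s, et, payload, map[string]string{})
}

func main() {
	s := &stream{}
	emit(s, "OrderPlaced", 42)
	fmt.Println(len(s.changes), s.applied) // prints "1 [OrderPlaced]"
}
```

Storing before applying means the pending change list always contains every event that has touched the in-memory state.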

Types

type Command

type Command interface {

	// StreamID returns the id of the stream that the command relates to.
	StreamID() ID

	// StreamType returns the type of the stream that the command relates to.
	StreamType() Type

	// Type returns the command type.
	Type() CommandType

	// Payload returns the command payload.
	Payload() interface{}

	// Header returns the command headers.
	Header() Header

	// Reply constructs a new Reply with the payload and version.
	Reply(reply interface{}, v Version) *Reply

	// ReplyOkWithVersion constructs a new success Reply with the specified version.
	ReplyOkWithVersion(v Version) *Reply

	// ReplyWithError constructs a new error Reply with the specified error.
	ReplyWithError(err error) *Reply

	// ReplyOk constructs a new success Reply without a version.
	ReplyOk() *Reply

	// ReplyWith constructs a new Reply.
	ReplyWith(rt ReplyType, reply interface{}, v Version) *Reply
}

Command represents the intention to change the state of the stream.

func NewCommand

func NewCommand(st Type, sid ID, ct CommandType, payload interface{}) Command

NewCommand constructs a new Command.
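
A command handler typically answers with either a success reply carrying the new version or an error reply. The sketch below illustrates that pattern with local stand-in types; the reply and command types, the handle function, and the validation rule are all hypothetical and only mirror the method names listed above (ReplyOkWithVersion, ReplyWithError, IsSuccessType, IsErrorType).

```go
package main

import (
	"errors"
	"fmt"
)

// replyType mirrors the documented "Error" and "Success" constant values.
type replyType string

const (
	errorReplyType   = replyType("Error")
	successReplyType = replyType("Success")
)

// reply is a hypothetical stand-in for the package's Reply.
type reply struct {
	typ     replyType
	version int
	err     error
}

func (r *reply) isErrorType() bool   { return r.typ == errorReplyType }
func (r *reply) isSuccessType() bool { return r.typ == successReplyType }

// command is a hypothetical stand-in for the package's Command.
type command struct{ name string }

func (c command) replyOkWithVersion(v int) *reply {
	return &reply{typ: successReplyType, version: v}
}

func (c command) replyWithError(err error) *reply {
	return &reply{typ: errorReplyType, err: err}
}

// handle rejects unnamed commands and otherwise reports the next version.
func handle(c command, currentVersion int) *reply {
	if c.name == "" {
		return c.replyWithError(errors.New("empty command"))
	}
	return c.replyOkWithVersion(currentVersion + 1)
}

func main() {
	ok := handle(command{name: "PlaceOrder"}, 1)
	bad := handle(command{}, 1)
	fmt.Println(ok.isSuccessType(), ok.version) // prints "true 2"
	fmt.Println(bad.isErrorType(), bad.err)     // prints "true empty command"
}
```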

type CommandType

type CommandType string

func (CommandType) String

func (t CommandType) String() string

type Context

type Context map[string]string

Context is the meta information of the event.

type CorrelationID

type CorrelationID string

func NewCorrelationID

func NewCorrelationID() CorrelationID

func (CorrelationID) Equal

func (cid CorrelationID) Equal(correlationID string) bool

func (CorrelationID) String

func (cid CorrelationID) String() string

type Event

type Event interface {

	// StreamID returns the id of the stream that the event relates to.
	StreamID() ID

	// SetStreamID sets the stream id.
	SetStreamID(id ID)

	// StreamType returns the type of the stream that the event relates to.
	StreamType() Type

	// Version returns the version of the event.
	Version() Version

	// Context returns the event context.
	Context() Context

	// Payload returns the event payload.
	Payload() interface{}

	// Type returns the event type.
	Type() EventType

	// CreatedAt returns the time when the event was created.
	CreatedAt() time.Time

	// Generation returns the payload semantic version.
	// Default 0.
	Generation() int

	// SetGeneration sets the semantic version for the payload.
	SetGeneration(int)
}

Event represents something that took place in the domain. Events are always named with a past-participle verb (for example, OrderPlaced).

func NewEvent

func NewEvent(st Type, sid ID, et EventType, v Version, payload interface{}) Event

NewEvent creates a new Event with the specified params.

func NewEventWithContext

func NewEventWithContext(streamID ID, st Type, v Version, payload interface{}, et EventType, ec Context, t time.Time) Event

NewEventWithContext creates a new Event with the specified params.

type EventType

type EventType string

func (EventType) String

func (t EventType) String() string

type Header

type Header map[string]string

func (Header) Add

func (h Header) Add(k, v string)

func (Header) CorrelationID

func (h Header) CorrelationID() CorrelationID

func (Header) Get

func (h Header) Get(k string) string

func (Header) ReplyTo

func (h Header) ReplyTo() string

func (Header) SetCorrelationID

func (h Header) SetCorrelationID(cid CorrelationID)

func (Header) SetReplyTo

func (h Header) SetReplyTo(reply string)

func (Header) ShouldReply

func (h Header) ShouldReply() bool
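
Because Header is a plain map[string]string, the reserved keys from the Constants section can be read and written like any other entry. The sketch below uses a local stand-in type and reuses the documented key values. The rule that a reply is wanted whenever a reply-to address is present is an assumption made for illustration; the source does not show how ShouldReply is implemented.

```go
package main

import "fmt"

// Key values copied from the package's documented constants.
const (
	correlationIDKey = "_correlationID"
	replyToKey       = "_replyTo"
)

// header is a hypothetical stand-in for the package's Header type.
type header map[string]string

func (h header) add(k, v string)     { h[k] = v }
func (h header) get(k string) string { return h[k] }

// shouldReply assumes a reply is wanted when a reply-to address is set.
func (h header) shouldReply() bool { return h.get(replyToKey) != "" }

func main() {
	h := header{}
	h.add(correlationIDKey, "c-1")
	fmt.Println(h.shouldReply()) // prints "false"

	h.add(replyToKey, "replies.orders")
	fmt.Println(h.shouldReply()) // prints "true"
}
```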

type ID

type ID uuid.UUID

func IDFromString

func IDFromString(id string) (ID, error)

func NewID

func NewID() ID

func (ID) CorrelationID

func (t ID) CorrelationID() CorrelationID

func (ID) Empty

func (t ID) Empty() bool

func (ID) String

func (t ID) String() string

func (ID) Validate

func (t ID) Validate() error

type Identifier

type Identifier interface {
	ID() ID
	SetID(ID)
}

type Reply

type Reply struct {
	// contains filtered or unexported fields
}

Reply represents the reply to the command.

func NewReply

func NewReply(cid CorrelationID, sid ID, st Type, rt ReplyType, ct CommandType, v Version, p interface{}, err error) *Reply

func (*Reply) CommandType

func (r *Reply) CommandType() CommandType

func (*Reply) Copy

func (r *Reply) Copy() *Reply

func (*Reply) CorrelationID

func (r *Reply) CorrelationID() CorrelationID

func (*Reply) Err

func (r *Reply) Err() error

func (*Reply) IsErrorType

func (r *Reply) IsErrorType() bool

func (*Reply) IsSuccessType

func (r *Reply) IsSuccessType() bool

func (*Reply) Payload

func (r *Reply) Payload() interface{}

func (*Reply) StreamID

func (r *Reply) StreamID() ID

func (*Reply) StreamType

func (r *Reply) StreamType() Type

func (*Reply) Type

func (r *Reply) Type() ReplyType

func (*Reply) Version

func (r *Reply) Version() Version

type ReplyType

type ReplyType string

func (ReplyType) String

func (t ReplyType) String() string

type Stream

type Stream interface {

	// ID returns the ID.
	ID() ID

	// SetID sets the ID after rehydration from the store.
	SetID(ID)

	// Type returns the stream type.
	Type() Type

	// Version returns the version of the stream.
	Version() Version

	// PreviousVersion returns the version of the stream with no current changes.
	PreviousVersion() Version

	// IncrementVersion increments the stream version number by one.
	IncrementVersion()

	// Changes returns the list of new events.
	Changes() []Event

	// ClearChanges removes all new events from the stream.
	ClearChanges()

	// AddChange stores the new event in the stream.
	AddChange(Event)

	// Apply applies the new event to the stream.
	Apply(Event, bool)
}

Stream represents the current state as a sequence of events. A stream is treated as a single unit for the purposes of data modification. All calls to a stream must be made through its root, which is an entity with a globally unique identifier, stream.ID. Internal objects of the stream have only local identity; they can refer to each other in any way.
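
One way to picture how Version, PreviousVersion, Changes, and ClearChanges relate is the sketch below. It uses a local stand-in type and assumes that the previous version equals the current version minus the number of pending changes; the source only says that PreviousVersion is the version "with no current changes", so this arithmetic is an illustration, not the package's implementation.

```go
package main

import "fmt"

// versionedStream is a hypothetical stand-in, not the package's Stream.
type versionedStream struct {
	version int
	changes []string // names of pending (unsaved) events
}

// record stores a pending event and bumps the version by one.
func (s *versionedStream) record(name string) {
	s.changes = append(s.changes, name)
	s.version++
}

// previousVersion is the version before any pending changes (assumed rule).
func (s *versionedStream) previousVersion() int {
	return s.version - len(s.changes)
}

// clearChanges drops pending events, for example after they are persisted.
func (s *versionedStream) clearChanges() { s.changes = nil }

func main() {
	s := &versionedStream{version: 3} // as if rehydrated at version 3
	s.record("ItemAdded")
	s.record("ItemRemoved")
	fmt.Println(s.version, s.previousVersion(), len(s.changes)) // prints "5 3 2"

	s.clearChanges()
	fmt.Println(s.version, s.previousVersion(), len(s.changes)) // prints "5 5 0"
}
```

Under this assumption, a store could compare previousVersion against the persisted version to detect concurrent writes before appending the pending changes.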

func New

func New() Stream

New constructs a new Stream.

type Type

type Type string

func (Type) String

func (t Type) String() string

type Version

type Version int

type Versioner

type Versioner interface {
	Version() Version
}
