Documentation ¶
Index ¶
- Constants
- func Subscribe(c chan<- *Event, topics ...Topic)
- func Unsubscribe(c chan<- *Event)
- type Aggregate
- type CacheService
- type Client
- func (c *Client) CacheService() CacheService
- func (c *Client) Dispatch(ctx context.Context, event *Event, buffer interface{}, hooks ...HookFn) (*Aggregate, error)
- func (c *Client) Fetch(ctx context.Context, aggregateID string, buffer interface{}, opt FetchOptions) (*Aggregate, error)
- func (c *Client) RegisterCacheService(cache CacheService)
- func (c *Client) RegisterDomainRules(fn DomainTypeRulesFn, domainType interface{})
- func (c *Client) Rules(domainType interface{}) DomainTypeRulesFn
- type DomainTypeRulesFn
- type DomainTypeRulesMap
- type Error
- type Event
- type EventService
- type FetchOptions
- type Format
- type HookFn
- type Message
- type Params
- type StoreService
- type Topic
- type Version
Constants ¶
const ( ErrParamsIDRequired = Error("parameter id required") ErrNotImplemented = Error("feature not implemented") )
Params errors.
const ( ErrAggregateIDCanNotBeEmpty = Error("aggregateID can not be empty") ErrBufferCanNotBeNil = Error("buffer can not be nil") ErrFormatNotProvided = Error("format is not provided") ErrConcurrencyException = Error("concurrency exception") ErrAggregateIDWithoutEvents = Error("aggregateID without events") ErrEmptyState = Error("aggregate with empty state") ErrInvalidEventFormat = Error("event data is not encoded in the right format") ErrInvalidSchema = Error("invalid schema schema to decode event data") )
Event errors.
const ( ErrKeyDoesNotExist = Error("key does not exist") ErrVersionFieldDoesNotExist = Error("invalid aggregate key - version field does not exist") ErrStateFieldDoesNotExist = Error("invalid aggregate key - state field does not exist") )
Cache errors.
Variables ¶
This section is empty.
Functions ¶
func Subscribe ¶ added in v0.3.0
Subscribe causes package pubsub to relay incoming events to c. If no events are provided, all incoming events will be relayed to c. Otherwise, just the provided events will.
Package pubsub will not block sending to c: the caller must ensure that c has sufficient buffer space to keep up with the expected event rate. For a channel used for notification of just one event value, a buffer of size 1 is sufficient.
It is allowed to call Subscribe multiple times with the same channel: each call expands the set of events sent to that channel.
It is allowed to call Subscribe multiple times with different channels and the same events: each channel receives copies of incoming events independently.
func Unsubscribe ¶ added in v0.3.0
func Unsubscribe(c chan<- *Event)
Unsubscribe remove events from the map.
Types ¶
type Aggregate ¶
type Aggregate struct { State interface{} Version int64 // contains filtered or unexported fields }
Aggregate represents an aggregator state and respective version
type CacheService ¶
type CacheService interface { Get(ctx context.Context, aggregateID string, out *Aggregate) error Set(ctx context.Context, aggregateID string, in *Aggregate) error DB() interface{} }
CacheService represents a service for managing cache.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client to hold store service implementation
func (*Client) CacheService ¶ added in v0.4.0
func (c *Client) CacheService() CacheService
CacheService assigns a cache service to the client store
func (*Client) Dispatch ¶
func (c *Client) Dispatch(ctx context.Context, event *Event, buffer interface{}, hooks ...HookFn) (*Aggregate, error)
Dispatch returns an aggregate resource based on the event and domain rules defined
func (*Client) Fetch ¶ added in v0.4.5
func (c *Client) Fetch(ctx context.Context, aggregateID string, buffer interface{}, opt FetchOptions) (*Aggregate, error)
Fetch returns an aggregate resource based on the aggregateID and domain type
func (*Client) RegisterCacheService ¶
func (c *Client) RegisterCacheService(cache CacheService)
RegisterCacheService assigns a cache service to the client store
func (*Client) RegisterDomainRules ¶
func (c *Client) RegisterDomainRules(fn DomainTypeRulesFn, domainType interface{})
RegisterDomainRules assigns a cache service to the client store
func (*Client) Rules ¶
func (c *Client) Rules(domainType interface{}) DomainTypeRulesFn
Rules returns domain function rules for a specific domain type if domain types not previously registered log and return template function
type DomainTypeRulesFn ¶
type DomainTypeRulesFn func(topic string, buffer, previous interface{}) (next interface{})
DomainTypeRulesFn represents a function type to define how data in buffer could change the previous State into the next State for a specific topic.
type DomainTypeRulesMap ¶
type DomainTypeRulesMap map[string]DomainTypeRulesFn
DomainTypeRulesMap a map from domain type names to map domain type rules function
type Event ¶
type Event struct { // Topic (name) of the event. These should be written in the past tense (event_created) Topic string // Aggregate ID is the primary key of the aggregate to which the event refers to. AggregateID string // Version of the aggregate, useful when using concurrency writes. (read-only) Version int64 // Schema of the aggregate. Schema string // Format of the encoded type of the aggregate data Format Format // Data raw object data. Data []byte // Priority of the event, where 0 is the highest priority. Priority int32 // TODO: Signature includes SHA1 signature computed against it's contents and signature // of the previous event. (not implemented yet) Signature string // TODO: Origin of the event. e.g. service name. (not implemented yet) OriginName string // TODO: Origin of the event. e.g. service ip address / browser. (not implemented yet) OriginIP string // Metadata Metadata map[string]string // CreateTime timestamp when event ocurred, location should be set to UTC. CreateTime time.Time }
Event resource.
func NewEventProto ¶
NewEventProto returns an event resource.
func NewEventWithMetadata ¶
NewEventWithMetadata returns an event resource.
func (*Event) MarshalProto ¶
MarshalProto takes a protocol buffer message and encodes it into the wire format. Also sets the event schema as the underlying proto message type and encoded format
func (*Event) SetVersion ¶
SetVersion assign event version
type EventService ¶
type EventService interface { Create(ctx context.Context, e *Event) error GetLastVersion(ctx context.Context, aggregateID string) (int64, error) List(ctx context.Context, p Params) ([]*Event, error) }
EventService represents a service for managing an aggregate store.
type FetchOptions ¶ added in v0.5.0
type FetchOptions struct {
SkipOptimisticConcurrency bool
}
FetchOptions is a configurable object for fetch func
type HookFn ¶
HookFn represents a function type that will be called after the store is loaded. Please note that the new event is not yet dispatched / persisted when the HookFn is called.
type Params ¶
type Params struct { // ID is required ID string // FromVersion (optional) FromVersion int64 // ToVersion (optional) ToVersion int64 }
Params represents parameters to load a store
type StoreService ¶
type StoreService interface {
EventService() EventService
}
StoreService represents a service for managing an aggregate store.