Documentation
¶
Index ¶
- Variables
- type AckableEventEnvelope
- type Comparison
- type Conjunction
- type ConsumeFunc
- type ConsumeMiddleware
- type ConsumeMiddlewares
- type Consumer
- type Disjunction
- type Entity
- type EntityMarshaler
- type EntityRepository
- type EntityRoot
- type EntityStore
- type Event
- type EventEnvelope
- type EventMarshaler
- type EventRepository
- type Existence
- type Filter
- func And(fs ...Filter) Filter
- func Eq(path string, v any) Filter
- func Exists(path string, exists bool) Filter
- func Gt(path string, v any) Filter
- func Gte(path string, v any) Filter
- func In(path string, vs ...any) Filter
- func Lt(path string, v any) Filter
- func Lte(path string, v any) Filter
- func Ne(path string, v any) Filter
- func Not(f Filter) Filter
- func Or(fs ...Filter) Filter
- type HandleFunc
- type IDer
- type LoggerCtx
- type MarshaledEntity
- type MarshaledEvent
- type Membership
- type Metadata
- type MetadataGetter
- type MetadataKey
- type MiddlewareConsumer
- type Negation
- type NoopMetadataGetter
- type Notifier
- type Operator
- type Publisher
- type ReceivedEvent
- type SerialConsumer
- type StickyEntityConsumer
- type Subscriber
- type Subscription
- type SubscriptionMiddleware
- type SubscriptionMiddlewares
- type Transport
- type Version
Constants ¶
This section is empty.
Variables ¶
var ( ErrEntityNotFound = errors.New("ember: entity not found") ErrVersionConflict = errors.New("ember: entity version conflict") )
var (
ErrUnknownEvent = errors.New("unknown event")
)
var ErrUnsupportedFilter = errors.New("ember: unsupported filter")
ErrUnsupportedFilter is returned by a repository when it cannot translate a filter node, operator, or value type into its native query dialect.
Functions ¶
This section is empty.
Types ¶
type AckableEventEnvelope ¶
type AckableEventEnvelope struct {
EventEnvelope
Ack func()
Nack func()
}
AckableEventEnvelope
type Comparison ¶
Comparison matches a single path against a value with an operator.
type Conjunction ¶
type Conjunction struct {
Filters []Filter
}
Conjunction is a logical AND over its children.
type ConsumeMiddleware ¶
type ConsumeMiddleware func(next ConsumeFunc) ConsumeFunc
ConsumeMiddleware
type ConsumeMiddlewares ¶
type ConsumeMiddlewares []ConsumeMiddleware
ConsumeMiddlewares
func (ConsumeMiddlewares) Apply ¶
func (a ConsumeMiddlewares) Apply(c ConsumeFunc) ConsumeFunc
type Consumer ¶
type Consumer interface {
Run(ctx context.Context, name string, ch <-chan AckableEventEnvelope, consume ConsumeFunc)
Stop()
}
Consumer
type Disjunction ¶
type Disjunction struct {
Filters []Filter
}
Disjunction is a logical OR over its children.
type EntityMarshaler ¶
type EntityMarshaler[E Entity] interface { Marshal(ctx context.Context, e E) (*MarshaledEntity, error) Unmarshal(ctx context.Context, m *MarshaledEntity) (E, error) }
EntityMarshaler
type EntityRepository ¶
type EntityRepository interface {
Save(ctx context.Context, m *MarshaledEntity) error
Get(ctx context.Context, typ, id string) (*MarshaledEntity, error)
List(ctx context.Context, typ string, f Filter) ([]*MarshaledEntity, error)
}
EntityRepository
type EntityRoot ¶
type EntityRoot struct {
// contains filtered or unexported fields
}
EntityRoot
func NewEntityRoot ¶
func NewEntityRoot(id string) EntityRoot
func (*EntityRoot) ID ¶
func (r *EntityRoot) ID() string
func (*EntityRoot) SetVersion ¶
func (r *EntityRoot) SetVersion(v Version)
func (*EntityRoot) Version ¶
func (r *EntityRoot) Version() Version
type EntityStore ¶
type EntityStore[E Entity] struct { // contains filtered or unexported fields }
EntityStore
func NewEntityStore ¶
func NewEntityStore[E Entity](r EntityRepository, m EntityMarshaler[E]) *EntityStore[E]
func (*EntityStore[E]) Get ¶
func (s *EntityStore[E]) Get(ctx context.Context, id string) (E, error)
type EventEnvelope ¶
type EventEnvelope struct {
ID string
EntityID string
Event *MarshaledEvent
Metadata Metadata
Timestamp time.Time
}
EventEnvelope
type EventMarshaler ¶
type EventMarshaler interface {
Marshal(ctx context.Context, e Event) (*MarshaledEvent, error)
Unmarshal(ctx context.Context, e *MarshaledEvent) (Event, error)
}
EventMarshaler
type EventRepository ¶
type EventRepository interface {
Save(ctx context.Context, envelopes []EventEnvelope) error
}
EventRepository
type Filter ¶
type Filter interface {
// contains filtered or unexported methods
}
Filter is a sealed sum type. Only ember-defined nodes satisfy it, so repositories can translate the closed set exhaustively.
Null/missing-path semantics (two-valued): a path predicate (Comparison, Membership) is satisfied only when the referenced path is present, non-null, and the comparison holds; a missing or null path makes that leaf false. And/Or/Not combine leaves as ordinary booleans. As a result Ne(p, x) does not match entities where p is absent/null, while Not(Eq(p, x)) does. Exists(p, true) means present and non-null; Exists(p, false) is its complement. Every backend honors these semantics identically.
type LoggerCtx ¶
type LoggerCtx interface {
Debug(ctx context.Context, msg string, kvs ...interface{})
Info(ctx context.Context, msg string, kvs ...interface{})
Warn(ctx context.Context, msg string, kvs ...interface{})
Error(ctx context.Context, msg string, err error, kvs ...interface{})
}
LoggerCtx is the context-aware structured logger used across spark. Implementations adapt arbitrary backends (slog, zap, zerolog, etc.) by satisfying this interface.
kvs is a flat sequence of alternating keys and values. Keys must be strings; values may be any type the underlying backend can render. Mismatched pairs are the implementation's problem to surface.
Error takes the error as a separate positional argument so backends can render stack traces or attach it as a typed field rather than a string.
var NopLogger LoggerCtx = nopLogger{}
NopLogger is a LoggerCtx that drops every call. Used as the zero-value default by middleware that takes an optional logger.
type MarshaledEntity ¶
MarshaledEntity
type Membership ¶
Membership matches when the path's value is one of Values (IN).
type Metadata ¶
type Metadata map[MetadataKey]interface{}
Metadata
func (Metadata) CurrentDelivery ¶
CurrentDelivery returns the 1-based delivery attempt for the event, or 0 when absent.
func (Metadata) IsLastDelivery ¶
IsLastDelivery reports whether this is the final redelivery attempt. It is false when the consumer is uncapped (no max-deliveries bound) or the counts are absent.
func (Metadata) MaxDeliveries ¶
MaxDeliveries returns the consumer's delivery cap, or 0 when the consumer is uncapped or the value is absent.
type MetadataGetter ¶
MetadataGetter
type MetadataKey ¶
type MetadataKey string
MetadataKey
const ( // MetadataKeyCurrentDelivery is the 1-based delivery attempt for the event, // stamped by a subscriber from the transport's redelivery count. MetadataKeyCurrentDelivery MetadataKey = "current_delivery" // MetadataKeyMaxDeliveries is the consumer's delivery cap (its DLQ bound), // stamped only when the consumer is capped. MetadataKeyMaxDeliveries MetadataKey = "max_deliveries" )
type MiddlewareConsumer ¶
type MiddlewareConsumer struct {
// contains filtered or unexported fields
}
MiddlewareConsumer
func NewMiddlewareConsumer ¶
func NewMiddlewareConsumer(c Consumer, m ...ConsumeMiddleware) *MiddlewareConsumer
func (*MiddlewareConsumer) Run ¶
func (c *MiddlewareConsumer) Run(ctx context.Context, name string, ch <-chan AckableEventEnvelope, consume ConsumeFunc)
func (*MiddlewareConsumer) Stop ¶
func (c *MiddlewareConsumer) Stop()
type Notifier ¶
type Notifier interface {
Notify(ctx context.Context, envelopes []EventEnvelope)
}
Notifier
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher
func NewPublisher ¶
func NewPublisher(i IDer, r EventRepository, mg MetadataGetter, m EventMarshaler, n Notifier) *Publisher
type ReceivedEvent ¶
ReceivedEvent
type SerialConsumer ¶
type SerialConsumer struct {
// contains filtered or unexported fields
}
SerialConsumer
func NewSerialConsumer ¶
func NewSerialConsumer() *SerialConsumer
func (*SerialConsumer) Run ¶
func (c *SerialConsumer) Run(ctx context.Context, _ string, ch <-chan AckableEventEnvelope, consume ConsumeFunc)
func (*SerialConsumer) Stop ¶
func (c *SerialConsumer) Stop()
type StickyEntityConsumer ¶
type StickyEntityConsumer struct {
// contains filtered or unexported fields
}
StickyEntityConsumer
func NewStickyEntityConsumer ¶
func NewStickyEntityConsumer(concurrency map[string]int, l LoggerCtx) *StickyEntityConsumer
func (*StickyEntityConsumer) Run ¶
func (c *StickyEntityConsumer) Run(ctx context.Context, name string, ch <-chan AckableEventEnvelope, consume ConsumeFunc)
func (*StickyEntityConsumer) Stop ¶
func (c *StickyEntityConsumer) Stop()
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber
func NewSubscriber ¶
func NewSubscriber(m EventMarshaler, t Transport, c Consumer, l LoggerCtx) *Subscriber
func (*Subscriber) Stop ¶
func (s *Subscriber) Stop()
func (*Subscriber) Subscribe ¶
func (s *Subscriber) Subscribe(ctx context.Context, sub Subscription, m ...SubscriptionMiddleware) error
type Subscription ¶
type Subscription interface {
Name() string
Handle(ctx context.Context, e *ReceivedEvent) error
}
Subscription
type SubscriptionMiddleware ¶
type SubscriptionMiddleware func(next HandleFunc) HandleFunc
SubscriptionMiddleware
type SubscriptionMiddlewares ¶
type SubscriptionMiddlewares []SubscriptionMiddleware
SubscriptionMiddlewares
func (SubscriptionMiddlewares) Apply ¶
func (a SubscriptionMiddlewares) Apply(sub Subscription) HandleFunc
type Transport ¶
type Transport interface {
Subscribe(ctx context.Context, name string) (<-chan AckableEventEnvelope, error)
Stop()
}
Transport
type Version ¶
type Version struct {
// contains filtered or unexported fields
}
Version