ember

package module
v0.0.0-...-1a1dc4e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 19, 2026 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrEntityNotFound  = errors.New("ember: entity not found")
	ErrVersionConflict = errors.New("ember: entity version conflict")
)
View Source
var (
	ErrUnknownEvent = errors.New("unknown event")
)
View Source
var ErrUnsupportedFilter = errors.New("ember: unsupported filter")

ErrUnsupportedFilter is returned by a repository when it cannot translate a filter node, operator, or value type into its native query dialect.

Functions

This section is empty.

Types

type AckableEventEnvelope

type AckableEventEnvelope struct {
	EventEnvelope

	Ack  func()
	Nack func()
}

AckableEventEnvelope

type Comparison

type Comparison struct {
	Path  string
	Op    Operator
	Value any
}

Comparison matches a single path against a value with an operator.

type Conjunction

type Conjunction struct {
	Filters []Filter
}

Conjunction is a logical AND over its children.

type ConsumeFunc

type ConsumeFunc func(context.Context, AckableEventEnvelope)

ConsumeFunc

type ConsumeMiddleware

type ConsumeMiddleware func(next ConsumeFunc) ConsumeFunc

ConsumeMiddleware

type ConsumeMiddlewares

type ConsumeMiddlewares []ConsumeMiddleware

ConsumeMiddlewares

func (ConsumeMiddlewares) Apply

type Consumer

type Consumer interface {
	Run(ctx context.Context, name string, ch <-chan AckableEventEnvelope, consume ConsumeFunc)
	Stop()
}

Consumer

type Disjunction

type Disjunction struct {
	Filters []Filter
}

Disjunction is a logical OR over its children.

type Entity

type Entity interface {
	ID() string
	Type() string
	Version() Version
	SetVersion(Version)
}

Entity

type EntityMarshaler

type EntityMarshaler[E Entity] interface {
	Marshal(ctx context.Context, e E) (*MarshaledEntity, error)
	Unmarshal(ctx context.Context, m *MarshaledEntity) (E, error)
}

EntityMarshaler

type EntityRepository

type EntityRepository interface {
	Save(ctx context.Context, m *MarshaledEntity) error
	Get(ctx context.Context, typ, id string) (*MarshaledEntity, error)
	List(ctx context.Context, typ string, f Filter) ([]*MarshaledEntity, error)
}

EntityRepository

type EntityRoot

type EntityRoot struct {
	// contains filtered or unexported fields
}

EntityRoot

func NewEntityRoot

func NewEntityRoot(id string) EntityRoot

func (*EntityRoot) ID

func (r *EntityRoot) ID() string

func (*EntityRoot) SetVersion

func (r *EntityRoot) SetVersion(v Version)

func (*EntityRoot) Version

func (r *EntityRoot) Version() Version

type EntityStore

type EntityStore[E Entity] struct {
	// contains filtered or unexported fields
}

EntityStore

func NewEntityStore

func NewEntityStore[E Entity](r EntityRepository, m EntityMarshaler[E]) *EntityStore[E]

func (*EntityStore[E]) Get

func (s *EntityStore[E]) Get(ctx context.Context, id string) (E, error)

func (*EntityStore[E]) List

func (s *EntityStore[E]) List(ctx context.Context, f Filter) ([]E, error)

func (*EntityStore[E]) Save

func (s *EntityStore[E]) Save(ctx context.Context, e E) error

type Event

type Event interface {
	EntityID() string
	Type() string
}

Event

type EventEnvelope

type EventEnvelope struct {
	ID        string
	EntityID  string
	Event     *MarshaledEvent
	Metadata  Metadata
	Timestamp time.Time
}

EventEnvelope

type EventMarshaler

type EventMarshaler interface {
	Marshal(ctx context.Context, e Event) (*MarshaledEvent, error)
	Unmarshal(ctx context.Context, e *MarshaledEvent) (Event, error)
}

EventMarshaler

type EventRepository

type EventRepository interface {
	Save(ctx context.Context, envelopes []EventEnvelope) error
}

EventRepository

type Existence

type Existence struct {
	Path   string
	Exists bool
}

Existence matches on whether the path is present (and non-null).

type Filter

type Filter interface {
	// contains filtered or unexported methods
}

Filter is a sealed sum type. Only ember-defined nodes satisfy it, so repositories can translate the closed set exhaustively.

Null/missing-path semantics (two-valued): a path predicate (Comparison, Membership) is satisfied only when the referenced path is present, non-null, and the comparison holds; a missing or null path makes that leaf false. And/Or/Not combine leaves as ordinary booleans. As a result Ne(p, x) does not match entities where p is absent/null, while Not(Eq(p, x)) does. Exists(p, true) means present and non-null; Exists(p, false) is its complement. Every backend honors these semantics identically.

func And

func And(fs ...Filter) Filter

And combines filters with logical AND.

func Eq

func Eq(path string, v any) Filter

Eq matches path == v.

func Exists

func Exists(path string, exists bool) Filter

Exists matches on whether the path is present (exists=true) or absent (exists=false).

func Gt

func Gt(path string, v any) Filter

Gt matches path > v.

func Gte

func Gte(path string, v any) Filter

Gte matches path >= v.

func In

func In(path string, vs ...any) Filter

In matches when the path's value is one of vs.

func Lt

func Lt(path string, v any) Filter

Lt matches path < v.

func Lte

func Lte(path string, v any) Filter

Lte matches path <= v.

func Ne

func Ne(path string, v any) Filter

Ne matches path != v.

func Not

func Not(f Filter) Filter

Not negates a filter.

func Or

func Or(fs ...Filter) Filter

Or combines filters with logical OR.

type HandleFunc

type HandleFunc func(context.Context, *ReceivedEvent) error

HandleFunc

type IDer

type IDer interface {
	ID() string
}

IDer

type LoggerCtx

type LoggerCtx interface {
	Debug(ctx context.Context, msg string, kvs ...interface{})
	Info(ctx context.Context, msg string, kvs ...interface{})
	Warn(ctx context.Context, msg string, kvs ...interface{})
	Error(ctx context.Context, msg string, err error, kvs ...interface{})
}

LoggerCtx is the context-aware structured logger used across spark. Implementations adapt arbitrary backends (slog, zap, zerolog, etc.) by satisfying this interface.

kvs is a flat sequence of alternating keys and values. Keys must be strings; values may be any type the underlying backend can render. Mismatched pairs are the implementation's problem to surface.

Error takes the error as a separate positional argument so backends can render stack traces or attach it as a typed field rather than a string.

var NopLogger LoggerCtx = nopLogger{}

NopLogger is a LoggerCtx that drops every call. Used as the zero-value default by middleware that takes an optional logger.

type MarshaledEntity

type MarshaledEntity struct {
	ID      string
	Type    string
	Version Version
	Data    []byte
}

MarshaledEntity

type MarshaledEvent

type MarshaledEvent struct {
	Type string
	Data []byte
}

MarshaledEvent

type Membership

type Membership struct {
	Path   string
	Values []any
}

Membership matches when the path's value is one of Values (IN).

type Metadata

type Metadata map[MetadataKey]interface{}

Metadata

func (Metadata) CurrentDelivery

func (m Metadata) CurrentDelivery() int

CurrentDelivery returns the 1-based delivery attempt for the event, or 0 when absent.

func (Metadata) IsLastDelivery

func (m Metadata) IsLastDelivery() bool

IsLastDelivery reports whether this is the final redelivery attempt. It is false when the consumer is uncapped (no max-deliveries bound) or the counts are absent.

func (Metadata) MaxDeliveries

func (m Metadata) MaxDeliveries() int

MaxDeliveries returns the consumer's delivery cap, or 0 when the consumer is uncapped or the value is absent.

type MetadataGetter

type MetadataGetter interface {
	Get(ctx context.Context) (Metadata, error)
}

MetadataGetter

type MetadataKey

type MetadataKey string

MetadataKey

const (
	// MetadataKeyCurrentDelivery is the 1-based delivery attempt for the event,
	// stamped by a subscriber from the transport's redelivery count.
	MetadataKeyCurrentDelivery MetadataKey = "current_delivery"
	// MetadataKeyMaxDeliveries is the consumer's delivery cap (its DLQ bound),
	// stamped only when the consumer is capped.
	MetadataKeyMaxDeliveries MetadataKey = "max_deliveries"
)

type MiddlewareConsumer

type MiddlewareConsumer struct {
	// contains filtered or unexported fields
}

MiddlewareConsumer

func NewMiddlewareConsumer

func NewMiddlewareConsumer(c Consumer, m ...ConsumeMiddleware) *MiddlewareConsumer

func (*MiddlewareConsumer) Run

func (c *MiddlewareConsumer) Run(ctx context.Context, name string, ch <-chan AckableEventEnvelope, consume ConsumeFunc)

func (*MiddlewareConsumer) Stop

func (c *MiddlewareConsumer) Stop()

type Negation

type Negation struct {
	Filter Filter
}

Negation is a logical NOT of its child.

type NoopMetadataGetter

type NoopMetadataGetter struct {
}

NoopMetadataGetter

func (NoopMetadataGetter) Get

type Notifier

type Notifier interface {
	Notify(ctx context.Context, envelopes []EventEnvelope)
}

Notifier

type Operator

type Operator int

Operator is a comparison operator used by Comparison.

const (
	OpEq Operator = iota
	OpNe
	OpGt
	OpGte
	OpLt
	OpLte
)

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, events ...Event) error

type ReceivedEvent

type ReceivedEvent struct {
	Event

	ID        string
	Metadata  Metadata
	Timestamp time.Time
}

ReceivedEvent

type SerialConsumer

type SerialConsumer struct {
	// contains filtered or unexported fields
}

SerialConsumer

func NewSerialConsumer

func NewSerialConsumer() *SerialConsumer

func (*SerialConsumer) Run

func (c *SerialConsumer) Run(ctx context.Context, _ string, ch <-chan AckableEventEnvelope, consume ConsumeFunc)

func (*SerialConsumer) Stop

func (c *SerialConsumer) Stop()

type StickyEntityConsumer

type StickyEntityConsumer struct {
	// contains filtered or unexported fields
}

StickyEntityConsumer

func NewStickyEntityConsumer

func NewStickyEntityConsumer(concurrency map[string]int, l LoggerCtx) *StickyEntityConsumer

func (*StickyEntityConsumer) Run

func (c *StickyEntityConsumer) Run(ctx context.Context, name string, ch <-chan AckableEventEnvelope, consume ConsumeFunc)

func (*StickyEntityConsumer) Stop

func (c *StickyEntityConsumer) Stop()

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber

func NewSubscriber

func NewSubscriber(m EventMarshaler, t Transport, c Consumer, l LoggerCtx) *Subscriber

func (*Subscriber) Stop

func (s *Subscriber) Stop()

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, sub Subscription, m ...SubscriptionMiddleware) error

type Subscription

type Subscription interface {
	Name() string
	Handle(ctx context.Context, e *ReceivedEvent) error
}

Subscription

type SubscriptionMiddleware

type SubscriptionMiddleware func(next HandleFunc) HandleFunc

SubscriptionMiddleware

type SubscriptionMiddlewares

type SubscriptionMiddlewares []SubscriptionMiddleware

SubscriptionMiddlewares

func (SubscriptionMiddlewares) Apply

type Transport

type Transport interface {
	Subscribe(ctx context.Context, name string) (<-chan AckableEventEnvelope, error)
	Stop()
}

Transport

type Version

type Version struct {
	// contains filtered or unexported fields
}

Version

func NewVersion

func NewVersion(initial uint64) Version

func (Version) Inc

func (v Version) Inc() Version

func (Version) Initial

func (v Version) Initial() uint64

func (Version) Value

func (v Version) Value() uint64

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL