fact

package module
v0.13.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2026 License: MIT Imports: 7 Imported by: 8

README

axon-fact

Primitives · Part of the lamina workspace

Event sourcing primitives for Go. An append-only event store, synchronous projectors for building read models, and an async publisher interface for streaming events to external consumers. Includes in-memory and PostgreSQL-backed implementations. NATS adapters are in the separate axon-nats module.

Getting started

go get github.com/benaskins/axon-fact
store := fact.NewMemoryStore(
    fact.WithProjector(myProjector),
)

events := []fact.Event{
    {Type: "OrderPlaced", Data: json.RawMessage(`{"item":"widget"}`)},
}

store.Append(ctx, "order-123", events)

Key types

  • Event — immutable record with stream, type, JSON data, metadata, and sequence number
  • EventStore — append-only persistence with Append, Load, and LoadFrom
  • Projector — synchronous event handler that builds read models within Append
  • Publisher — async event delivery to external systems (e.g., NATS JetStream)
  • MemoryStore — in-memory EventStore implementation with projector and publisher support
  • PostgresStore — PostgreSQL-backed EventStore with transactional appends and replay

License

MIT

Documentation

Overview

Package fact provides event sourcing primitives: an append-only event store, synchronous projectors for building read models, and an async publisher interface for streaming events to external consumers.

Class: primitive UseWhen: Audit trail, replay, training corpus.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewEventID added in v0.12.0

func NewEventID() string

NewEventID returns a random 32-character hex string suitable for use as an event ID.

Types

type Event

type Event struct {
	ID         string            `json:"id"`
	Stream     string            `json:"stream"`
	Type       string            `json:"type"`
	Data       json.RawMessage   `json:"data"`
	Metadata   map[string]string `json:"metadata,omitempty"`
	Sequence   int64             `json:"sequence"`
	OccurredAt time.Time         `json:"occurred_at"`
}

Event is an immutable record of something that happened.

func NewEvent added in v0.12.0

func NewEvent(stream string, data EventTyper) (Event, error)

NewEvent creates an Event from a domain event struct, marshaling data to JSON. Metadata is left nil.

func NewEventWithMeta added in v0.12.0

func NewEventWithMeta(stream string, data EventTyper, meta map[string]string) (Event, error)

NewEventWithMeta creates an Event with optional metadata from a domain event struct, marshaling data to JSON.

type EventStore

type EventStore interface {
	// Append persists events to a stream and runs synchronous projections.
	Append(ctx context.Context, stream string, events []Event) error

	// Load returns all events for a stream in sequence order.
	Load(ctx context.Context, stream string) ([]Event, error)

	// LoadFrom returns events for a stream starting after a sequence number.
	LoadFrom(ctx context.Context, stream string, fromSequence int64) ([]Event, error)
}

EventStore persists and retrieves events.

type EventTyper added in v0.12.0

type EventTyper interface {
	EventType() string
}

EventTyper is implemented by domain event structs that can be stored as events. Each struct returns a dot-separated type string (e.g. "order.created").

type Fact added in v0.1.8

type Fact struct {
	Schema string
	Data   map[string]any
}

Fact is a single row of typed data conforming to a Schema. Keys correspond to Field.Name values in the schema.

type Field added in v0.1.8

type Field struct {
	Name string
	Type FieldType
}

Field describes a single column in a schema.

type FieldType added in v0.1.8

type FieldType int

FieldType identifies the storage type of a schema field.

const (
	String               FieldType = iota // Variable-length text
	LowCardinalityString                  // Text with few distinct values (optimised for columnar stores)
	Bool                                  // Boolean
	UInt16                                // Unsigned 16-bit integer
	UInt32                                // Unsigned 32-bit integer
	Float32                               // 32-bit floating point
	Float64                               // 64-bit floating point
	DateTime64                            // Millisecond-precision timestamp
	JSON                                  // String holding JSON
)

type Materializer added in v0.1.8

type Materializer interface {
	// EnsureSchema creates or migrates storage for the given schemas.
	EnsureSchema(ctx context.Context, schemas ...Schema) error

	// Materialize writes facts to the store. Each fact's Schema field
	// identifies which Schema it conforms to.
	Materialize(ctx context.Context, facts ...Fact) error
}

Materializer persists facts to an external analytical store. Implementations (e.g. ClickHouse) use Schema metadata to create tables and generate insert statements.

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore is an in-memory EventStore with synchronous projection and async publishing. Safe for concurrent use.

func NewMemoryStore

func NewMemoryStore(opts ...Option) *MemoryStore

NewMemoryStore creates an in-memory event store.

func (*MemoryStore) Append

func (s *MemoryStore) Append(ctx context.Context, stream string, events []Event) error

Append stores events in a stream, assigns sequences and timestamps, runs projectors synchronously, then publishes asynchronously.

If called from within a projector (nested Append), the mutex is already held and will not be re-acquired. Publishing only runs from the outermost Append.

func (*MemoryStore) Load

func (s *MemoryStore) Load(ctx context.Context, stream string) ([]Event, error)

Load returns all events for a stream in sequence order.

func (*MemoryStore) LoadFrom

func (s *MemoryStore) LoadFrom(ctx context.Context, stream string, fromSequence int64) ([]Event, error)

LoadFrom returns events for a stream with sequence greater than fromSequence.

type Option

type Option func(*MemoryStore)

Option configures a MemoryStore.

func WithProjector

func WithProjector(p Projector) Option

WithProjector registers a projector that runs synchronously on Append.

func WithPublishErrorHandler

func WithPublishErrorHandler(fn func(error)) Option

WithPublishErrorHandler registers a callback invoked when a publisher returns an error. The callback runs in the publisher goroutine. This is in addition to the default slog.Error logging.

func WithPublisher

func WithPublisher(p Publisher) Option

WithPublisher registers a publisher that runs asynchronously after Append.

type Pipeline added in v0.1.8

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline fans out facts to multiple sinks: an EventStore for append-only persistence, a Materializer for analytical projections, and Publishers for async delivery.

func NewPipeline added in v0.1.8

func NewPipeline(opts ...PipelineOption) *Pipeline

NewPipeline creates a Pipeline with the given options.

func (*Pipeline) EnsureSchemas added in v0.1.8

func (p *Pipeline) EnsureSchemas(ctx context.Context, schemas ...Schema) error

EnsureSchemas delegates to the Materializer to create or migrate storage for the given schemas. No-op if no Materializer is configured.

func (*Pipeline) Record added in v0.1.8

func (p *Pipeline) Record(ctx context.Context, facts ...Fact) error

Record fans out facts to all configured sinks.

Synchronous: EventStore.Append and Materializer.Materialize run in sequence. If either fails, Record returns the error immediately.

Asynchronous: Publishers fire in goroutines after synchronous sinks succeed. Publish failures are logged but do not fail Record.

type PipelineOption added in v0.1.8

type PipelineOption func(*Pipeline)

PipelineOption configures a Pipeline.

func WithMaterializer added in v0.1.8

func WithMaterializer(m Materializer) PipelineOption

WithMaterializer configures the Materializer sink. If not set, facts are not materialised to an analytical store.

func WithPipelinePublisher added in v0.1.8

func WithPipelinePublisher(pub Publisher) PipelineOption

WithPipelinePublisher adds a Publisher sink. Multiple publishers are supported; each runs independently. Publishers fire asynchronously after Record returns.

func WithStore added in v0.1.8

func WithStore(store EventStore, stream string) PipelineOption

WithStore configures the EventStore sink and the stream name facts are appended to. If not set, facts are not persisted to an event log.

type Projector

type Projector interface {
	Handle(ctx context.Context, event Event) error
}

Projector processes events to update a read model. Projectors run synchronously within Append — the caller sees the projected state immediately after the append returns.

type Publisher

type Publisher interface {
	Publish(ctx context.Context, events []Event) error
}

Publisher sends events to an external stream (e.g., NATS JetStream). Publishers run asynchronously after Append succeeds — a publish failure does not roll back the append.

type Schema added in v0.1.8

type Schema struct {
	// Name identifies this fact type (e.g. "eval_bfcl", "message").
	// Materializers derive table names from this.
	Name string

	// Fields lists the columns in order.
	Fields []Field

	// OrderBy lists field names that define the primary ordering.
	// Materializers use this for CREATE TABLE ORDER BY clauses.
	OrderBy []string
}

Schema describes the shape of a fact type for materialisation. The composition root (axon-eval, axon-chat, etc.) defines schemas; a Materializer uses them to create tables and insert rows.

func (Schema) FieldByName added in v0.1.8

func (s Schema) FieldByName(name string) (Field, bool)

FieldByName returns the field with the given name, or zero Field if not found.

Directories

Path Synopsis
Example demonstrates basic event sourcing with axon-fact: create a store with a projector, append events, and read projected state.
Example demonstrates basic event sourcing with axon-fact: create a store with a projector, append events, and read projected state.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL