ledger

package module
v0.0.3 Latest
Published: Apr 14, 2026 License: MIT Imports: 6 Imported by: 0

README

ledger


Append-only log library for Go with typed generic entries, schema versioning, and pluggable storage backends.

Model

A Store represents one entity type — one table or collection. Create it with a name describing the type (e.g., "orders", "audit_events"). All streams in that store share the same schema and codec.

A Stream is an instance within a type. Create it with a stream ID (e.g., "user-123", "org-456"). Streams are implicit — they're created on first append and require no setup.

Use store.ListStreamIDs(ctx) to enumerate all streams of the type.

Features

  • Generic typed streams — Stream[I, P, T] provides compile-time type safety for entry payloads
  • Lightweight streams — create per operation and discard, no lifecycle management
  • Stream discovery — ListStreamIDs enumerates all streams in a store
  • Schema versioning — entries are stamped with a version; upcasters transform old entries on read
  • Deduplication — storage-level dedup via partial unique indexes, silently skips duplicates
  • Ordering keys — filter entries by an ordering key (e.g., aggregate ID)
  • Mutable tags & annotations — update labels and key-value state on existing entries; filter reads by tag
  • Cursor-based pagination — efficient reads with After(id), Limit(n), Desc()
  • Retention management — Trim(ctx, stream, beforeID) for log compaction
  • External transactions — participate in a caller-managed *sql.Tx or *mongo.Session via WithTx(ctx, tx)
  • Health checks — all backends implement HealthChecker interface
  • Structured logging — all backends log via slog.Default(); override with WithLogger()
  • Three backends — SQLite, PostgreSQL, MongoDB

Installation

go get github.com/rbaliyan/ledger

Quick Start

package main

import (
    "context"
    "database/sql"
    "fmt"

    "github.com/rbaliyan/ledger"
    "github.com/rbaliyan/ledger/sqlite"
    _ "modernc.org/sqlite"
)

type Order struct {
    ID     string  `json:"id"`
    Amount float64 `json:"amount"`
}

func main() {
    ctx := context.Background()

    // Open the database once.
    db, _ := sql.Open("sqlite", "app.db")

    // Open one store per entity type. The table name IS the type.
    orders, _ := sqlite.New(ctx, db, sqlite.WithTable("orders"))
    defer orders.Close(ctx)

    // Create a lightweight stream for one instance ("user-123") of the orders type.
    s := ledger.NewStream(orders, "user-123", ledger.JSONCodec[Order]{})

    // Append entries with ordering key, dedup key, and metadata
    ids, _ := s.Append(ctx, ledger.AppendInput[Order]{
        Payload:  Order{ID: "o-1", Amount: 99.99},
        OrderKey: "customer-123",
        DedupKey: "evt-abc",
        Metadata: map[string]string{"source": "api"},
    })
    fmt.Println("appended", len(ids), "entries")

    // Read entries (default limit is 100)
    entries, _ := s.Read(ctx)

    // Continue from where you left off
    if len(entries) > 0 {
        more, _ := s.Read(ctx, ledger.After(entries[len(entries)-1].ID), ledger.Limit(50))
        _ = more
    }

    // Filter by ordering key
    byCustomer, _ := s.Read(ctx, ledger.WithOrderKey("customer-123"))
    _ = byCustomer

    // Enumerate every order stream in this store.
    streamIDs, _ := orders.ListStreamIDs(ctx)
    for _, id := range streamIDs {
        // Load each stream and process.
        _ = ledger.NewStream(orders, id, ledger.JSONCodec[Order]{})
    }
}
Multiple types

Each type is a separate store, typically backed by a separate table or collection:

orders, _ := sqlite.New(ctx, db, sqlite.WithTable("orders"))
users,  _ := sqlite.New(ctx, db, sqlite.WithTable("users"))

// Streams within a store are independent of streams in other stores —
// `"alice"` under `orders` and `"alice"` under `users` are separate.
ordStream  := ledger.NewStream(orders, "alice", ledger.JSONCodec[Order]{})
userStream := ledger.NewStream(users,  "alice", ledger.JSONCodec[User]{})

Schema Versioning

When your payload type evolves, register upcasters to transform old entries on read:

type OrderV2 struct {
    Name   string  `json:"name"`
    Email  string  `json:"email"`
    Amount float64 `json:"amount"`
}

s := ledger.NewStream(store, "orders", ledger.JSONCodec[OrderV2]{},
    ledger.WithSchemaVersion[json.RawMessage](2),
    ledger.WithUpcaster(ledger.NewFieldMapper(1, 2).
        RenameField("customer_name", "name").
        AddDefault("email", "unknown@example.com")),
)

// Old v1 entries are automatically upcasted to v2 before decoding
entries, _ := s.Read(ctx)

Custom Codec

Payloads are encoded with JSON by default via JSONCodec. Provide a custom PayloadCodec implementation for alternative formats (protobuf, msgpack, etc.):

s := ledger.NewStream(store, "orders", myProtobufCodec{})

Backends

SQLite
import "github.com/rbaliyan/ledger/sqlite"

// db is a *sql.DB opened with a SQLite driver (e.g., modernc.org/sqlite)
store, err := sqlite.New(ctx, db,
    sqlite.WithTable("my_ledger"),
    sqlite.WithLogger(slog.Default()),
)
PostgreSQL
import "github.com/rbaliyan/ledger/postgres"

// db is a *sql.DB opened with a PostgreSQL driver (e.g., github.com/lib/pq)
store, err := postgres.New(ctx, db,
    postgres.WithTable("my_ledger"),
)
MongoDB
import "github.com/rbaliyan/ledger/mongodb"

// db is a *mongo.Database from an already-connected mongo.Client
store, err := mongodb.New(ctx, db,
    mongodb.WithCollection("my_ledger"),
)

Atomicity note: SQL backends (sqlite, postgres) use transactions for atomic batch inserts. MongoDB uses InsertMany with ordered:false — partial success is possible on non-dedup errors.

Store Interface

type Store[I comparable, P any] interface {
    Append(ctx context.Context, stream string, entries ...RawEntry[P]) ([]I, error)
    Read(ctx context.Context, stream string, opts ...ReadOption) ([]StoredEntry[I, P], error)
    Count(ctx context.Context, stream string) (int64, error)
    SetTags(ctx context.Context, stream string, id I, tags []string) error
    SetAnnotations(ctx context.Context, stream string, id I, ann map[string]*string) error
    Trim(ctx context.Context, stream string, beforeID I) (int64, error)
    ListStreamIDs(ctx context.Context, opts ...ListOption) ([]string, error)
    Close(ctx context.Context) error
}

ID types: int64 for SQLite/PostgreSQL, string (hex ObjectID) for MongoDB.

Read defaults to ascending order with a limit of 100.

Deduplication

Entries with a non-empty DedupKey are subject to per-stream dedup via a partial unique index. Duplicates are silently skipped. Empty DedupKey means no dedup — the entry is always appended.

Metadata

Attach arbitrary immutable key-value metadata at append time:

s.Append(ctx, ledger.AppendInput[Order]{
    Payload:  order,
    Metadata: map[string]string{"trace_id": "abc123", "source": "api"},
})

Metadata is stored as JSON (SQL backends) or a BSON subdocument (MongoDB) and never changes after append.

Tags and Annotations

Entries have two mutable fields that can be updated after append:

  • Tags []string — ordered labels for categorization and filtering (e.g., "processed", "archived").
  • Annotations map[string]string — key-value state separate from immutable Metadata (e.g., "processed_at", "error").
// Initial tags can be set at append time.
ids, _ := s.Append(ctx, ledger.AppendInput[Order]{
    Payload: order,
    Tags:    []string{"pending"},
})
id := ids[0]

// Replace tags on an existing entry.
store.SetTags(ctx, "user-123", id, []string{"processed", "reviewed"})

// Merge annotations. A nil value deletes that key.
v := "2026-04-13"
store.SetAnnotations(ctx, "user-123", id, map[string]*string{
    "processed_at": &v,
    "error":        nil,
})

Filter reads by tag:

// Entries carrying this tag:
entries, _ := s.Read(ctx, ledger.WithTag("processed"))

// Entries carrying ALL of these tags:
entries, _ = s.Read(ctx, ledger.WithAllTags("processed", "audited"))

SetTags and SetAnnotations return ErrEntryNotFound when the entry doesn't exist in the stream.

Transactions

Participate in a caller-managed transaction by attaching it to the context. Store methods invoked with that context use the caller's transaction instead of creating their own:

// SQL (sqlite, postgres)
tx, _ := db.BeginTx(ctx, nil)
ctx = ledger.WithTx(ctx, tx)
store.Append(ctx, "user-123", entry) // participates in tx
tx.Commit()

// MongoDB
sess, _ := client.StartSession()
sess.WithTransaction(ctx, func(sc context.Context) (any, error) {
    ctx := ledger.WithTx(sc, sess)
    _, err := store.Append(ctx, "user-123", entry)
    return nil, err
})

Without WithTx, SQL backends open their own transaction per batch Append; MongoDB uses InsertMany with ordered:false (see Atomicity note above).

Custom Backends

Implement the Store[I] interface and validate with the conformance test suite:

import "github.com/rbaliyan/ledger/storetest"

func TestConformance(t *testing.T) {
    store := myBackend.New(ctx, db)
    storetest.RunStoreTests(t, store, ledger.After[int64])
}

Testing

# Unit tests (SQLite only, no external deps)
go test ./...

# Integration tests (requires Docker)
just test-integration

# Individual backends
just test-sqlite
just test-pg
just test-mongo

# Benchmarks
just bench

License

MIT

Documentation

Overview

Package ledger provides an append-only log with typed generic entries, schema versioning, deduplication, and pluggable storage backends.

The core abstraction is a two-level generic design:

  • Store is the backend interface, generic over the ID type I and the store-native payload type P (e.g. json.RawMessage for SQL, bson.Raw for MongoDB).
  • Stream is a lightweight typed handle, generic over I, P, and the user's domain payload type T. A PayloadCodec bridges T and P.

Streams are cheap to create — one per operation, then discard. Schema versioning with Upcaster chains enables safe payload evolution without downtime.

Each entry has immutable fields (Payload, OrderKey, DedupKey, SchemaVersion, Metadata, CreatedAt) set at append time, and mutable fields (Tags, Annotations) that can be updated after the entry is written.

Backends: sqlite (Store[int64, json.RawMessage]), postgres (Store[int64, json.RawMessage]), mongodb (Store[string, bson.Raw]).

Constants

This section is empty.

Variables

View Source
var (
	// ErrStoreClosed is returned when operating on a closed store.
	ErrStoreClosed = errors.New("ledger: store closed")

	// ErrEncode is returned when payload encoding fails.
	ErrEncode = errors.New("ledger: encode failed")

	// ErrDecode is returned when payload decoding fails.
	ErrDecode = errors.New("ledger: decode failed")

	// ErrNoUpcaster is returned when no upcaster is available for a version transition.
	ErrNoUpcaster = errors.New("ledger: no upcaster available")

	// ErrInvalidCursor is returned when a cursor value has an unexpected type.
	ErrInvalidCursor = errors.New("ledger: invalid cursor type")

	// ErrInvalidName is returned when a table or collection name is invalid.
	ErrInvalidName = errors.New("ledger: invalid table/collection name")

	// ErrEntryNotFound is returned when SetTags or SetAnnotations targets a non-existent entry.
	ErrEntryNotFound = errors.New("ledger: entry not found")
)

Functions

func AfterValue

func AfterValue[I comparable](o ReadOptions) (I, bool)

AfterValue returns the cursor value as the requested type. Returns the zero value and false if no cursor is set or the type doesn't match. This function is intended for Store implementors.

func TxFromContext added in v0.0.2

func TxFromContext(ctx context.Context) any

TxFromContext returns the transaction stored in the context, or nil. This function is intended for Store implementors.

func ValidateName

func ValidateName(name string) error

ValidateName checks that name is a safe identifier for use as a table or collection name.

func WithTx added in v0.0.2

func WithTx(ctx context.Context, tx any) context.Context

WithTx returns a new context carrying the given transaction value. SQL backends accept *sql.Tx, the MongoDB backend accepts *mongo.Session.

When a context with a transaction is passed to Store methods, the store participates in the caller's transaction instead of creating its own. This enables atomic writes across the ledger and other tables/collections.

Example (SQL):

tx, _ := db.BeginTx(ctx, nil)
ctx = ledger.WithTx(ctx, tx)
store.Append(ctx, "orders", entry)  // uses caller's tx
tx.Commit()

Example (MongoDB):

session, _ := client.StartSession()
session.WithTransaction(ctx, func(sc context.Context) (any, error) {
    ctx := ledger.WithTx(sc, session)
    store.Append(ctx, "orders", entry)  // uses caller's session
    return nil, nil
})

Types

type AppendInput

type AppendInput[T any] struct {
	Payload  T                 // Payload to encode and store.
	OrderKey string            // Ordering key for filtering (e.g., aggregate ID).
	DedupKey string            // Deduplication key. Empty means no dedup.
	Metadata map[string]string // Immutable key-value metadata.
	Tags     []string          // Initial tags (can be updated later via SetTags).
}

AppendInput describes an entry to append to a stream.

type Entry

type Entry[I comparable, T any] struct {
	ID            I                 // Store-assigned unique ID.
	Stream        string            // Stream name this entry belongs to.
	Payload       T                 // Decoded payload.
	OrderKey      string            // Ordering key.
	DedupKey      string            // Deduplication key.
	SchemaVersion int               // Schema version at write time (before upcasting).
	Metadata      map[string]string // Immutable key-value metadata (set at append).
	Tags          []string          // Mutable tags (updated via SetTags).
	Annotations   map[string]string // Mutable annotations (updated via SetAnnotations).
	CreatedAt     time.Time         // Timestamp when the entry was stored.
	UpdatedAt     *time.Time        // Timestamp of last tag/annotation update.
}

Entry is a typed entry read back from a stream.

type FieldMapper

type FieldMapper struct {
	// contains filtered or unexported fields
}

FieldMapper transforms JSON payloads between schema versions using field operations. It implements Upcaster[json.RawMessage] and is intended for use with SQL backends (sqlite, postgres). MongoDB users should write BSON-aware upcasters instead.

Supports three operations applied in order: renames, then defaults, then removals.

upcaster := ledger.NewFieldMapper(1, 2).
    RenameField("customer_name", "customerName").
    AddDefault("email", "unknown@example.com").
    RemoveField("legacy_id")

func NewFieldMapper

func NewFieldMapper(from, to int) *FieldMapper

NewFieldMapper creates a new field mapper for the specified version transition.

func (*FieldMapper) AddDefault

func (f *FieldMapper) AddDefault(field string, value any) *FieldMapper

AddDefault sets a default value for a field that may not exist.

func (*FieldMapper) FromVersion

func (f *FieldMapper) FromVersion() int

FromVersion returns the source version this mapper handles.

func (*FieldMapper) RemoveField

func (f *FieldMapper) RemoveField(field string) *FieldMapper

RemoveField marks a field for removal during upcasting.

func (*FieldMapper) RenameField

func (f *FieldMapper) RenameField(oldName, newName string) *FieldMapper

RenameField adds a field rename transformation.

func (*FieldMapper) ToVersion

func (f *FieldMapper) ToVersion() int

ToVersion returns the target version this mapper produces.

func (*FieldMapper) Upcast

func (f *FieldMapper) Upcast(ctx context.Context, data json.RawMessage) (json.RawMessage, error)

Upcast transforms the JSON payload from source to target version. Applies operations in order: renames, defaults, removals.

type HealthChecker

type HealthChecker interface {
	Health(ctx context.Context) error
}

HealthChecker is an optional interface that Store implementations may provide to report backend health (e.g., database connectivity).

type JSONCodec

type JSONCodec[T any] struct{}

JSONCodec implements PayloadCodec[T, json.RawMessage] using encoding/json. It is the default codec for the sqlite and postgres backends.

func (JSONCodec[T]) Marshal added in v0.0.3

func (JSONCodec[T]) Marshal(v T) (json.RawMessage, error)

Marshal encodes v to JSON.

func (JSONCodec[T]) Unmarshal added in v0.0.3

func (JSONCodec[T]) Unmarshal(p json.RawMessage, v *T) error

Unmarshal decodes JSON into v.

type ListOption added in v0.0.2

type ListOption func(*ListOptions)

ListOption configures how stream IDs are listed by [Store.ListStreamIDs].

func ListAfter added in v0.0.2

func ListAfter(streamID string) ListOption

ListAfter returns a ListOption that sets the cursor position. Only stream IDs strictly greater than this value are returned.

func ListLimit added in v0.0.2

func ListLimit(n int) ListOption

ListLimit returns a ListOption that sets the maximum number of stream IDs to return. The default limit is 100.

type ListOptions added in v0.0.2

type ListOptions struct {
	// contains filtered or unexported fields
}

ListOptions holds the parsed list parameters. This type is intended for Store implementors.

func ApplyListOptions added in v0.0.2

func ApplyListOptions(opts ...ListOption) ListOptions

ApplyListOptions applies the given options and returns the resolved ListOptions. This function is intended for Store implementors.

func (ListOptions) After added in v0.0.2

func (o ListOptions) After() string

After returns the cursor value. Only stream IDs strictly greater than this value are returned. Empty string means no cursor.

func (ListOptions) HasAfter added in v0.0.2

func (o ListOptions) HasAfter() bool

HasAfter reports whether a cursor was set.

func (ListOptions) Limit added in v0.0.2

func (o ListOptions) Limit() int

Limit returns the maximum number of stream IDs to return.

type Option

type Option[P any] func(*options[P])

Option configures a Stream. P is the store-native payload type.

func WithSchemaVersion

func WithSchemaVersion[P any](v int) Option[P]

WithSchemaVersion sets the schema version stamped on new entries. Defaults to 1. When reading entries with older versions, registered upcasters are applied to transform the payload before decoding.

func WithUpcaster

func WithUpcaster[P any](u Upcaster[P]) Option[P]

WithUpcaster registers an upcaster for transforming payloads from one schema version to the next. Register in sequence (v1→v2, v2→v3).

type Order

type Order int

Order specifies the sort direction when reading entries.

const (
	// Ascending reads entries oldest first (default).
	Ascending Order = iota
	// Descending reads entries newest first.
	Descending
)

func (Order) String

func (o Order) String() string

String returns the string representation of the order direction.

type PayloadCodec added in v0.0.3

type PayloadCodec[T, P any] interface {
	Marshal(v T) (P, error)
	Unmarshal(p P, v *T) error
}

PayloadCodec marshals and unmarshals between a domain type T and the store-native payload type P (e.g. encoding/json.RawMessage for SQL backends, go.mongodb.org/mongo-driver/v2/bson.Raw for MongoDB).

Each store backend declares its own P; the codec bridges the user's T to it.

type RawEntry

type RawEntry[P any] struct {
	Payload       P                 // Encoded payload in the store's native format.
	OrderKey      string            // Ordering key for filtering (e.g., aggregate ID).
	DedupKey      string            // Deduplication key. Empty means no dedup.
	SchemaVersion int               // Schema version of the payload.
	Metadata      map[string]string // Arbitrary immutable key-value metadata.
	Tags          []string          // Initial tags (mutable after append via SetTags).
}

RawEntry is an entry with an already-encoded payload, ready for storage. P is the store-native payload type.

type ReadOption

type ReadOption func(*ReadOptions)

ReadOption configures how entries are read from a stream.

func After

func After[I comparable](id I) ReadOption

After returns a ReadOption that sets the cursor position. Only entries after this ID are returned.

func Desc

func Desc() ReadOption

Desc returns a ReadOption that reads entries in descending order (newest first).

func Limit

func Limit(n int) ReadOption

Limit returns a ReadOption that sets the maximum number of entries to return. The default limit is 100.

func WithAllTags added in v0.0.2

func WithAllTags(tags ...string) ReadOption

WithAllTags returns a ReadOption that filters entries having ALL specified tags.

func WithOrderKey

func WithOrderKey(key string) ReadOption

WithOrderKey returns a ReadOption that filters entries by ordering key.

func WithTag added in v0.0.2

func WithTag(tag string) ReadOption

WithTag returns a ReadOption that filters entries having a specific tag.

type ReadOptions

type ReadOptions struct {
	// contains filtered or unexported fields
}

ReadOptions holds the parsed read parameters. This type is intended for Store implementors.

func ApplyReadOptions

func ApplyReadOptions(opts ...ReadOption) ReadOptions

ApplyReadOptions applies the given options and returns the resolved ReadOptions. This function is intended for Store implementors.

func (ReadOptions) AllTags added in v0.0.2

func (o ReadOptions) AllTags() []string

AllTags returns the all-tags filter, or nil if not set.

func (ReadOptions) HasAfter

func (o ReadOptions) HasAfter() bool

HasAfter reports whether a cursor was set.

func (ReadOptions) Limit

func (o ReadOptions) Limit() int

Limit returns the maximum number of entries to return.

func (ReadOptions) Order

func (o ReadOptions) Order() Order

Order returns the sort direction.

func (ReadOptions) OrderKeyFilter

func (o ReadOptions) OrderKeyFilter() string

OrderKeyFilter returns the order key filter, or empty string if not set.

func (ReadOptions) Tag added in v0.0.2

func (o ReadOptions) Tag() string

Tag returns the single-tag filter, or empty string if not set.

type Store

type Store[I comparable, P any] interface {
	// Append adds entries to the named stream. Returns the IDs of newly appended entries.
	// Entries with a non-empty DedupKey that already exists in the stream are silently skipped.
	Append(ctx context.Context, stream string, entries ...RawEntry[P]) ([]I, error)

	// Read returns entries from the named stream, ordered by ID.
	// Reading a non-existent stream returns nil, nil.
	Read(ctx context.Context, stream string, opts ...ReadOption) ([]StoredEntry[I, P], error)

	// Count returns the number of entries in the named stream.
	Count(ctx context.Context, stream string) (int64, error)

	// SetTags replaces all tags on an entry. Tags are mutable labels for
	// categorization and filtering (e.g., "processed", "archived").
	SetTags(ctx context.Context, stream string, id I, tags []string) error

	// SetAnnotations merges annotations into an entry. Existing keys are
	// overwritten, new keys are added, and keys with nil values are deleted.
	// Annotations are mutable key-value metadata (e.g., "processed_at", "error").
	SetAnnotations(ctx context.Context, stream string, id I, annotations map[string]*string) error

	// Trim deletes entries from the named stream with IDs less than or equal to beforeID.
	// Returns the number of entries deleted.
	Trim(ctx context.Context, stream string, beforeID I) (int64, error)

	// ListStreamIDs returns distinct stream IDs that have at least one entry in
	// this store. Results are ordered ascending by stream ID and cursor-paginated
	// via [ListAfter] and [ListLimit]. Returns nil, nil for an empty store.
	//
	// A stream that has been fully trimmed is not returned (no separate stream
	// registry is maintained).
	ListStreamIDs(ctx context.Context, opts ...ListOption) ([]string, error)

	// Close releases resources held by the store. The caller is responsible for
	// closing the underlying database connection.
	Close(ctx context.Context) error
}

Store is the backend storage interface for append-only log entries. I is the ID type used for cursor-based iteration (e.g., int64 for SQL, string for MongoDB). P is the store-native payload type (e.g., json.RawMessage for SQL, bson.Raw for MongoDB).

A Store represents a single entity type — one table or collection. The stream parameter on each method identifies an instance within that type (e.g., a store created with table "orders" may contain streams "user-123", "user-456").

Append semantics: SQL backends (sqlite, postgres) use transactions for atomic batch inserts. The MongoDB backend uses InsertMany with ordered:false, meaning partial success is possible on non-dedup errors.

Transaction support: pass a *sql.Tx or *mongo.Session via WithTx to have operations participate in an external transaction.

Example (ListStreamIDs)
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/rbaliyan/ledger"
)

// mockStore is a minimal store for examples.
type mockStore struct{}

func (mockStore) Append(_ context.Context, _ string, entries ...ledger.RawEntry[json.RawMessage]) ([]int64, error) {
	ids := make([]int64, len(entries))
	for i := range entries {
		ids[i] = int64(i + 1)
	}
	return ids, nil
}

func (mockStore) Read(_ context.Context, _ string, _ ...ledger.ReadOption) ([]ledger.StoredEntry[int64, json.RawMessage], error) {
	return nil, nil
}

func (mockStore) Count(_ context.Context, _ string) (int64, error) { return 0, nil }

func (mockStore) SetTags(_ context.Context, _ string, _ int64, _ []string) error { return nil }

func (mockStore) SetAnnotations(_ context.Context, _ string, _ int64, _ map[string]*string) error {
	return nil
}

func (mockStore) Trim(_ context.Context, _ string, _ int64) (int64, error) { return 0, nil }

func (mockStore) ListStreamIDs(_ context.Context, _ ...ledger.ListOption) ([]string, error) {
	return nil, nil
}

func (mockStore) Close(_ context.Context) error { return nil }

func main() {
	// A Store represents one entity type. Open one per type.
	//
	// For this example, mockStore.ListStreamIDs returns nil, but with a real
	// backend it would return every stream ID with at least one entry, letting
	// you enumerate all instances of the type.
	orders := mockStore{}

	ids, _ := orders.ListStreamIDs(context.Background())
	fmt.Println("stream count:", len(ids))
}
Output:
stream count: 0

type StoredEntry

type StoredEntry[I comparable, P any] struct {
	ID            I                 // Store-assigned unique ID.
	Stream        string            // Stream name this entry belongs to.
	Payload       P                 // Encoded payload in the store's native format.
	OrderKey      string            // Ordering key.
	DedupKey      string            // Deduplication key.
	SchemaVersion int               // Schema version at write time.
	Metadata      map[string]string // Immutable key-value metadata (set at append).
	Tags          []string          // Mutable tags (updated via SetTags).
	Annotations   map[string]string // Mutable annotations (updated via SetAnnotations).
	CreatedAt     time.Time         // Timestamp when the entry was stored.
	UpdatedAt     *time.Time        // Timestamp of last tag/annotation update, nil if never updated.
}

StoredEntry is a raw entry read back from the store, including its assigned ID and timestamp. I is the store ID type; P is the store-native payload type.

type Stream

type Stream[I comparable, P any, T any] struct {
	// contains filtered or unexported fields
}

Stream is a lightweight, typed handle to a stream instance within a store.

  • I is the store ID type (e.g. int64 for SQL, string for MongoDB).
  • P is the store-native payload type (e.g. json.RawMessage, bson.Raw).
  • T is the user's domain payload type.

The codec bridges T and P on every append and read. The Store represents the entity type (table/collection); the Stream's id identifies the particular instance within that type.

Streams are cheap to create: make one per operation and discard it. Stream is safe for concurrent use.

func NewStream

func NewStream[I comparable, P any, T any](
	store Store[I, P],
	id string,
	codec PayloadCodec[T, P],
	opts ...Option[P],
) Stream[I, P, T]

NewStream creates a lightweight stream handle. The stream does not need to exist in the store beforehand — it is created implicitly on first append. The id identifies the stream instance within the store's type.

codec is required and must not be nil. For SQL backends use JSONCodec; for MongoDB use the BSONCodec provided by the mongodb package.

Panics if store is nil.

Example
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/rbaliyan/ledger"
)

// mockStore is a minimal store for examples.
type mockStore struct{}

func (mockStore) Append(_ context.Context, _ string, entries ...ledger.RawEntry[json.RawMessage]) ([]int64, error) {
	ids := make([]int64, len(entries))
	for i := range entries {
		ids[i] = int64(i + 1)
	}
	return ids, nil
}

func (mockStore) Read(_ context.Context, _ string, _ ...ledger.ReadOption) ([]ledger.StoredEntry[int64, json.RawMessage], error) {
	return nil, nil
}

func (mockStore) Count(_ context.Context, _ string) (int64, error) { return 0, nil }

func (mockStore) SetTags(_ context.Context, _ string, _ int64, _ []string) error { return nil }

func (mockStore) SetAnnotations(_ context.Context, _ string, _ int64, _ map[string]*string) error {
	return nil
}

func (mockStore) Trim(_ context.Context, _ string, _ int64) (int64, error) { return 0, nil }

func (mockStore) ListStreamIDs(_ context.Context, _ ...ledger.ListOption) ([]string, error) {
	return nil, nil
}

func (mockStore) Close(_ context.Context) error { return nil }

func main() {
	store := mockStore{}

	type Order struct {
		ID     string  `json:"id"`
		Amount float64 `json:"amount"`
	}

	// The store represents the "orders" type (one table/collection).
	// The stream is one instance within that type, identified by a stream ID.
	s := ledger.NewStream(store, "user-123", ledger.JSONCodec[Order]{})

	ids, err := s.Append(context.Background(), ledger.AppendInput[Order]{
		Payload:  Order{ID: "o-1", Amount: 99.99},
		OrderKey: "customer-123",
		DedupKey: "evt-abc",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("appended:", len(ids), "entries")
}
Output:
appended: 1 entries
Example (SchemaVersioning)
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/rbaliyan/ledger"
)

// mockStore is a minimal store for examples.
type mockStore struct{}

func (mockStore) Append(_ context.Context, _ string, entries ...ledger.RawEntry[json.RawMessage]) ([]int64, error) {
	ids := make([]int64, len(entries))
	for i := range entries {
		ids[i] = int64(i + 1)
	}
	return ids, nil
}

func (mockStore) Read(_ context.Context, _ string, _ ...ledger.ReadOption) ([]ledger.StoredEntry[int64, json.RawMessage], error) {
	return nil, nil
}

func (mockStore) Count(_ context.Context, _ string) (int64, error) { return 0, nil }

func (mockStore) SetTags(_ context.Context, _ string, _ int64, _ []string) error { return nil }

func (mockStore) SetAnnotations(_ context.Context, _ string, _ int64, _ map[string]*string) error {
	return nil
}

func (mockStore) Trim(_ context.Context, _ string, _ int64) (int64, error) { return 0, nil }

func (mockStore) ListStreamIDs(_ context.Context, _ ...ledger.ListOption) ([]string, error) {
	return nil, nil
}

func (mockStore) Close(_ context.Context) error { return nil }

func main() {
	store := mockStore{}

	type OrderV2 struct {
		Name   string  `json:"name"`
		Email  string  `json:"email"`
		Amount float64 `json:"amount"`
	}

	// Create a v2 stream with upcaster from v1
	s := ledger.NewStream(store, "user-123", ledger.JSONCodec[OrderV2]{},
		ledger.WithSchemaVersion[json.RawMessage](2),
		ledger.WithUpcaster(ledger.NewFieldMapper(1, 2).
			RenameField("customer_name", "name").
			AddDefault("email", "unknown@example.com")),
	)

	fmt.Println("stream:", s.ID(), "schema version:", s.SchemaVersion())
}
Output:
stream: user-123 schema version: 2

func (Stream[I, P, T]) Append

func (s Stream[I, P, T]) Append(ctx context.Context, entries ...AppendInput[T]) ([]I, error)

Append encodes and appends entries to the stream. Returns IDs of newly appended entries. Each entry is stamped with the stream's current schema version. Entries with duplicate dedup keys are silently skipped.

func (Stream[I, P, T]) ID added in v0.0.2

func (s Stream[I, P, T]) ID() string

ID returns the stream instance ID within the store's type.

func (Stream[I, P, T]) Read

func (s Stream[I, P, T]) Read(ctx context.Context, opts ...ReadOption) ([]Entry[I, T], error)

Read returns decoded entries from the stream. Entries written with an older schema version are automatically upcasted to the current version before decoding into T.

func (Stream[I, P, T]) SchemaVersion

func (s Stream[I, P, T]) SchemaVersion() int

SchemaVersion returns the current schema version used for new entries.

func (Stream[I, P, T]) SetAnnotations added in v0.0.2

func (s Stream[I, P, T]) SetAnnotations(ctx context.Context, id I, annotations map[string]*string) error

SetAnnotations merges annotations into an entry in this stream.

func (Stream[I, P, T]) SetTags added in v0.0.2

func (s Stream[I, P, T]) SetTags(ctx context.Context, id I, tags []string) error

SetTags replaces all tags on an entry in this stream.

type Upcaster

type Upcaster[P any] interface {
	// FromVersion returns the source version this upcaster handles.
	FromVersion() int

	// ToVersion returns the target version this upcaster produces.
	ToVersion() int

	// Upcast transforms the payload from source to target version.
	Upcast(ctx context.Context, payload P) (P, error)
}

Upcaster transforms payload data from one schema version to the next. P is the store-native payload type (e.g. json.RawMessage for SQL backends, bson.Raw for MongoDB).

Register upcasters in sequence (v1→v2, v2→v3, etc.) to allow chained upgrades. When reading entries written with an older schema version, the stream applies upcasters in order to transform the payload before decoding into T.

func UpcasterFunc

func UpcasterFunc[P any](from, to int, fn func(ctx context.Context, payload P) (P, error)) Upcaster[P]

UpcasterFunc creates an Upcaster[P] from a plain function.

Directories

Path Synopsis
api
Package ledgerpb exposes the ledger.Store API as a gRPC service with pluggable authentication and authorisation via SecurityGuard.
mongodb
Package mongodb provides a MongoDB-backed ledger store.
postgres
Package postgres provides a PostgreSQL-backed ledger store.
sqlite
Package sqlite provides a SQLite-backed ledger store.
storetest
Package storetest provides a backend-agnostic conformance test suite for ledger.Store implementations.
