Documentation
¶
Overview ¶
Package ledger provides an append-only log with typed generic entries, schema versioning, deduplication, and pluggable storage backends.
The core abstraction is a two-level generic design:
- Store is the backend interface, generic over the ID type I and the store-native payload type P (e.g. json.RawMessage for SQL, bson.Raw for MongoDB).
- Stream is a lightweight typed handle, generic over I, P, and the user's domain payload type T. A PayloadCodec bridges T and P.
Streams are cheap to create — one per operation, then discard. Schema versioning with Upcaster chains enables safe payload evolution without downtime.
Each entry has immutable fields (Payload, OrderKey, DedupKey, SchemaVersion, Metadata, CreatedAt) set at append time, and mutable fields (Tags, Annotations) that can be updated after the entry is written.
Backends: sqlite (Store[int64, json.RawMessage]), postgres (Store[int64, json.RawMessage]), mongodb (Store[string, bson.Raw]).
Index ¶
- Variables
- func AfterValue[I comparable](o ReadOptions) (I, bool)
- func TxFromContext(ctx context.Context) any
- func ValidateName(name string) error
- func WithTx(ctx context.Context, tx any) context.Context
- type AppendInput
- type Entry
- type FieldMapper
- func (f *FieldMapper) AddDefault(field string, value any) *FieldMapper
- func (f *FieldMapper) FromVersion() int
- func (f *FieldMapper) RemoveField(field string) *FieldMapper
- func (f *FieldMapper) RenameField(oldName, newName string) *FieldMapper
- func (f *FieldMapper) ToVersion() int
- func (f *FieldMapper) Upcast(ctx context.Context, data json.RawMessage) (json.RawMessage, error)
- type HealthChecker
- type JSONCodec
- type ListOption
- type ListOptions
- type Option
- type Order
- type PayloadCodec
- type RawEntry
- type ReadOption
- type ReadOptions
- type Store
- type StoredEntry
- type Stream
- func (s Stream[I, P, T]) Append(ctx context.Context, entries ...AppendInput[T]) ([]I, error)
- func (s Stream[I, P, T]) ID() string
- func (s Stream[I, P, T]) Read(ctx context.Context, opts ...ReadOption) ([]Entry[I, T], error)
- func (s Stream[I, P, T]) SchemaVersion() int
- func (s Stream[I, P, T]) SetAnnotations(ctx context.Context, id I, annotations map[string]*string) error
- func (s Stream[I, P, T]) SetTags(ctx context.Context, id I, tags []string) error
- type Upcaster
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrStoreClosed is returned when operating on a closed store. ErrStoreClosed = errors.New("ledger: store closed") // ErrEncode is returned when payload encoding fails. ErrEncode = errors.New("ledger: encode failed") // ErrDecode is returned when payload decoding fails. ErrDecode = errors.New("ledger: decode failed") // ErrNoUpcaster is returned when no upcaster is available for a version transition. ErrNoUpcaster = errors.New("ledger: no upcaster available") // ErrInvalidCursor is returned when a cursor value has an unexpected type. ErrInvalidCursor = errors.New("ledger: invalid cursor type") // ErrInvalidName is returned when a table or collection name is invalid. ErrInvalidName = errors.New("ledger: invalid table/collection name") // ErrEntryNotFound is returned when SetTags or SetAnnotations targets a non-existent entry. ErrEntryNotFound = errors.New("ledger: entry not found") )
Functions ¶
func AfterValue ¶
func AfterValue[I comparable](o ReadOptions) (I, bool)
AfterValue returns the cursor value as the requested type. Returns the zero value and false if no cursor is set or the type doesn't match. This function is intended for Store implementors.
func TxFromContext ¶ added in v0.0.2
TxFromContext returns the transaction stored in the context, or nil. This function is intended for Store implementors.
func ValidateName ¶
ValidateName checks that name is a safe identifier for use as a table or collection name.
func WithTx ¶ added in v0.0.2
WithTx returns a new context carrying the given transaction value. SQL backends accept *sql.Tx, MongoDB backends accept mongo.Session.
When a context with a transaction is passed to Store methods, the store participates in the caller's transaction instead of creating its own. This enables atomic writes across the ledger and other tables/collections.
Example (SQL):
tx, _ := db.BeginTx(ctx, nil) ctx = ledger.WithTx(ctx, tx) store.Append(ctx, "orders", entry) // uses caller's tx tx.Commit()
Example (MongoDB):
session, _ := client.StartSession()
session.WithTransaction(ctx, func(sc context.Context) (any, error) {
ctx := ledger.WithTx(sc, session)
store.Append(ctx, "orders", entry) // uses caller's session
return nil, nil
})
Types ¶
type AppendInput ¶
type AppendInput[T any] struct { Payload T // Payload to encode and store. OrderKey string // Ordering key for filtering (e.g., aggregate ID). DedupKey string // Deduplication key. Empty means no dedup. Metadata map[string]string // Immutable key-value metadata. Tags []string // Initial tags (can be updated later via SetTags). }
AppendInput describes an entry to append to a stream.
type Entry ¶
type Entry[I comparable, T any] struct { ID I // Store-assigned unique ID. Stream string // Stream name this entry belongs to. Payload T // Decoded payload. OrderKey string // Ordering key. DedupKey string // Deduplication key. SchemaVersion int // Schema version at write time (before upcasting). Metadata map[string]string // Immutable key-value metadata (set at append). Tags []string // Mutable tags (updated via SetTags). Annotations map[string]string // Mutable annotations (updated via SetAnnotations). CreatedAt time.Time // Timestamp when the entry was stored. UpdatedAt *time.Time // Timestamp of last tag/annotation update. }
Entry is a typed entry read back from a stream.
type FieldMapper ¶
type FieldMapper struct {
// contains filtered or unexported fields
}
FieldMapper transforms JSON payloads between schema versions using field operations. It implements Upcasterjson.RawMessage and is intended for use with SQL backends (sqlite, postgres). MongoDB users should write BSON-aware upcasters instead.
Supports three operations applied in order: renames, then defaults, then removals.
upcaster := ledger.NewFieldMapper(1, 2).
RenameField("customer_name", "customerName").
AddDefault("email", "unknown@example.com").
RemoveField("legacy_id")
func NewFieldMapper ¶
func NewFieldMapper(from, to int) *FieldMapper
NewFieldMapper creates a new field mapper for the specified version transition.
func (*FieldMapper) AddDefault ¶
func (f *FieldMapper) AddDefault(field string, value any) *FieldMapper
AddDefault sets a default value for a field that may not exist.
func (*FieldMapper) FromVersion ¶
func (f *FieldMapper) FromVersion() int
FromVersion returns the source version this mapper handles.
func (*FieldMapper) RemoveField ¶
func (f *FieldMapper) RemoveField(field string) *FieldMapper
RemoveField marks a field for removal during upcasting.
func (*FieldMapper) RenameField ¶
func (f *FieldMapper) RenameField(oldName, newName string) *FieldMapper
RenameField adds a field rename transformation.
func (*FieldMapper) ToVersion ¶
func (f *FieldMapper) ToVersion() int
ToVersion returns the target version this mapper produces.
func (*FieldMapper) Upcast ¶
func (f *FieldMapper) Upcast(ctx context.Context, data json.RawMessage) (json.RawMessage, error)
Upcast transforms the JSON payload from source to target version. Applies operations in order: renames, defaults, removals.
type HealthChecker ¶
HealthChecker is an optional interface that Store implementations may provide to report backend health (e.g., database connectivity).
type JSONCodec ¶
type JSONCodec[T any] struct{}
JSONCodec implements PayloadCodec[T, json.RawMessage] using encoding/json. It is the default codec for the sqlite and postgres backends.
type ListOption ¶ added in v0.0.2
type ListOption func(*ListOptions)
ListOption configures how stream IDs are listed by [Store.ListStreamIDs].
func ListAfter ¶ added in v0.0.2
func ListAfter(streamID string) ListOption
ListAfter returns a ListOption that sets the cursor position. Only stream IDs strictly greater than this value are returned.
func ListLimit ¶ added in v0.0.2
func ListLimit(n int) ListOption
ListLimit returns a ListOption that sets the maximum number of stream IDs to return. The default limit is 100.
type ListOptions ¶ added in v0.0.2
type ListOptions struct {
// contains filtered or unexported fields
}
ListOptions holds the parsed list parameters. This type is intended for Store implementors.
func ApplyListOptions ¶ added in v0.0.2
func ApplyListOptions(opts ...ListOption) ListOptions
ApplyListOptions applies the given options and returns the resolved ListOptions. This function is intended for Store implementors.
func (ListOptions) After ¶ added in v0.0.2
func (o ListOptions) After() string
After returns the cursor value. Only stream IDs strictly greater than this value are returned. Empty string means no cursor.
func (ListOptions) HasAfter ¶ added in v0.0.2
func (o ListOptions) HasAfter() bool
HasAfter reports whether a cursor was set.
func (ListOptions) Limit ¶ added in v0.0.2
func (o ListOptions) Limit() int
Limit returns the maximum number of stream IDs to return.
type Option ¶
type Option[P any] func(*options[P])
Option configures a Stream. P is the store-native payload type.
func WithSchemaVersion ¶
WithSchemaVersion sets the schema version stamped on new entries. Defaults to 1. When reading entries with older versions, registered upcasters are applied to transform the payload before decoding.
func WithUpcaster ¶
WithUpcaster registers an upcaster for transforming payloads from one schema version to the next. Register in sequence (v1→v2, v2→v3).
type PayloadCodec ¶ added in v0.0.3
PayloadCodec marshals and unmarshals between a domain type T and the store-native payload type P (e.g. encoding/json.RawMessage for SQL backends, go.mongodb.org/mongo-driver/v2/bson.Raw for MongoDB).
Each store backend declares its own P; the codec bridges the user's T to it.
type RawEntry ¶
type RawEntry[P any] struct { Payload P // Encoded payload in the store's native format. OrderKey string // Ordering key for filtering (e.g., aggregate ID). DedupKey string // Deduplication key. Empty means no dedup. SchemaVersion int // Schema version of the payload. Metadata map[string]string // Arbitrary immutable key-value metadata. Tags []string // Initial tags (mutable after append via SetTags). }
RawEntry is an entry with an already-encoded payload, ready for storage. P is the store-native payload type.
type ReadOption ¶
type ReadOption func(*ReadOptions)
ReadOption configures how entries are read from a stream.
func After ¶
func After[I comparable](id I) ReadOption
After returns a ReadOption that sets the cursor position. Only entries after this ID are returned.
func Desc ¶
func Desc() ReadOption
Desc returns a ReadOption that reads entries in descending order (newest first).
func Limit ¶
func Limit(n int) ReadOption
Limit returns a ReadOption that sets the maximum number of entries to return. The default limit is 100.
func WithAllTags ¶ added in v0.0.2
func WithAllTags(tags ...string) ReadOption
WithAllTags returns a ReadOption that filters entries having ALL specified tags.
func WithOrderKey ¶
func WithOrderKey(key string) ReadOption
WithOrderKey returns a ReadOption that filters entries by ordering key.
func WithTag ¶ added in v0.0.2
func WithTag(tag string) ReadOption
WithTag returns a ReadOption that filters entries having a specific tag.
type ReadOptions ¶
type ReadOptions struct {
// contains filtered or unexported fields
}
ReadOptions holds the parsed read parameters. This type is intended for Store implementors.
func ApplyReadOptions ¶
func ApplyReadOptions(opts ...ReadOption) ReadOptions
ApplyReadOptions applies the given options and returns the resolved ReadOptions. This function is intended for Store implementors.
func (ReadOptions) AllTags ¶ added in v0.0.2
func (o ReadOptions) AllTags() []string
AllTags returns the all-tags filter, or nil if not set.
func (ReadOptions) HasAfter ¶
func (o ReadOptions) HasAfter() bool
HasAfter reports whether a cursor was set.
func (ReadOptions) Limit ¶
func (o ReadOptions) Limit() int
Limit returns the maximum number of entries to return.
func (ReadOptions) OrderKeyFilter ¶
func (o ReadOptions) OrderKeyFilter() string
OrderKeyFilter returns the order key filter, or empty string if not set.
func (ReadOptions) Tag ¶ added in v0.0.2
func (o ReadOptions) Tag() string
Tag returns the single-tag filter, or empty string if not set.
type Store ¶
type Store[I comparable, P any] interface { // Append adds entries to the named stream. Returns the IDs of newly appended entries. // Entries with a non-empty DedupKey that already exists in the stream are silently skipped. Append(ctx context.Context, stream string, entries ...RawEntry[P]) ([]I, error) // Read returns entries from the named stream, ordered by ID. // Reading a non-existent stream returns nil, nil. Read(ctx context.Context, stream string, opts ...ReadOption) ([]StoredEntry[I, P], error) // Count returns the number of entries in the named stream. Count(ctx context.Context, stream string) (int64, error) // SetTags replaces all tags on an entry. Tags are mutable labels for // categorization and filtering (e.g., "processed", "archived"). SetTags(ctx context.Context, stream string, id I, tags []string) error // SetAnnotations merges annotations into an entry. Existing keys are // overwritten, new keys are added, and keys with nil values are deleted. // Annotations are mutable key-value metadata (e.g., "processed_at", "error"). SetAnnotations(ctx context.Context, stream string, id I, annotations map[string]*string) error // Trim deletes entries from the named stream with IDs less than or equal to beforeID. // Returns the number of entries deleted. Trim(ctx context.Context, stream string, beforeID I) (int64, error) // ListStreamIDs returns distinct stream IDs that have at least one entry in // this store. Results are ordered ascending by stream ID and cursor-paginated // via [ListAfter] and [ListLimit]. Returns nil, nil for an empty store. // // A stream that has been fully trimmed is not returned (no separate stream // registry is maintained). ListStreamIDs(ctx context.Context, opts ...ListOption) ([]string, error) // Close releases resources held by the store. The caller is responsible for // closing the underlying database connection. Close(ctx context.Context) error }
Store is the backend storage interface for append-only log entries. I is the ID type used for cursor-based iteration (e.g., int64 for SQL, string for MongoDB). P is the store-native payload type (e.g., json.RawMessage for SQL, bson.Raw for MongoDB).
A Store represents a single entity type — one table or collection. The stream parameter on each method identifies an instance within that type (e.g., a store created with table "orders" may contain streams "user-123", "user-456").
Append semantics: SQL backends (sqlite, postgres) use transactions for atomic batch inserts. The MongoDB backend uses InsertMany with ordered:false, meaning partial success is possible on non-dedup errors.
Transaction support: pass a *sql.Tx or *mongo.Session via WithTx to have operations participate in an external transaction.
Example (ListStreamIDs) ¶
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/rbaliyan/ledger"
)
// mockStore is a minimal store for examples.
type mockStore struct{}
func (mockStore) Append(_ context.Context, _ string, entries ...ledger.RawEntry[json.RawMessage]) ([]int64, error) {
ids := make([]int64, len(entries))
for i := range entries {
ids[i] = int64(i + 1)
}
return ids, nil
}
func (mockStore) Read(_ context.Context, _ string, _ ...ledger.ReadOption) ([]ledger.StoredEntry[int64, json.RawMessage], error) {
return nil, nil
}
func (mockStore) Count(_ context.Context, _ string) (int64, error) { return 0, nil }
func (mockStore) SetTags(_ context.Context, _ string, _ int64, _ []string) error { return nil }
func (mockStore) SetAnnotations(_ context.Context, _ string, _ int64, _ map[string]*string) error {
return nil
}
func (mockStore) Trim(_ context.Context, _ string, _ int64) (int64, error) { return 0, nil }
func (mockStore) ListStreamIDs(_ context.Context, _ ...ledger.ListOption) ([]string, error) {
return nil, nil
}
func (mockStore) Close(_ context.Context) error { return nil }
func main() {
// A Store represents one entity type. Open one per type.
//
// For this example, mockStore.ListStreamIDs returns nil, but with a real
// backend it would return every stream ID with at least one entry, letting
// you enumerate all instances of the type.
orders := mockStore{}
ids, _ := orders.ListStreamIDs(context.Background())
fmt.Println("stream count:", len(ids))
}
Output: stream count: 0
type StoredEntry ¶
type StoredEntry[I comparable, P any] struct { ID I // Store-assigned unique ID. Stream string // Stream name this entry belongs to. Payload P // Encoded payload in the store's native format. OrderKey string // Ordering key. DedupKey string // Deduplication key. SchemaVersion int // Schema version at write time. Metadata map[string]string // Immutable key-value metadata (set at append). Tags []string // Mutable tags (updated via SetTags). Annotations map[string]string // Mutable annotations (updated via SetAnnotations). CreatedAt time.Time // Timestamp when the entry was stored. UpdatedAt *time.Time // Timestamp of last tag/annotation update, nil if never updated. }
StoredEntry is a raw entry read back from the store, including its assigned ID and timestamp. I is the store ID type; P is the store-native payload type.
type Stream ¶
type Stream[I comparable, P any, T any] struct { // contains filtered or unexported fields }
Stream is a lightweight, typed handle to a stream instance within a store.
- I is the store ID type (e.g. int64 for SQL, string for MongoDB).
- P is the store-native payload type (e.g. json.RawMessage, bson.Raw).
- T is the user's domain payload type.
The codec bridges T and P on every append and read. The Store represents the entity type (table/collection); the Stream's id identifies the particular instance within that type.
It is cheap to create — create one per operation and discard it. Stream is safe for concurrent use.
func NewStream ¶
func NewStream[I comparable, P any, T any]( store Store[I, P], id string, codec PayloadCodec[T, P], opts ...Option[P], ) Stream[I, P, T]
NewStream creates a lightweight stream handle. The stream does not need to exist in the store beforehand — it is created implicitly on first append. The id identifies the stream instance within the store's type.
codec is required and must not be nil. For SQL backends use JSONCodec; for MongoDB use the BSONCodec provided by the mongodb package.
Panics if store is nil.
Example ¶
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/rbaliyan/ledger"
)
// mockStore is a minimal store for examples.
type mockStore struct{}
func (mockStore) Append(_ context.Context, _ string, entries ...ledger.RawEntry[json.RawMessage]) ([]int64, error) {
ids := make([]int64, len(entries))
for i := range entries {
ids[i] = int64(i + 1)
}
return ids, nil
}
func (mockStore) Read(_ context.Context, _ string, _ ...ledger.ReadOption) ([]ledger.StoredEntry[int64, json.RawMessage], error) {
return nil, nil
}
func (mockStore) Count(_ context.Context, _ string) (int64, error) { return 0, nil }
func (mockStore) SetTags(_ context.Context, _ string, _ int64, _ []string) error { return nil }
func (mockStore) SetAnnotations(_ context.Context, _ string, _ int64, _ map[string]*string) error {
return nil
}
func (mockStore) Trim(_ context.Context, _ string, _ int64) (int64, error) { return 0, nil }
func (mockStore) ListStreamIDs(_ context.Context, _ ...ledger.ListOption) ([]string, error) {
return nil, nil
}
func (mockStore) Close(_ context.Context) error { return nil }
func main() {
store := mockStore{}
type Order struct {
ID string `json:"id"`
Amount float64 `json:"amount"`
}
// The store represents the "orders" type (one table/collection).
// The stream is one instance within that type, identified by a stream ID.
s := ledger.NewStream(store, "user-123", ledger.JSONCodec[Order]{})
ids, err := s.Append(context.Background(), ledger.AppendInput[Order]{
Payload: Order{ID: "o-1", Amount: 99.99},
OrderKey: "customer-123",
DedupKey: "evt-abc",
})
if err != nil {
panic(err)
}
fmt.Println("appended:", len(ids), "entries")
}
Output: appended: 1 entries
Example (SchemaVersioning) ¶
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/rbaliyan/ledger"
)
// mockStore is a minimal store for examples.
type mockStore struct{}
func (mockStore) Append(_ context.Context, _ string, entries ...ledger.RawEntry[json.RawMessage]) ([]int64, error) {
ids := make([]int64, len(entries))
for i := range entries {
ids[i] = int64(i + 1)
}
return ids, nil
}
func (mockStore) Read(_ context.Context, _ string, _ ...ledger.ReadOption) ([]ledger.StoredEntry[int64, json.RawMessage], error) {
return nil, nil
}
func (mockStore) Count(_ context.Context, _ string) (int64, error) { return 0, nil }
func (mockStore) SetTags(_ context.Context, _ string, _ int64, _ []string) error { return nil }
func (mockStore) SetAnnotations(_ context.Context, _ string, _ int64, _ map[string]*string) error {
return nil
}
func (mockStore) Trim(_ context.Context, _ string, _ int64) (int64, error) { return 0, nil }
func (mockStore) ListStreamIDs(_ context.Context, _ ...ledger.ListOption) ([]string, error) {
return nil, nil
}
func (mockStore) Close(_ context.Context) error { return nil }
func main() {
store := mockStore{}
type OrderV2 struct {
Name string `json:"name"`
Email string `json:"email"`
Amount float64 `json:"amount"`
}
// Create a v2 stream with upcaster from v1
s := ledger.NewStream(store, "user-123", ledger.JSONCodec[OrderV2]{},
ledger.WithSchemaVersion[json.RawMessage](2),
ledger.WithUpcaster(ledger.NewFieldMapper(1, 2).
RenameField("customer_name", "name").
AddDefault("email", "unknown@example.com")),
)
fmt.Println("stream:", s.ID(), "schema version:", s.SchemaVersion())
}
Output: stream: user-123 schema version: 2
func (Stream[I, P, T]) Append ¶
func (s Stream[I, P, T]) Append(ctx context.Context, entries ...AppendInput[T]) ([]I, error)
Append encodes and appends entries to the stream. Returns IDs of newly appended entries. Each entry is stamped with the stream's current schema version. Entries with duplicate dedup keys are silently skipped.
func (Stream[I, P, T]) ID ¶ added in v0.0.2
ID returns the stream instance ID within the store's type.
func (Stream[I, P, T]) Read ¶
Read returns decoded entries from the stream. Entries written with an older schema version are automatically upcasted to the current version before decoding into T.
func (Stream[I, P, T]) SchemaVersion ¶
SchemaVersion returns the current schema version used for new entries.
type Upcaster ¶
type Upcaster[P any] interface { // FromVersion returns the source version this upcaster handles. FromVersion() int // ToVersion returns the target version this upcaster produces. ToVersion() int // Upcast transforms the payload from source to target version. Upcast(ctx context.Context, payload P) (P, error) }
Upcaster transforms payload data from one schema version to the next. P is the store-native payload type (e.g. json.RawMessage for SQL backends, bson.Raw for MongoDB).
Register upcasters in sequence (v1→v2, v2→v3, etc.) to allow chained upgrades. When reading entries written with an older schema version, the stream applies upcasters in order to transform the payload before decoding into T.
Directories
¶
| Path | Synopsis |
|---|---|
|
api
|
|
|
Package ledgerpb exposes the ledger.Store API as a gRPC service with pluggable authentication and authorisation via SecurityGuard.
|
Package ledgerpb exposes the ledger.Store API as a gRPC service with pluggable authentication and authorisation via SecurityGuard. |
|
Package mongodb provides a MongoDB-backed ledger store.
|
Package mongodb provides a MongoDB-backed ledger store. |
|
Package postgres provides a PostgreSQL-backed ledger store.
|
Package postgres provides a PostgreSQL-backed ledger store. |
|
Package sqlite provides a SQLite-backed ledger store.
|
Package sqlite provides a SQLite-backed ledger store. |
|
Package storetest provides a backend-agnostic conformance test suite for ledger.Store implementations.
|
Package storetest provides a backend-agnostic conformance test suite for ledger.Store implementations. |