bridge

package
v0.1.0 Latest
Published: May 8, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package bridge provides stream bridging between ledger stores. A Bridge polls a source store's mutation log and applies changes to a sink store.

Mutation logging must be enabled on the source store via WithMutationLog, which is available on the SQLite, PostgreSQL, and MongoDB backends. The sink store may be any backend.

Eventual consistency: tag and annotation updates may lag behind the source by up to one polling interval (5s by default; see WithInterval). This lag is expected, documented behaviour.

Types

type AppendEntry

type AppendEntry struct {
	ID            string            `json:"id"`
	Payload       json.RawMessage   `json:"payload"`
	OrderKey      string            `json:"order_key,omitempty"`
	DedupKey      string            `json:"dedup_key,omitempty"`
	SchemaVersion int               `json:"schema_version"`
	Metadata      map[string]string `json:"metadata,omitempty"`
	Tags          []string          `json:"tags,omitempty"`
}

AppendEntry is one entry within an append mutation event.

type Bridge

type Bridge[SI comparable, DI comparable] struct {
	// contains filtered or unexported fields
}

Bridge polls a source mutation log and applies changes to a sink store. SI is the source store ID type; DI is the sink store ID type.

Eventual consistency: tag and annotation updates lag behind the source by up to one polling interval (default 5s).

Multiple Bridge instances may run against the same source and sink safely — all mutations are idempotent at the sink (unique source_id index). Cursor writes are monotonic: an instance that lags behind will never regress a faster instance's cursor position.
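
The monotonic cursor rule can be illustrated with a minimal, self-contained sketch. The cursor type and its advance method below are hypothetical stand-ins, not part of this package; the real Bridge persists its cursor in the sink store, and how it enforces the rule there is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// cursor is a hypothetical in-memory stand-in for a persisted cursor row.
type cursor struct {
	mu  sync.Mutex
	pos int64
}

// advance moves the cursor forward only. A write that is not strictly ahead
// of the stored position is ignored. It reports whether the position changed.
func (c *cursor) advance(candidate int64) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if candidate <= c.pos {
		return false
	}
	c.pos = candidate
	return true
}

// position returns the currently stored position.
func (c *cursor) position() int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.pos
}

func main() {
	c := &cursor{}
	fmt.Println(c.advance(120)) // a fast instance commits position 120
	fmt.Println(c.advance(80))  // a lagging instance tries to commit 80
	fmt.Println(c.position())   // the stored position is still 120
}
```

With a compare-before-write rule like this, a slow instance can repeat work that is already done (harmless if sink writes are idempotent) but cannot move the shared cursor backwards.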

Wrap sink streams in ReadOnlyStream to prevent accidental direct writes.

func New

func New[SI comparable, DI comparable](
	mutations ledger.Store[SI, json.RawMessage],
	sink ledger.Store[DI, json.RawMessage],
	codec IDCodec[SI],
	opts ...Option,
) (*Bridge[SI, DI], error)

New creates a Bridge. mutations is the source mutation log store (same DB as source). sink is the destination store. codec encodes the mutation log's ID type for cursor storage.

If sink implements ledger.CursorStore, progress is persisted in the sink DB so the Bridge resumes from where it left off after a restart. If sink also implements FindBySourceID (sinkLookup), the Bridge applies SetTags, SetAnnotations, and Trim mutations; otherwise those mutations are skipped with a warning.

Returns an error only if OTel metrics initialisation fails (when WithMetricsEnabled(true) is passed). All other configuration errors are surfaced at Start/Poll time.

func (*Bridge[SI, DI]) Poll

func (b *Bridge[SI, DI]) Poll(ctx context.Context) error

Poll runs a single bridging cycle and returns any error. Safe to call concurrently with Start/Stop — it does not affect the background goroutine.

func (*Bridge[SI, DI]) Start

func (b *Bridge[SI, DI]) Start(ctx context.Context)

Start begins bridging in a background goroutine. Safe to call once; subsequent calls are no-ops. The provided context controls goroutine lifetime; cancelling it is equivalent to calling Stop.

func (*Bridge[SI, DI]) Stats

func (b *Bridge[SI, DI]) Stats() Stats

Stats returns a snapshot of the bridging counters.

func (*Bridge[SI, DI]) Stop

func (b *Bridge[SI, DI]) Stop()

Stop signals the Bridge to stop and waits for it to exit. Safe to call multiple times. If Start was never called, Stop returns immediately.

type IDCodec

type IDCodec[I comparable] interface {
	Encode(I) string
	Decode(string) (I, error)
	Zero() I
	Less(a, b I) bool
}

IDCodec serialises and deserialises store IDs to/from string for cursor storage. Less must return true if a should be considered an earlier position than b.

type Int64Codec

type Int64Codec struct{}

Int64Codec implements IDCodec[int64] for SQLite and PostgreSQL stores. Cursors are zero-padded to 19 digits so that lexicographic order in the cursor column matches numeric order (avoids "9" > "10" edge cases).

func (Int64Codec) Decode

func (Int64Codec) Decode(s string) (int64, error)

func (Int64Codec) Encode

func (Int64Codec) Encode(id int64) string

func (Int64Codec) Less

func (Int64Codec) Less(a, b int64) bool

func (Int64Codec) Zero

func (Int64Codec) Zero() int64

type MutationEvent

type MutationEvent struct {
	Type        MutationType       `json:"type"`
	Stream      string             `json:"stream"`
	Entries     []AppendEntry      `json:"entries,omitempty"`
	EntryID     string             `json:"entry_id,omitempty"`
	Tags        []string           `json:"tags,omitempty"`
	Annotations map[string]*string `json:"annotations,omitempty"`
	BeforeID    string             `json:"before_id,omitempty"`
}

MutationEvent is a single change decoded from the mutation log.
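
As a rough illustration of the wire shape implied by the struct tags, the sketch below decodes a JSON document into a local copy of part of MutationEvent. The payload is made up for the example, and the types are redeclared locally so the snippet compiles on its own. Note that a JSON null annotation value decodes to a nil *string; what the Bridge does with a nil value is not stated in this documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented types, limited to the fields used here.
type MutationType string

type MutationEvent struct {
	Type        MutationType       `json:"type"`
	Stream      string             `json:"stream"`
	EntryID     string             `json:"entry_id,omitempty"`
	Tags        []string           `json:"tags,omitempty"`
	Annotations map[string]*string `json:"annotations,omitempty"`
}

// decode unmarshals one raw mutation log payload.
func decode(raw []byte) (MutationEvent, error) {
	var ev MutationEvent
	err := json.Unmarshal(raw, &ev)
	return ev, err
}

func main() {
	// Hypothetical payload; real mutation log entries may differ.
	raw := []byte(`{"type":"set_annotations","stream":"orders","entry_id":"42",` +
		`"annotations":{"reviewed":"yes","draft":null}}`)

	ev, err := decode(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(ev.Type, ev.Stream, ev.EntryID)

	// A string value decodes to a non-nil pointer, a null to a nil pointer.
	fmt.Println("reviewed:", *ev.Annotations["reviewed"])
	draft, present := ev.Annotations["draft"]
	fmt.Println("draft present:", present, "nil:", draft == nil)
}
```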

type MutationType

type MutationType string

MutationType identifies the kind of change recorded in a mutation log entry.

const (
	MutationAppend         MutationType = "append"
	MutationSetTags        MutationType = "set_tags"
	MutationSetAnnotations MutationType = "set_annotations"
	MutationTrim           MutationType = "trim"
)

type Option

type Option func(*options)

Option configures a Bridge.

func WithBatchSize

func WithBatchSize(n int) Option

WithBatchSize sets the number of mutations processed per poll. Defaults to 500.

func WithInterval

func WithInterval(d time.Duration) Option

WithInterval sets the polling interval. Defaults to 5s.

func WithLogger

func WithLogger(l *slog.Logger) Option

WithLogger sets the logger. Defaults to slog.Default().

func WithMeter added in v0.0.7

func WithMeter(m metric.Meter) Option

WithMeter sets a custom meter instead of using the global OTel provider.

func WithMeterName added in v0.0.7

func WithMeterName(name string) Option

WithMeterName sets the meter name. Defaults to "github.com/rbaliyan/ledger/bridge".

func WithMetricsEnabled added in v0.0.7

func WithMetricsEnabled(enabled bool) Option

WithMetricsEnabled enables or disables OpenTelemetry metrics (disabled by default). When enabled the following instruments are registered:

  • ledger.bridge.polls (counter) — total poll cycles, attrs: bridge.name, status
  • ledger.bridge.poll.duration (histogram, s) — per-cycle latency
  • ledger.bridge.mutations.applied (counter) — mutations written to sink, attrs: mutation.type
  • ledger.bridge.mutations.skipped (counter) — mutations dropped by config, attrs: mutation.type
  • ledger.bridge.mutations.errors (counter) — mutations that failed to apply, attrs: mutation.type
  • ledger.bridge.lag (histogram, s) — replication lag (age of oldest mutation per poll)

func WithName

func WithName(name string) Option

WithName sets the cursor name used to track progress. Use a unique name per source-sink pair. Defaults to "default".

func WithParallelStreams added in v0.0.10

func WithParallelStreams(n int) Option

WithParallelStreams enables concurrent mutation processing across independent streams. n goroutines process distinct streams simultaneously; events within the same stream are always applied in order. The default (n <= 1) is serial.

Use this when the sink is remote (e.g. a database or webhook) and latency per mutation dominates throughput. Safe to set to runtime.NumCPU().

func WithSkipMutationTypes

func WithSkipMutationTypes(types ...MutationType) Option

WithSkipMutationTypes instructs the Bridge to silently drop mutation events of the given types instead of applying them to the sink. Use this when the sink backend does not support the operation — e.g. pass MutationSetTags and MutationSetAnnotations when bridging to ClickHouse.
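
Conceptually, skipping amounts to a set-membership check before each apply. The sketch below redeclares the documented mutation type constants locally and uses a hypothetical skipSet helper to show the idea; the Bridge's own bookkeeping (for example the SkipCount field in Stats) may work differently.

```go
package main

import "fmt"

// Local copies of the documented mutation type constants.
type MutationType string

const (
	MutationAppend         MutationType = "append"
	MutationSetTags        MutationType = "set_tags"
	MutationSetAnnotations MutationType = "set_annotations"
	MutationTrim           MutationType = "trim"
)

// skipSet is a hypothetical lookup set of mutation types to drop.
type skipSet map[MutationType]struct{}

// newSkipSet builds a set from the given types.
func newSkipSet(types ...MutationType) skipSet {
	s := make(skipSet, len(types))
	for _, t := range types {
		s[t] = struct{}{}
	}
	return s
}

// partition returns the types that would be applied, in order, and the
// number that would be dropped.
func (s skipSet) partition(types []MutationType) (kept []MutationType, skipped int) {
	for _, t := range types {
		if _, drop := s[t]; drop {
			skipped++
			continue
		}
		kept = append(kept, t)
	}
	return kept, skipped
}

func main() {
	skip := newSkipSet(MutationSetTags, MutationSetAnnotations)
	incoming := []MutationType{MutationAppend, MutationSetTags, MutationTrim, MutationSetAnnotations}

	kept, skipped := skip.partition(incoming)
	fmt.Println(kept, skipped) // prints "[append trim] 2"
}
```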

func WithTracer added in v0.0.7

func WithTracer(t trace.Tracer) Option

WithTracer sets a custom tracer instead of using the global OTel provider.

func WithTracerName added in v0.0.7

func WithTracerName(name string) Option

WithTracerName sets the tracer name. Defaults to "github.com/rbaliyan/ledger/bridge".

func WithTracesEnabled added in v0.0.7

func WithTracesEnabled(enabled bool) Option

WithTracesEnabled enables or disables per-poll OpenTelemetry spans (disabled by default).

type ReadOnlyStream

type ReadOnlyStream[I comparable, P any, T any] struct {
	// contains filtered or unexported fields
}

ReadOnlyStream wraps a ledger.Stream and returns ledger.ErrReadOnly for all write operations. Use this for sink streams to prevent accidental writes.

func NewReadOnlyStream

func NewReadOnlyStream[I comparable, P any, T any](s ledger.Stream[I, P, T]) ReadOnlyStream[I, P, T]

NewReadOnlyStream wraps s so that Append, SetTags, and SetAnnotations return ledger.ErrReadOnly.

func (ReadOnlyStream[I, P, T]) Append

func (s ReadOnlyStream[I, P, T]) Append(_ context.Context, _ ...ledger.AppendInput[T]) ([]I, error)

Append always returns ledger.ErrReadOnly.

func (ReadOnlyStream[I, P, T]) ID

func (s ReadOnlyStream[I, P, T]) ID() string

ID returns the stream instance ID.

func (ReadOnlyStream[I, P, T]) Read

func (s ReadOnlyStream[I, P, T]) Read(ctx context.Context, opts ...ledger.ReadOption) ([]ledger.Entry[I, T], error)

Read returns decoded entries. Upcasting is applied as normal.

func (ReadOnlyStream[I, P, T]) SchemaVersion

func (s ReadOnlyStream[I, P, T]) SchemaVersion() int

SchemaVersion returns the schema version used for new entries.

func (ReadOnlyStream[I, P, T]) SetAnnotations

func (s ReadOnlyStream[I, P, T]) SetAnnotations(_ context.Context, _ I, _ map[string]*string) error

SetAnnotations always returns ledger.ErrReadOnly.

func (ReadOnlyStream[I, P, T]) SetTags

func (s ReadOnlyStream[I, P, T]) SetTags(_ context.Context, _ I, _ []string) error

SetTags always returns ledger.ErrReadOnly.

type Stats

type Stats struct {
	PollCount  int64
	ApplyCount int64
	SkipCount  int64
	ErrorCount int64
}

Stats holds counters accumulated since the Bridge started.

type StringCodec

type StringCodec struct{}

StringCodec implements IDCodec[string] for MongoDB stores. Ordering is lexicographic, which is correct for MongoDB ObjectID hex strings (first 4 bytes are a big-endian timestamp).
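
The ordering claim can be checked with a small sketch that builds ObjectID-shaped hex strings by hand. The objectIDHex helper is hypothetical and avoids any MongoDB driver dependency. It relies on two facts: an ObjectID is 12 bytes starting with a 4-byte big-endian timestamp, and fixed-width lowercase hex sorts in the same order as the underlying bytes.

```go
package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
)

// objectIDHex builds a 12-byte ObjectID-shaped value: a 4-byte big-endian
// Unix timestamp followed by 8 caller-supplied bytes (random value and
// counter in a real ObjectID), rendered as 24 lowercase hex characters.
func objectIDHex(unixSeconds uint32, tail [8]byte) string {
	var raw [12]byte
	binary.BigEndian.PutUint32(raw[:4], unixSeconds)
	copy(raw[4:], tail[:])
	return hex.EncodeToString(raw[:])
}

func main() {
	// The older ID gets the largest possible tail, the newer one the smallest,
	// to show that the timestamp prefix dominates the comparison.
	older := objectIDHex(1_700_000_000, [8]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff})
	newer := objectIDHex(1_700_000_001, [8]byte{})

	fmt.Println(len(older), len(newer))
	fmt.Println(older < newer) // prints "true"
}
```

Within the same second, the order of two IDs depends on the trailing bytes rather than on creation time, so lexicographic order is time-ordered only at one-second granularity.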

func (StringCodec) Decode

func (StringCodec) Decode(s string) (string, error)

func (StringCodec) Encode

func (StringCodec) Encode(id string) string

func (StringCodec) Less

func (StringCodec) Less(a, b string) bool

func (StringCodec) Zero

func (StringCodec) Zero() string
