Documentation ¶
Overview ¶
Package bridge provides stream bridging between ledger stores. A Bridge polls a source store's mutation log and applies changes to a sink store.
Mutation logging must be enabled on the source store via WithMutationLog (SQLite, PostgreSQL, and MongoDB backends). The sink store may be any backend.
Eventual consistency: tag and annotation updates may lag behind the source by up to one polling interval (5s by default; see WithInterval). This lag is expected, documented behaviour.
Index ¶
- type AppendEntry
- type Bridge
- type IDCodec
- type Int64Codec
- type MutationEvent
- type MutationType
- type Option
- func WithBatchSize(n int) Option
- func WithInterval(d time.Duration) Option
- func WithLogger(l *slog.Logger) Option
- func WithMeter(m metric.Meter) Option
- func WithMeterName(name string) Option
- func WithMetricsEnabled(enabled bool) Option
- func WithName(name string) Option
- func WithParallelStreams(n int) Option
- func WithSkipMutationTypes(types ...MutationType) Option
- func WithTracer(t trace.Tracer) Option
- func WithTracerName(name string) Option
- func WithTracesEnabled(enabled bool) Option
- type ReadOnlyStream
- func (s ReadOnlyStream[I, P, T]) Append(_ context.Context, _ ...ledger.AppendInput[T]) ([]I, error)
- func (s ReadOnlyStream[I, P, T]) ID() string
- func (s ReadOnlyStream[I, P, T]) Read(ctx context.Context, opts ...ledger.ReadOption) ([]ledger.Entry[I, T], error)
- func (s ReadOnlyStream[I, P, T]) SchemaVersion() int
- func (s ReadOnlyStream[I, P, T]) SetAnnotations(_ context.Context, _ I, _ map[string]*string) error
- func (s ReadOnlyStream[I, P, T]) SetTags(_ context.Context, _ I, _ []string) error
- type Stats
- type StringCodec
Types ¶
type AppendEntry ¶
type AppendEntry struct {
ID string `json:"id"`
Payload json.RawMessage `json:"payload"`
OrderKey string `json:"order_key,omitempty"`
DedupKey string `json:"dedup_key,omitempty"`
SchemaVersion int `json:"schema_version"`
Metadata map[string]string `json:"metadata,omitempty"`
Tags []string `json:"tags,omitempty"`
}
AppendEntry is one entry within an append mutation event.
type Bridge ¶
type Bridge[SI comparable, DI comparable] struct {
	// contains filtered or unexported fields
}
Bridge polls a source mutation log and applies changes to a sink store. SI is the source store ID type; DI is the sink store ID type.
Eventual consistency: tag and annotation updates lag behind the source by up to one polling interval (default 5s).
Multiple Bridge instances may safely run against the same source and sink: every mutation is idempotent at the sink (unique source_id index). Cursor writes are monotonic, so an instance that lags behind never regresses a faster instance's cursor position.
Wrap sink streams in ReadOnlyStream to prevent accidental direct writes.
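The monotonic cursor rule can be pictured as a compare-before-write. The following is a minimal sketch under stated assumptions, not the package's implementation: `cursor` and `advance` are hypothetical names, and the position lives in memory here, whereas the Bridge persists it through the sink when the sink implements ledger.CursorStore.

```go
package main

import (
	"fmt"
	"sync"
)

// cursor is a hypothetical in-memory stand-in for a persisted cursor.
type cursor struct {
	mu  sync.Mutex
	pos int64
}

// advance moves the cursor forward only and reports whether the write
// took effect. A write at or behind the current position is ignored.
func (c *cursor) advance(next int64) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if next <= c.pos {
		return false // a lagging instance cannot regress the position
	}
	c.pos = next
	return true
}

func main() {
	c := &cursor{}
	fmt.Println(c.advance(100)) // fast instance: true
	fmt.Println(c.advance(40))  // lagging instance: false
	fmt.Println(c.pos)          // 100
}
```

In a real database the same effect would typically come from a conditional update rather than a mutex; that detail is an assumption, since the storage layer is not described here.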
func New ¶
func New[SI comparable, DI comparable](
	mutations ledger.Store[SI, json.RawMessage],
	sink ledger.Store[DI, json.RawMessage],
	codec IDCodec[SI],
	opts ...Option,
) (*Bridge[SI, DI], error)
New creates a Bridge. mutations is the source mutation log store (same DB as source). sink is the destination store. codec encodes the mutation log's ID type for cursor storage.
If sink implements ledger.CursorStore, progress is persisted in the sink DB so the Bridge resumes from where it left off after a restart. If sink also implements FindBySourceID (the sinkLookup interface), then SetTags, SetAnnotations, and Trim mutations are applied to the sink; otherwise those mutations are skipped with a warning.
Returns an error only if OTel metrics initialisation fails (when WithMetricsEnabled(true) is passed). All other configuration errors are surfaced at Start/Poll time.
func (*Bridge[SI, DI]) Poll ¶
Poll runs a single bridging cycle and returns any error. It is safe to call concurrently with Start and Stop; it does not affect the background goroutine.
func (*Bridge[SI, DI]) Start ¶
Start begins bridging in a background goroutine. Safe to call once; subsequent calls are no-ops. The provided context controls goroutine lifetime; cancelling it is equivalent to calling Stop.
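The start-once, cancel-to-stop behaviour described for Start can be sketched with sync.Once and a ticker. This is an illustrative sketch only: `poller`, `start`, and `runTwice` are hypothetical names, and the Bridge's actual loop is not shown in this documentation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// poller is a hypothetical stand-in for a Bridge; poll is one bridging cycle.
type poller struct {
	once     sync.Once
	starts   int // how many background loops were actually launched
	interval time.Duration
	poll     func(context.Context) error
	done     chan struct{}
}

// start launches the loop at most once; later calls return immediately.
// Cancelling ctx ends the loop.
func (p *poller) start(ctx context.Context) {
	p.once.Do(func() {
		p.starts++
		go func() {
			defer close(p.done)
			t := time.NewTicker(p.interval)
			defer t.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case <-t.C:
					_ = p.poll(ctx) // a real loop would log or count the error
				}
			}
		}()
	})
}

// runTwice calls start twice and reports how many loops were launched
// and how many cycles ran before cancellation.
func runTwice() (starts int, cycles int64) {
	var n atomic.Int64
	p := &poller{
		interval: 5 * time.Millisecond,
		poll:     func(context.Context) error { n.Add(1); return nil },
		done:     make(chan struct{}),
	}
	ctx, cancel := context.WithCancel(context.Background())
	p.start(ctx)
	p.start(ctx) // no-op: the loop is already running
	time.Sleep(30 * time.Millisecond)
	cancel()
	<-p.done // wait for the goroutine to exit
	return p.starts, n.Load()
}

func main() {
	starts, cycles := runTwice()
	fmt.Println("loops started:", starts, "cycles run:", cycles)
}
```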
type IDCodec ¶
type IDCodec[I comparable] interface {
	Encode(I) string
	Decode(string) (I, error)
	Zero() I
	Less(a, b I) bool
}
IDCodec serialises and deserialises store IDs to/from string for cursor storage. Less must return true if a should be considered an earlier position than b.
type Int64Codec ¶
type Int64Codec struct{}
Int64Codec implements IDCodec[int64] for SQLite and PostgreSQL stores. Cursors are zero-padded to 19 digits so that lexicographic order in the cursor column matches numeric order (avoids "9" > "10" edge cases).
func (Int64Codec) Encode ¶
func (Int64Codec) Encode(id int64) string
func (Int64Codec) Less ¶
func (Int64Codec) Less(a, b int64) bool
func (Int64Codec) Zero ¶
func (Int64Codec) Zero() int64
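To see why zero-padding matters, the sketch below reproduces the idea with the standard library. It is an assumption about how such an encoding could be written, not the package's source: `encodeCursor` and `decodeCursor` are hypothetical helpers, and the sketch assumes non-negative IDs (a minus sign would break the lexicographic ordering).

```go
package main

import (
	"fmt"
	"strconv"
)

// encodeCursor zero-pads a non-negative ID to 19 digits, the number of
// decimal digits in math.MaxInt64.
func encodeCursor(id int64) string {
	return fmt.Sprintf("%019d", id)
}

// decodeCursor parses the padded form back into an int64.
func decodeCursor(s string) (int64, error) {
	return strconv.ParseInt(s, 10, 64)
}

func main() {
	a, b := encodeCursor(9), encodeCursor(10)
	fmt.Println(a < b)      // true: padded strings sort numerically
	fmt.Println("9" < "10") // false: unpadded strings do not

	n, err := decodeCursor(b)
	fmt.Println(n, err) // 10 <nil>
}
```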
type MutationEvent ¶
type MutationEvent struct {
Type MutationType `json:"type"`
Stream string `json:"stream"`
Entries []AppendEntry `json:"entries,omitempty"`
EntryID string `json:"entry_id,omitempty"`
Tags []string `json:"tags,omitempty"`
Annotations map[string]*string `json:"annotations,omitempty"`
BeforeID string `json:"before_id,omitempty"`
}
MutationEvent is a single change decoded from the mutation log.
type MutationType ¶
type MutationType string
MutationType identifies the kind of change recorded in a mutation log entry.
const (
	MutationAppend         MutationType = "append"
	MutationSetTags        MutationType = "set_tags"
	MutationSetAnnotations MutationType = "set_annotations"
	MutationTrim           MutationType = "trim"
)
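As an illustration of how a mutation log payload maps onto these types, the sketch below decodes one event and dispatches on its type. The `event` struct mirrors only a subset of the MutationEvent fields, and the JSON payload, stream name, and entry ID are made up for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type MutationType string

const (
	MutationAppend  MutationType = "append"
	MutationSetTags MutationType = "set_tags"
)

// event mirrors a subset of the documented MutationEvent fields and JSON tags.
type event struct {
	Type    MutationType `json:"type"`
	Stream  string       `json:"stream"`
	EntryID string       `json:"entry_id,omitempty"`
	Tags    []string     `json:"tags,omitempty"`
}

// decodeEvent parses one mutation log payload.
func decodeEvent(raw []byte) (event, error) {
	var ev event
	err := json.Unmarshal(raw, &ev)
	return ev, err
}

func main() {
	// Hypothetical payload; real log entries may carry more fields.
	raw := []byte(`{"type":"set_tags","stream":"orders","entry_id":"42","tags":["paid"]}`)
	ev, err := decodeEvent(raw)
	if err != nil {
		panic(err)
	}
	switch ev.Type {
	case MutationSetTags:
		fmt.Printf("retag %s/%s -> %v\n", ev.Stream, ev.EntryID, ev.Tags)
	default:
		fmt.Println("other mutation:", ev.Type)
	}
}
```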
type Option ¶
type Option func(*options)
Option configures a Bridge.
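Option follows the functional options pattern. The sketch below shows how such options are commonly resolved against defaults. The field names in `options` and the `resolve` helper are hypothetical, since the real struct is unexported; the default values (500, 5s, "default") are the ones stated in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// options is a hypothetical reconstruction; the real fields are unexported.
type options struct {
	batchSize int
	interval  time.Duration
	name      string
}

type Option func(*options)

func WithBatchSize(n int) Option          { return func(o *options) { o.batchSize = n } }
func WithInterval(d time.Duration) Option { return func(o *options) { o.interval = d } }
func WithName(name string) Option         { return func(o *options) { o.name = name } }

// resolve starts from the documented defaults and applies each option in order.
func resolve(opts ...Option) options {
	o := options{batchSize: 500, interval: 5 * time.Second, name: "default"}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := resolve(WithBatchSize(100), WithName("orders-to-archive"))
	fmt.Println(o.batchSize, o.interval, o.name) // 100 5s orders-to-archive
}
```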
func WithBatchSize ¶
WithBatchSize sets the number of mutations processed per poll. Defaults to 500.
func WithInterval ¶
WithInterval sets the polling interval. Defaults to 5s.
func WithLogger ¶
WithLogger sets the logger. Defaults to slog.Default().
func WithMeter ¶ added in v0.0.7
WithMeter sets a custom meter instead of using the global OTel provider.
func WithMeterName ¶ added in v0.0.7
WithMeterName sets the meter name. Defaults to "github.com/rbaliyan/ledger/bridge".
func WithMetricsEnabled ¶ added in v0.0.7
WithMetricsEnabled enables or disables OpenTelemetry metrics (disabled by default). When enabled the following instruments are registered:
- ledger.bridge.polls (counter) — total poll cycles, attrs: bridge.name, status
- ledger.bridge.poll.duration (histogram, s) — per-cycle latency
- ledger.bridge.mutations.applied (counter) — mutations written to sink, attrs: mutation.type
- ledger.bridge.mutations.skipped (counter) — mutations dropped by config, attrs: mutation.type
- ledger.bridge.mutations.errors (counter) — mutations that failed to apply, attrs: mutation.type
- ledger.bridge.lag (histogram, s) — replication lag (age of oldest mutation per poll)
func WithName ¶
WithName sets the cursor name used to track progress. Use a unique name per source-sink pair. Defaults to "default".
func WithParallelStreams ¶ added in v0.0.10
WithParallelStreams enables concurrent mutation processing across independent streams. n goroutines process distinct streams simultaneously; events within the same stream are always applied in order. The default (n <= 1) is serial.
Use this when the sink is remote (e.g. a database or webhook) and latency per mutation dominates throughput. Safe to set to runtime.NumCPU().
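One way to get concurrency across streams while keeping per-stream order is to route every event for a given stream to the same worker. The sketch below illustrates that technique with a hash of the stream name; it is an assumption about a possible design, not the Bridge's actual scheduler, and `event`, `applyParallel`, and `collect` are hypothetical names.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

type event struct {
	Stream string
	Seq    int
}

// applyParallel fans events out to n workers. Events for one stream always
// hash to the same worker, so they are applied in their original order.
func applyParallel(events []event, n int, apply func(event)) {
	if n <= 1 { // serial fallback
		for _, ev := range events {
			apply(ev)
		}
		return
	}
	queues := make([]chan event, n)
	var wg sync.WaitGroup
	for i := range queues {
		queues[i] = make(chan event, len(events)) // buffered: the producer never blocks
		wg.Add(1)
		go func(q <-chan event) {
			defer wg.Done()
			for ev := range q {
				apply(ev)
			}
		}(queues[i])
	}
	for _, ev := range events {
		h := fnv.New32a()
		h.Write([]byte(ev.Stream))
		queues[h.Sum32()%uint32(n)] <- ev
	}
	for _, q := range queues {
		close(q)
	}
	wg.Wait()
}

// collect records the order in which each stream's events were applied.
func collect(events []event, n int) map[string][]int {
	var mu sync.Mutex
	seen := map[string][]int{}
	applyParallel(events, n, func(ev event) {
		mu.Lock()
		defer mu.Unlock()
		seen[ev.Stream] = append(seen[ev.Stream], ev.Seq)
	})
	return seen
}

func main() {
	events := []event{{"a", 1}, {"b", 1}, {"a", 2}, {"b", 2}, {"a", 3}}
	seen := collect(events, 4)
	fmt.Println(seen["a"], seen["b"]) // [1 2 3] [1 2]
}
```

Order across different streams is not preserved in this scheme, which matches the documented guarantee that only events within the same stream are applied in order.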
func WithSkipMutationTypes ¶
func WithSkipMutationTypes(types ...MutationType) Option
WithSkipMutationTypes instructs the Bridge to silently drop mutation events of the given types instead of applying them to the sink. Use this when the sink backend does not support the operation — e.g. pass MutationSetTags and MutationSetAnnotations when bridging to ClickHouse.
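Conceptually, skipping amounts to a set lookup before each mutation is applied. A minimal sketch, assuming a map-based set; `skipSet` and `shouldApply` are hypothetical helpers and not part of the package:

```go
package main

import "fmt"

type MutationType string

const (
	MutationAppend         MutationType = "append"
	MutationSetTags        MutationType = "set_tags"
	MutationSetAnnotations MutationType = "set_annotations"
	MutationTrim           MutationType = "trim"
)

// skipSet builds a lookup set from the types to drop.
func skipSet(types ...MutationType) map[MutationType]struct{} {
	set := make(map[MutationType]struct{}, len(types))
	for _, t := range types {
		set[t] = struct{}{}
	}
	return set
}

// shouldApply reports whether a mutation of type t passes the filter.
func shouldApply(skip map[MutationType]struct{}, t MutationType) bool {
	_, drop := skip[t]
	return !drop
}

func main() {
	// The ClickHouse case from the text: tags and annotations are dropped.
	skip := skipSet(MutationSetTags, MutationSetAnnotations)
	for _, t := range []MutationType{MutationAppend, MutationSetTags, MutationTrim} {
		fmt.Println(t, shouldApply(skip, t))
	}
}
```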
func WithTracer ¶ added in v0.0.7
WithTracer sets a custom tracer instead of using the global OTel provider.
func WithTracerName ¶ added in v0.0.7
WithTracerName sets the tracer name. Defaults to "github.com/rbaliyan/ledger/bridge".
func WithTracesEnabled ¶ added in v0.0.7
WithTracesEnabled enables or disables per-poll OpenTelemetry spans (disabled by default).
type ReadOnlyStream ¶
type ReadOnlyStream[I comparable, P any, T any] struct {
	// contains filtered or unexported fields
}
ReadOnlyStream wraps a ledger.Stream and returns ledger.ErrReadOnly for all write operations. Use this for sink streams to prevent accidental writes.
func NewReadOnlyStream ¶
func NewReadOnlyStream[I comparable, P any, T any](s ledger.Stream[I, P, T]) ReadOnlyStream[I, P, T]
NewReadOnlyStream wraps s so that Append, SetTags, and SetAnnotations return ledger.ErrReadOnly.
func (ReadOnlyStream[I, P, T]) Append ¶
func (s ReadOnlyStream[I, P, T]) Append(_ context.Context, _ ...ledger.AppendInput[T]) ([]I, error)
Append always returns ledger.ErrReadOnly.
func (ReadOnlyStream[I, P, T]) ID ¶
func (s ReadOnlyStream[I, P, T]) ID() string
ID returns the stream instance ID.
func (ReadOnlyStream[I, P, T]) Read ¶
func (s ReadOnlyStream[I, P, T]) Read(ctx context.Context, opts ...ledger.ReadOption) ([]ledger.Entry[I, T], error)
Read returns decoded entries. Upcasting is applied as normal.
func (ReadOnlyStream[I, P, T]) SchemaVersion ¶
func (s ReadOnlyStream[I, P, T]) SchemaVersion() int
SchemaVersion returns the schema version used for new entries.
func (ReadOnlyStream[I, P, T]) SetAnnotations ¶
SetAnnotations always returns ledger.ErrReadOnly.
func (ReadOnlyStream[I, P, T]) SetTags ¶
func (s ReadOnlyStream[I, P, T]) SetTags(_ context.Context, _ I, _ []string) error
SetTags always returns ledger.ErrReadOnly.
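The wrapper pattern behind ReadOnlyStream can be shown with a much smaller interface. Everything in this sketch is a stand-in: `stream`, `memStream`, `readOnly`, and `errReadOnly` are hypothetical and only mimic the shape of ledger.Stream and ledger.ErrReadOnly.

```go
package main

import (
	"errors"
	"fmt"
)

// errReadOnly stands in for ledger.ErrReadOnly.
var errReadOnly = errors.New("stream is read-only")

// stream is a hypothetical interface, far smaller than ledger.Stream.
type stream interface {
	Append(payloads ...string) error
	Read() []string
}

// memStream is a trivial writable implementation.
type memStream struct{ entries []string }

func (m *memStream) Append(p ...string) error {
	m.entries = append(m.entries, p...)
	return nil
}

func (m *memStream) Read() []string { return m.entries }

// readOnly delegates reads to the wrapped stream and rejects every write.
type readOnly struct{ inner stream }

func (r readOnly) Append(...string) error { return errReadOnly }
func (r readOnly) Read() []string         { return r.inner.Read() }

func main() {
	m := &memStream{}
	_ = m.Append("bridged entry") // the bridge writes through the raw stream

	var s stream = readOnly{inner: m} // application code only sees the wrapper
	err := s.Append("accidental write")
	fmt.Println(errors.Is(err, errReadOnly)) // true
	fmt.Println(s.Read())                    // [bridged entry]
}
```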
type StringCodec ¶
type StringCodec struct{}
StringCodec implements IDCodec[string] for MongoDB stores. Ordering is lexicographic, which is correct for MongoDB ObjectID hex strings (first 4 bytes are a big-endian timestamp).
func (StringCodec) Encode ¶
func (StringCodec) Encode(id string) string
func (StringCodec) Less ¶
func (StringCodec) Less(a, b string) bool
func (StringCodec) Zero ¶
func (StringCodec) Zero() string
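The claim that lexicographic order suits ObjectID hex strings follows from the leading big-endian timestamp. The sketch below builds ObjectID-shaped strings by hand to show it; `objectIDHex` and `less` are hypothetical helpers, and the trailing 8 bytes are zeroed here, whereas real ObjectIDs carry a random value and a counter.

```go
package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
)

// objectIDHex builds a 24-character hex string shaped like a MongoDB
// ObjectID: a 4-byte big-endian Unix timestamp followed by 8 more bytes.
func objectIDHex(unixSeconds uint32) string {
	var raw [12]byte
	binary.BigEndian.PutUint32(raw[:4], unixSeconds)
	return hex.EncodeToString(raw[:])
}

// less compares cursors the way a lexicographic string codec would.
func less(a, b string) bool { return a < b }

func main() {
	older := objectIDHex(1_700_000_000)
	newer := objectIDHex(1_700_000_060) // one minute later
	fmt.Println(less(older, newer))     // true
	fmt.Println(less(newer, older))     // false
}
```

For IDs generated within the same second, the order is decided by the bytes after the timestamp rather than by creation time.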