Documentation ¶
Overview ¶
Package projection provides tools for managing and rebuilding projections.
Per-Projection Databases ¶
Dir manages per-projection SQLite databases in a directory. Each projection gets its own .db file, enabling parallel rebuilds (no shared write lock), independent failure isolation, and atomic rebuild by deleting a single file.
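// Errors are discarded here for brevity; check them in real code.
dir, _ := projection.NewDir("/data/projections") // creates the directory if it doesn't exist
db, _ := dir.DB("orders")                        // opens or creates /data/projections/orders.db
checkpoint, _ := projection.NewCheckpoint(db)    // creates the _checkpoint table in the same file
view := sqlview.From[any](db, ordersConfig)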
SQLiteCheckpoint stores checkpoints in the same database as the projection data. Deleting the .db file resets both the projection and its checkpoint.
Rebuild ¶
Rebuild replays all events from a store through projection handlers, resetting checkpoints and processing events in batches for efficiency. Progress reporting and concurrent rebuilds of independent projections are supported.
Index ¶
Constants ¶
const (
	// DefaultBatchSize is the default number of events per batch during rebuild.
	DefaultBatchSize = 500
	// MaxBatchSize prevents absurd batch sizes.
	MaxBatchSize = 50_000
	// MaxConcurrentRebuilds limits parallel rebuilds.
	MaxConcurrentRebuilds = 32
)
Explicit bounds on batch size and rebuild concurrency, following TigerStyle.
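The documentation does not say how an out-of-range BatchSize is handled. As a minimal sketch, assuming a non-positive value means "use the default" and an oversized value is capped rather than rejected, the bounds could be applied like this. The helper name effectiveBatchSize is hypothetical and not part of the package.

```go
package main

import "fmt"

// Values copied from the documented constants.
const (
	DefaultBatchSize = 500
	MaxBatchSize     = 50_000
)

// effectiveBatchSize is a hypothetical helper, not part of the package API.
// Assumption: requested <= 0 selects the default, and values above
// MaxBatchSize are capped instead of returning an error.
func effectiveBatchSize(requested int) int {
	if requested <= 0 {
		return DefaultBatchSize
	}
	if requested > MaxBatchSize {
		return MaxBatchSize
	}
	return requested
}

func main() {
	for _, n := range []int{0, 100, 1_000_000} {
		fmt.Printf("requested=%d effective=%d\n", n, effectiveBatchSize(n))
	}
}
```

The real package may instead reject invalid sizes with an error; the sketch only illustrates what a hard upper bound buys the caller.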
Functions ¶
Types ¶
type Dir ¶
type Dir struct {
// contains filtered or unexported fields
}
Dir manages per-projection SQLite databases in a directory. Each projection gets its own .db file: {dir}/{name}.db
This enables parallel rebuilds (no shared write lock), independent failure isolation, and atomic rebuild by deleting a single file.
func NewDir ¶
NewDir creates a projection directory manager. Creates the directory if it doesn't exist.
func (*Dir) DB ¶
DB returns (or creates) the SQLite database for a projection. The database is opened in WAL mode with a busy_timeout set, so readers can proceed concurrently with a writer. File: {dir}/{name}.db
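To make the {dir}/{name}.db layout concrete, here is a rough sketch of how a caller might locate and reset one projection's file. The helpers dbPath and resetProjection are hypothetical and not part of the package. Because SQLite in WAL mode can leave -wal and -shm files next to the database, the sketch removes those as well; whether the package itself requires that is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// dbPath is a hypothetical helper mirroring the documented layout {dir}/{name}.db.
func dbPath(dir, name string) string {
	return filepath.Join(dir, name+".db")
}

// resetProjection deletes one projection's database file. It also removes the
// -wal and -shm sidecar files that SQLite may create in WAL mode (assumption:
// a clean reset should not leave them behind). Missing files are not errors.
func resetProjection(dir, name string) error {
	base := dbPath(dir, name)
	for _, p := range []string{base, base + "-wal", base + "-shm"} {
		if err := os.Remove(p); err != nil && !errors.Is(err, fs.ErrNotExist) {
			return fmt.Errorf("remove %s: %w", p, err)
		}
	}
	return nil
}

// exists reports whether a file is present at path.
func exists(path string) bool {
	_, err := os.Stat(path)
	return err == nil
}

// demo writes a placeholder orders.db into a temp directory, resets it, and
// reports whether the file existed before and after the reset.
func demo() (before, after bool, err error) {
	dir, err := os.MkdirTemp("", "projections")
	if err != nil {
		return false, false, err
	}
	defer os.RemoveAll(dir)

	path := dbPath(dir, "orders")
	if err := os.WriteFile(path, []byte("placeholder"), 0o600); err != nil {
		return false, false, err
	}
	before = exists(path)
	if err := resetProjection(dir, "orders"); err != nil {
		return before, false, err
	}
	return before, exists(path), nil
}

func main() {
	before, after, err := demo()
	fmt.Println("existed before:", before, "exists after:", after, "err:", err)
}
```

Any open *sql.DB handle on the file should be closed before deleting it; the sketch sidesteps that by using a placeholder file instead of a live database.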
type Handler ¶
type Handler[E any] func(ctx context.Context, events []subscription.GlobalEvent[E]) error
Handler processes a batch of events during rebuild.
type Progress ¶
type Progress struct {
// ProjectionName identifies which projection is being rebuilt.
ProjectionName string
// Processed is the number of events processed so far.
Processed uint64
// Total is the total number of events to process (approximate).
Total uint64
// Done is true when the rebuild is complete.
Done bool
// Err is set if the rebuild failed.
Err error
}
Progress reports rebuild progress to the caller.
type ProgressFunc ¶
type ProgressFunc func(Progress)
ProgressFunc receives progress updates during rebuild.
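As an illustration of how a caller might consume these updates, the sketch below formats a Progress value as a one-line status. The Progress and ProgressFunc definitions are copied from the documentation so the example compiles on its own; formatProgress is a hypothetical helper. Since Total is documented as approximate, the percentage is capped at 100.

```go
package main

import "fmt"

// Progress mirrors the documented struct so this example is self-contained.
type Progress struct {
	ProjectionName string
	Processed      uint64
	Total          uint64
	Done           bool
	Err            error
}

// ProgressFunc mirrors the documented callback type.
type ProgressFunc func(Progress)

// formatProgress is a hypothetical helper that renders one status line.
func formatProgress(p Progress) string {
	switch {
	case p.Err != nil:
		return fmt.Sprintf("%s: failed after %d events: %v", p.ProjectionName, p.Processed, p.Err)
	case p.Done:
		return fmt.Sprintf("%s: done, %d events", p.ProjectionName, p.Processed)
	case p.Total == 0:
		// No estimate available, so report the raw count only.
		return fmt.Sprintf("%s: %d events", p.ProjectionName, p.Processed)
	default:
		pct := float64(p.Processed) / float64(p.Total) * 100
		if pct > 100 {
			pct = 100 // Total is approximate, so Processed may overshoot it.
		}
		return fmt.Sprintf("%s: %d/%d (%.0f%%)", p.ProjectionName, p.Processed, p.Total, pct)
	}
}

func main() {
	var report ProgressFunc = func(p Progress) { fmt.Println(formatProgress(p)) }
	report(Progress{ProjectionName: "orders", Processed: 250, Total: 1000})
	report(Progress{ProjectionName: "orders", Processed: 1000, Total: 1000, Done: true})
}
```

A function like report could be assigned to RebuildConfig.OnProgress. Whether the callback is invoked from more than one goroutine is not stated in the documentation, so a shared callback should be written defensively.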
type RebuildConfig ¶
type RebuildConfig[E any] struct {
	// Name identifies this projection. Required.
	Name string
	// Reader provides the global event stream. Required.
	Reader subscription.GlobalReader[E]
	// Checkpoint to reset and update. Required.
	Checkpoint subscription.Checkpoint
	// ConsumerID is the checkpoint consumer ID to reset. Required.
	ConsumerID string
	// Handler processes batches of events. Required.
	Handler Handler[E]
	// BatchSize controls how many events are read per batch. Default: 500.
	BatchSize int
	// OnProgress receives progress updates. Optional.
	OnProgress ProgressFunc
}
RebuildConfig configures a single projection rebuild.
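The fields above suggest a loop of the form "reset the checkpoint, read a batch, hand it to the handler, save the new position". The sketch below shows that shape with simplified stand-in types. Event, Reader, Checkpoint, and their method names are assumptions made for illustration; they are NOT the real subscription.GlobalEvent, subscription.GlobalReader, or subscription.Checkpoint interfaces, whose methods this documentation does not show.

```go
package main

import (
	"context"
	"fmt"
)

// Event, Reader, Checkpoint and Handler are simplified stand-ins (assumptions),
// not the real subscription package types.
type Event struct {
	Position uint64
	Data     string
}

type Reader interface {
	// ReadFrom returns up to limit events with Position > after.
	ReadFrom(ctx context.Context, after uint64, limit int) ([]Event, error)
}

type Checkpoint interface {
	Save(ctx context.Context, consumerID string, pos uint64) error
}

type Handler func(ctx context.Context, events []Event) error

// rebuild resets the checkpoint to 0, then replays events in batches,
// saving the checkpoint after each batch the handler accepts.
func rebuild(ctx context.Context, r Reader, cp Checkpoint, consumerID string, h Handler, batchSize int) (uint64, error) {
	if err := cp.Save(ctx, consumerID, 0); err != nil {
		return 0, fmt.Errorf("reset checkpoint: %w", err)
	}
	var pos, processed uint64
	for {
		if err := ctx.Err(); err != nil {
			return processed, err
		}
		batch, err := r.ReadFrom(ctx, pos, batchSize)
		if err != nil {
			return processed, err
		}
		if len(batch) == 0 {
			return processed, nil // caught up with the end of the stream
		}
		if err := h(ctx, batch); err != nil {
			return processed, err
		}
		pos = batch[len(batch)-1].Position
		processed += uint64(len(batch))
		if err := cp.Save(ctx, consumerID, pos); err != nil {
			return processed, err
		}
	}
}

// sliceReader and memCheckpoint are in-memory implementations for the demo.
type sliceReader []Event

func (s sliceReader) ReadFrom(_ context.Context, after uint64, limit int) ([]Event, error) {
	var out []Event
	for _, e := range s {
		if e.Position > after && len(out) < limit {
			out = append(out, e)
		}
	}
	return out, nil
}

type memCheckpoint map[string]uint64

func (m memCheckpoint) Save(_ context.Context, id string, pos uint64) error {
	m[id] = pos
	return nil
}

// demo replays five events in batches of two.
func demo() (processed uint64, batches int, saved uint64, err error) {
	events := sliceReader{
		{Position: 1, Data: "a"}, {Position: 2, Data: "b"}, {Position: 3, Data: "c"},
		{Position: 4, Data: "d"}, {Position: 5, Data: "e"},
	}
	cp := memCheckpoint{}
	h := func(_ context.Context, batch []Event) error { batches++; return nil }
	processed, err = rebuild(context.Background(), events, cp, "orders", h, 2)
	return processed, batches, cp["orders"], err
}

func main() {
	processed, batches, saved, err := demo()
	fmt.Println(processed, batches, saved, err)
}
```

Saving the checkpoint only after the handler succeeds means a failed rebuild can be restarted without skipping events, provided the handler's writes are idempotent or share a transaction with the checkpoint. That property belongs to this sketch and is not a documented guarantee of the package.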
type RebuildResult ¶
RebuildResult holds the outcome of a single projection rebuild.
func RebuildConcurrent ¶
func RebuildConcurrent[E any](ctx context.Context, configs []RebuildConfig[E]) []RebuildResult
RebuildConcurrent rebuilds multiple independent projections in parallel. Each projection is rebuilt from scratch. Returns results for all projections. The function returns after all rebuilds complete (or fail).
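The fan-out behaviour described here (run everything in parallel, cap the parallelism, wait for all, return one result per input) can be sketched with a semaphore channel and a sync.WaitGroup. The job and result types below are hypothetical stand-ins, because the fields of RebuildResult are not shown in this documentation, and runAll illustrates the pattern rather than the package's actual implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// MaxConcurrentRebuilds is copied from the documented constant.
const MaxConcurrentRebuilds = 32

// result is a stand-in for RebuildResult (its real fields are not documented here).
type result struct {
	Name string
	Err  error
}

// job is a hypothetical unit of work standing in for one RebuildConfig.
type job struct {
	Name string
	Run  func(ctx context.Context) error
}

// runAll runs every job, at most limit at a time, and returns results in
// the same order as the input. One job failing does not cancel the others.
func runAll(ctx context.Context, jobs []job, limit int) []result {
	if limit <= 0 || limit > MaxConcurrentRebuilds {
		limit = MaxConcurrentRebuilds
	}
	results := make([]result, len(jobs))
	sem := make(chan struct{}, limit) // counting semaphore
	var wg sync.WaitGroup
	for i, j := range jobs {
		wg.Add(1)
		go func(i int, j job) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it
			// Each goroutine writes only its own index, so no lock is needed.
			results[i] = result{Name: j.Name, Err: j.Run(ctx)}
		}(i, j)
	}
	wg.Wait()
	return results
}

// demo runs one succeeding and one failing job.
func demo() []result {
	jobs := []job{
		{Name: "orders", Run: func(context.Context) error { return nil }},
		{Name: "invoices", Run: func(context.Context) error { return errors.New("handler failed") }},
	}
	return runAll(context.Background(), jobs, 2)
}

func main() {
	for _, r := range demo() {
		fmt.Printf("%s: err=%v\n", r.Name, r.Err)
	}
}
```

Keeping results index-aligned with the input lets the caller match each outcome to its configuration without a map, and it works safely here because the projections are independent of one another.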
type SQLiteCheckpoint ¶
type SQLiteCheckpoint struct {
// contains filtered or unexported fields
}
SQLiteCheckpoint implements subscription.Checkpoint for a single projection DB. The checkpoint table is colocated with the projection data, so deleting the .db file resets both the projection and its checkpoint atomically.
func NewCheckpoint ¶
func NewCheckpoint(db *sql.DB) (*SQLiteCheckpoint, error)
NewCheckpoint creates a checkpoint backed by the given SQLite database. Creates the _checkpoint table if it doesn't exist.