Documentation
¶
Overview ¶
Package persistence provides event sourcing and snapshotting for stateful actors.
The persistence package allows actors to persist their state through events and snapshots, enabling recovery after restarts or crashes. It supports pluggable storage backends through the Provider interface and configurable snapshot strategies.
Key types:
- Provider: Interface for persistence storage backends
- ProviderState: Combined interface for event and snapshot storage operations
- Mixin: A composable type that adds persistence capabilities to actors
- SnapshotStrategy: Interface for controlling when snapshots are taken
- EventStore: Interface for persisting and retrieving events
- SnapshotStore: Interface for persisting and retrieving snapshots
- InMemoryProvider: Built-in in-memory provider for testing
Basic usage:
type MyActor struct {
persistence.Mixin
state int
}
func (a *MyActor) Receive(ctx actor.Context) {
switch msg := ctx.Message().(type) {
case *AddValue:
a.PersistReceive(&ValueAdded{Amount: msg.Value})
a.state += msg.Value
case *ValueAdded:
a.state += msg.Amount
}
}
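// NewInMemoryProvider takes the snapshot interval. Using expects a Provider,
// so expose the in-memory state through a small type that implements GetState.
type inMemoryProvider struct{ state persistence.ProviderState }
func (p *inMemoryProvider) GetState() persistence.ProviderState { return p.state }
provider := &inMemoryProvider{state: persistence.NewInMemoryProvider(10)}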
props := actor.PropsFromProducer(func() actor.Actor { return &MyActor{} }).
WithReceiverMiddleware(persistence.Using(provider))
Index ¶
- func Using(provider Provider) func(next actor.ReceiverFunc) actor.ReceiverFunc
- type EventStore
- type InMemoryProvider
- func (provider *InMemoryProvider) DeleteEvents(actorName string, inclusiveToIndex int)
- func (provider *InMemoryProvider) DeleteSnapshots(actorName string, inclusiveToIndex int)
- func (provider *InMemoryProvider) GetEvents(actorName string, eventIndexStart int, eventIndexEnd int, callback func(e any))
- func (provider *InMemoryProvider) GetSnapshot(actorName string) (snapshot any, eventIndex int, ok bool)
- func (provider *InMemoryProvider) GetSnapshotInterval() int
- func (provider *InMemoryProvider) PersistEvent(actorName string, eventIndex int, event proto.Message)
- func (provider *InMemoryProvider) PersistSnapshot(actorName string, eventIndex int, snapshot proto.Message)
- func (provider *InMemoryProvider) Restart()
- type IntervalStrategy
- type Mixin
- type OfferSnapshot
- type Provider
- type ProviderState
- type Replay
- type ReplayComplete
- type RequestSnapshot
- type SnapshotStore
- type SnapshotStrategy
- type TimeStrategy
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Using ¶
func Using(provider Provider) func(next actor.ReceiverFunc) actor.ReceiverFunc
Types ¶
type EventStore ¶
type InMemoryProvider ¶
type InMemoryProvider struct {
// contains filtered or unexported fields
}
func NewInMemoryProvider ¶
func NewInMemoryProvider(snapshotInterval int) *InMemoryProvider
func (*InMemoryProvider) DeleteEvents ¶
func (provider *InMemoryProvider) DeleteEvents(actorName string, inclusiveToIndex int)
func (*InMemoryProvider) DeleteSnapshots ¶
func (provider *InMemoryProvider) DeleteSnapshots(actorName string, inclusiveToIndex int)
func (*InMemoryProvider) GetEvents ¶
func (provider *InMemoryProvider) GetEvents(actorName string, eventIndexStart int, eventIndexEnd int, callback func(e any))
func (*InMemoryProvider) GetSnapshot ¶
func (provider *InMemoryProvider) GetSnapshot(actorName string) (snapshot any, eventIndex int, ok bool)
func (*InMemoryProvider) GetSnapshotInterval ¶
func (provider *InMemoryProvider) GetSnapshotInterval() int
func (*InMemoryProvider) PersistEvent ¶
func (provider *InMemoryProvider) PersistEvent(actorName string, eventIndex int, event proto.Message)
func (*InMemoryProvider) PersistSnapshot ¶
func (provider *InMemoryProvider) PersistSnapshot(actorName string, eventIndex int, snapshot proto.Message)
func (*InMemoryProvider) Restart ¶
func (provider *InMemoryProvider) Restart()
type IntervalStrategy ¶
type IntervalStrategy struct {
// contains filtered or unexported fields
}
IntervalStrategy triggers a snapshot every N events. It matches the legacy behavior: a snapshot is taken whenever eventIndex % interval == 0.
func NewIntervalStrategy ¶
func NewIntervalStrategy(interval int) *IntervalStrategy
NewIntervalStrategy creates an IntervalStrategy that triggers a snapshot every `interval` events. If interval <= 0 the strategy never triggers.
func (*IntervalStrategy) ShouldSnapshot ¶
func (s *IntervalStrategy) ShouldSnapshot(_ any, eventIndex int) bool
ShouldSnapshot returns true when eventIndex is a multiple of the configured interval (and interval > 0).
type Mixin ¶
type Mixin struct {
// contains filtered or unexported fields
}
func (*Mixin) PersistReceive ¶
func (*Mixin) PersistSnapshot ¶
func (*Mixin) Recovering ¶
func (*Mixin) SetSnapshotStrategy ¶
func (mixin *Mixin) SetSnapshotStrategy(strategy SnapshotStrategy)
SetSnapshotStrategy allows users to override the default interval-based snapshot strategy with a custom implementation.
type OfferSnapshot ¶
type OfferSnapshot struct {
Snapshot any
}
type Provider ¶
type Provider interface {
GetState() ProviderState
}
Provider is the abstraction used for persistence. Its GetState method returns the ProviderState that carries out the storage operations.
type ProviderState ¶
type ProviderState interface {
SnapshotStore
EventStore
Restart()
GetSnapshotInterval() int
}
ProviderState is the interface a storage backend implements. It combines SnapshotStore and EventStore and adds Restart and GetSnapshotInterval.
type ReplayComplete ¶
type ReplayComplete struct{}
type RequestSnapshot ¶
type RequestSnapshot struct{}
type SnapshotStore ¶
type SnapshotStrategy ¶
type SnapshotStrategy interface {
// ShouldSnapshot returns true if a snapshot should be taken for the given
// event index. The state parameter is provided for strategies that may
// inspect actor state; it may be nil.
ShouldSnapshot(state any, eventIndex int) bool
}
SnapshotStrategy determines when a snapshot should be taken.
type TimeStrategy ¶
type TimeStrategy struct {
// contains filtered or unexported fields
}
TimeStrategy triggers a snapshot after a specified duration has elapsed since the last snapshot (or since the strategy was created/reset).
func NewTimeStrategy ¶
func NewTimeStrategy(interval time.Duration) *TimeStrategy
NewTimeStrategy creates a TimeStrategy that triggers a snapshot when at least `interval` has elapsed since the last snapshot.
func (*TimeStrategy) ShouldSnapshot ¶
func (s *TimeStrategy) ShouldSnapshot(_ any, _ int) bool
ShouldSnapshot returns true when the configured duration has elapsed since the last snapshot was taken (or since creation). When it returns true it resets the internal timer.