Documentation ¶
Overview ¶
internal/spock/desired.go
internal/spock/disabled_subscription.go
internal/spock/lag_tracker_commit_ts.go
internal/spock/names.go
internal/spock/node.go
internal/spock/reconciler.go
internal/spock/refresh.go
internal/spock/replication_slot.go
internal/spock/replication_slot_advance.go
internal/spock/replication_slot_create.go
internal/spock/repset.go
internal/spock/reset.go
internal/spock/subscription.go
internal/spock/sync_event.go
internal/spock/user.go
internal/spock/wait_for_sync_event.go
Index ¶
- Constants
- func ComputeDesired(cfg *config.Config, conns map[string]*pgxpool.Pool) map[resource.Identifier]resource.Resource
- func RefreshActual(ctx context.Context, cfg *config.Config, conns map[string]*pgxpool.Pool, ...) (map[resource.Identifier]resource.Resource, error)
- func ResetBootstrappedNodes(ctx context.Context, cfg *config.Config, conns map[string]*pgxpool.Pool) error
- func ResetSpock(ctx context.Context, cfg *config.Config, conns map[string]*pgxpool.Pool) error
- type DisabledSubscription
- func (s *DisabledSubscription) Create(ctx context.Context) error
- func (s *DisabledSubscription) Delete(_ context.Context) error
- func (s *DisabledSubscription) Dependencies() []resource.Identifier
- func (s *DisabledSubscription) Identifier() resource.Identifier
- func (s *DisabledSubscription) Refresh(_ context.Context) error
- func (s *DisabledSubscription) Status() resource.Status
- func (s *DisabledSubscription) Update(_ context.Context) error
- type LagTrackerCommitTimestamp
- func (r *LagTrackerCommitTimestamp) Create(ctx context.Context) error
- func (r *LagTrackerCommitTimestamp) Delete(_ context.Context) error
- func (r *LagTrackerCommitTimestamp) Dependencies() []resource.Identifier
- func (r *LagTrackerCommitTimestamp) Identifier() resource.Identifier
- func (r *LagTrackerCommitTimestamp) Refresh(_ context.Context) error
- func (r *LagTrackerCommitTimestamp) Status() resource.Status
- func (r *LagTrackerCommitTimestamp) Update(_ context.Context) error
- type PgEdgeUser
- func (u *PgEdgeUser) Create(ctx context.Context) error
- func (u *PgEdgeUser) Delete(ctx context.Context) error
- func (u *PgEdgeUser) Dependencies() []resource.Identifier
- func (u *PgEdgeUser) Identifier() resource.Identifier
- func (u *PgEdgeUser) Refresh(ctx context.Context) error
- func (u *PgEdgeUser) Status() resource.Status
- func (u *PgEdgeUser) Update(_ context.Context) error
- type ReplicationSlot
- func (r *ReplicationSlot) Create(ctx context.Context) error
- func (r *ReplicationSlot) Delete(ctx context.Context) error
- func (r *ReplicationSlot) Dependencies() []resource.Identifier
- func (r *ReplicationSlot) Identifier() resource.Identifier
- func (r *ReplicationSlot) Refresh(ctx context.Context) error
- func (r *ReplicationSlot) Status() resource.Status
- func (r *ReplicationSlot) Update(_ context.Context) error
- type ReplicationSlotAdvanceFromCTS
- func (r *ReplicationSlotAdvanceFromCTS) Create(ctx context.Context) error
- func (r *ReplicationSlotAdvanceFromCTS) Delete(_ context.Context) error
- func (r *ReplicationSlotAdvanceFromCTS) Dependencies() []resource.Identifier
- func (r *ReplicationSlotAdvanceFromCTS) Identifier() resource.Identifier
- func (r *ReplicationSlotAdvanceFromCTS) Refresh(_ context.Context) error
- func (r *ReplicationSlotAdvanceFromCTS) Status() resource.Status
- func (r *ReplicationSlotAdvanceFromCTS) Update(_ context.Context) error
- type ReplicationSlotCreate
- func (r *ReplicationSlotCreate) Create(ctx context.Context) error
- func (r *ReplicationSlotCreate) Delete(_ context.Context) error
- func (r *ReplicationSlotCreate) Dependencies() []resource.Identifier
- func (r *ReplicationSlotCreate) Identifier() resource.Identifier
- func (r *ReplicationSlotCreate) Refresh(_ context.Context) error
- func (r *ReplicationSlotCreate) Status() resource.Status
- func (r *ReplicationSlotCreate) Update(_ context.Context) error
- type SpockNode
- func (n *SpockNode) Create(ctx context.Context) error
- func (n *SpockNode) Delete(ctx context.Context) error
- func (n *SpockNode) Dependencies() []resource.Identifier
- func (n *SpockNode) Identifier() resource.Identifier
- func (n *SpockNode) Refresh(ctx context.Context) error
- func (n *SpockNode) Status() resource.Status
- func (n *SpockNode) Update(_ context.Context) error
- type SpockReconciler
- type Subscription
- func (s *Subscription) Create(ctx context.Context) error
- func (s *Subscription) Delete(ctx context.Context) error
- func (s *Subscription) Dependencies() []resource.Identifier
- func (s *Subscription) Identifier() resource.Identifier
- func (s *Subscription) Refresh(ctx context.Context) error
- func (s *Subscription) Status() resource.Status
- func (s *Subscription) Update(ctx context.Context) error
- type SyncEvent
- func (r *SyncEvent) Create(ctx context.Context) error
- func (r *SyncEvent) Delete(_ context.Context) error
- func (r *SyncEvent) Dependencies() []resource.Identifier
- func (r *SyncEvent) Identifier() resource.Identifier
- func (r *SyncEvent) Refresh(_ context.Context) error
- func (r *SyncEvent) Status() resource.Status
- func (r *SyncEvent) Update(_ context.Context) error
- type WaitForSyncEvent
- func (r *WaitForSyncEvent) Create(ctx context.Context) error
- func (r *WaitForSyncEvent) Delete(_ context.Context) error
- func (r *WaitForSyncEvent) Dependencies() []resource.Identifier
- func (r *WaitForSyncEvent) Identifier() resource.Identifier
- func (r *WaitForSyncEvent) Refresh(_ context.Context) error
- func (r *WaitForSyncEvent) Status() resource.Status
- func (r *WaitForSyncEvent) Update(_ context.Context) error
Constants ¶
const ( ResourceTypeUser = "spock.user" ResourceTypeNode = "spock.node" ResourceTypeReplicationSlot = "spock.replicationslot" ResourceTypeSubscription = "spock.subscription" ResourceTypeReplicationSlotCreate = "spock.replication_slot_create" ResourceTypeSyncEvent = "spock.sync_event" ResourceTypeWaitForSyncEvent = "spock.wait_for_sync_event" ResourceTypeLagTrackerCommitTS = "spock.lag_tracker_commit_ts" ResourceTypeReplicationSlotAdvanceFromCTS = "spock.replication_slot_advance_from_cts" ResourceTypeDisabledSubscription = "spock.disabled_subscription" )
Resource type constants for Spock resources.
Variables ¶
This section is empty.
Functions ¶
func ComputeDesired ¶
func ComputeDesired(cfg *config.Config, conns map[string]*pgxpool.Pool) map[resource.Identifier]resource.Resource
ComputeDesired builds the full resource graph from node config.
func RefreshActual ¶
func RefreshActual( ctx context.Context, cfg *config.Config, conns map[string]*pgxpool.Pool, desired map[resource.Identifier]resource.Resource, ) (map[resource.Identifier]resource.Resource, error)
RefreshActual refreshes all resources in the desired set to populate their Status, discovers orphan nodes, subscriptions, and replication slots from PostgreSQL catalogs, and then cross-references subscriptions with their provider-side slots. It returns the combined "actual" map of everything that exists.
Types ¶
type DisabledSubscription ¶
type DisabledSubscription struct {
// contains filtered or unexported fields
}
DisabledSubscription creates a disabled subscription from a peer provider to the new node during populate. This registers the subscription in Spock metadata early so that the lag tracker can be populated as data flows. The end-state Subscription resource later detects the disabled state and enables it via Update. This is an ephemeral resource: it re-executes on every run.
func NewDisabledSubscription ¶
func (*DisabledSubscription) Create ¶
func (s *DisabledSubscription) Create(ctx context.Context) error
func (*DisabledSubscription) Delete ¶
func (s *DisabledSubscription) Delete(_ context.Context) error
func (*DisabledSubscription) Dependencies ¶
func (s *DisabledSubscription) Dependencies() []resource.Identifier
func (*DisabledSubscription) Identifier ¶
func (s *DisabledSubscription) Identifier() resource.Identifier
func (*DisabledSubscription) Refresh ¶
func (s *DisabledSubscription) Refresh(_ context.Context) error
Refresh always returns not-exists — ephemeral resource re-executes every run.
func (*DisabledSubscription) Status ¶
func (s *DisabledSubscription) Status() resource.Status
type LagTrackerCommitTimestamp ¶
type LagTrackerCommitTimestamp struct {
CommitTS *time.Time // populated during Create
// contains filtered or unexported fields
}
LagTrackerCommitTimestamp reads the commit timestamp from spock.lag_tracker on the new node. It captures how far peer data has been replicated. This is an ephemeral resource: it re-executes on every run.
func NewLagTrackerCommitTimestamp ¶
func NewLagTrackerCommitTimestamp(originName, receiverName string, conn *pgxpool.Pool, extraDeps ...resource.Identifier) *LagTrackerCommitTimestamp
func (*LagTrackerCommitTimestamp) Create ¶
func (r *LagTrackerCommitTimestamp) Create(ctx context.Context) error
func (*LagTrackerCommitTimestamp) Delete ¶
func (r *LagTrackerCommitTimestamp) Delete(_ context.Context) error
func (*LagTrackerCommitTimestamp) Dependencies ¶
func (r *LagTrackerCommitTimestamp) Dependencies() []resource.Identifier
func (*LagTrackerCommitTimestamp) Identifier ¶
func (r *LagTrackerCommitTimestamp) Identifier() resource.Identifier
func (*LagTrackerCommitTimestamp) Refresh ¶
func (r *LagTrackerCommitTimestamp) Refresh(_ context.Context) error
func (*LagTrackerCommitTimestamp) Status ¶
func (r *LagTrackerCommitTimestamp) Status() resource.Status
type PgEdgeUser ¶
type PgEdgeUser struct {
// contains filtered or unexported fields
}
PgEdgeUser ensures the pgedge replication role exists on a node.
func NewPgEdgeUser ¶
func (*PgEdgeUser) Dependencies ¶
func (u *PgEdgeUser) Dependencies() []resource.Identifier
func (*PgEdgeUser) Identifier ¶
func (u *PgEdgeUser) Identifier() resource.Identifier
func (*PgEdgeUser) Status ¶
func (u *PgEdgeUser) Status() resource.Status
type ReplicationSlot ¶
type ReplicationSlot struct {
// contains filtered or unexported fields
}
ReplicationSlot manages a provider-side logical replication slot. Executes on the provider node's connection.
func NewReplicationSlot ¶
func NewReplicationSlot(providerName, subscriberName, dbName string, conn *pgxpool.Pool) *ReplicationSlot
func (*ReplicationSlot) Create ¶
func (r *ReplicationSlot) Create(ctx context.Context) error
Create is a no-op — Spock's sub_create creates the slot automatically.
func (*ReplicationSlot) Delete ¶
func (r *ReplicationSlot) Delete(ctx context.Context) error
Delete terminates any active walsender and drops the replication slot.
func (*ReplicationSlot) Dependencies ¶
func (r *ReplicationSlot) Dependencies() []resource.Identifier
func (*ReplicationSlot) Identifier ¶
func (r *ReplicationSlot) Identifier() resource.Identifier
func (*ReplicationSlot) Status ¶
func (r *ReplicationSlot) Status() resource.Status
type ReplicationSlotAdvanceFromCTS ¶
type ReplicationSlotAdvanceFromCTS struct {
// contains filtered or unexported fields
}
ReplicationSlotAdvanceFromCTS advances a peer's replication slot to skip WAL already synced via the source. It reads the commit timestamp from the paired LagTrackerCommitTimestamp via a struct pointer. This is an ephemeral resource: it re-executes on every run.
func NewReplicationSlotAdvanceFromCTS ¶
func NewReplicationSlotAdvanceFromCTS(providerName, subscriberName, dbName string, lagTracker *LagTrackerCommitTimestamp, conn *pgxpool.Pool) *ReplicationSlotAdvanceFromCTS
func (*ReplicationSlotAdvanceFromCTS) Create ¶
func (r *ReplicationSlotAdvanceFromCTS) Create(ctx context.Context) error
func (*ReplicationSlotAdvanceFromCTS) Delete ¶
func (r *ReplicationSlotAdvanceFromCTS) Delete(_ context.Context) error
func (*ReplicationSlotAdvanceFromCTS) Dependencies ¶
func (r *ReplicationSlotAdvanceFromCTS) Dependencies() []resource.Identifier
func (*ReplicationSlotAdvanceFromCTS) Identifier ¶
func (r *ReplicationSlotAdvanceFromCTS) Identifier() resource.Identifier
func (*ReplicationSlotAdvanceFromCTS) Refresh ¶
func (r *ReplicationSlotAdvanceFromCTS) Refresh(_ context.Context) error
func (*ReplicationSlotAdvanceFromCTS) Status ¶
func (r *ReplicationSlotAdvanceFromCTS) Status() resource.Status
type ReplicationSlotCreate ¶
type ReplicationSlotCreate struct {
// contains filtered or unexported fields
}
ReplicationSlotCreate pre-creates a logical replication slot on a peer provider for a new subscriber. This is an ephemeral populate resource — Refresh always returns not-exists so it re-executes on every run.
func NewReplicationSlotCreate ¶
func NewReplicationSlotCreate(providerName, subscriberName, dbName string, conn *pgxpool.Pool) *ReplicationSlotCreate
func (*ReplicationSlotCreate) Create ¶
func (r *ReplicationSlotCreate) Create(ctx context.Context) error
func (*ReplicationSlotCreate) Delete ¶
func (r *ReplicationSlotCreate) Delete(_ context.Context) error
Delete is a no-op — the end-state ReplicationSlot resource manages slot lifecycle.
func (*ReplicationSlotCreate) Dependencies ¶
func (r *ReplicationSlotCreate) Dependencies() []resource.Identifier
func (*ReplicationSlotCreate) Identifier ¶
func (r *ReplicationSlotCreate) Identifier() resource.Identifier
func (*ReplicationSlotCreate) Refresh ¶
func (r *ReplicationSlotCreate) Refresh(_ context.Context) error
Refresh always returns not-exists — ephemeral resource re-executes every run.
func (*ReplicationSlotCreate) Status ¶
func (r *ReplicationSlotCreate) Status() resource.Status
type SpockNode ¶
type SpockNode struct {
// contains filtered or unexported fields
}
SpockNode manages a Spock node on a PostgreSQL instance.
func NewSpockNode ¶
func (*SpockNode) Dependencies ¶
func (n *SpockNode) Dependencies() []resource.Identifier
func (*SpockNode) Identifier ¶
func (n *SpockNode) Identifier() resource.Identifier
type SpockReconciler ¶
type SpockReconciler struct {
// contains filtered or unexported fields
}
SpockReconciler implements resource.Reconciler for Spock replication resources.
func NewReconciler ¶
NewReconciler creates a SpockReconciler from config and database connections.
func (*SpockReconciler) ComputeDesired ¶
func (r *SpockReconciler) ComputeDesired() map[resource.Identifier]resource.Resource
func (*SpockReconciler) RefreshActual ¶
func (r *SpockReconciler) RefreshActual(ctx context.Context, desired map[resource.Identifier]resource.Resource) (map[resource.Identifier]resource.Resource, error)
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription manages a Spock subscription between two nodes. Executes on the subscriber (dst) node's connection.
func NewSubscription ¶
func NewSubscription(src, dst config.Node, dbName, pgedgeUser string, sync bool, conn *pgxpool.Pool, extraDeps ...resource.Identifier) *Subscription
func (*Subscription) Dependencies ¶
func (s *Subscription) Dependencies() []resource.Identifier
func (*Subscription) Identifier ¶
func (s *Subscription) Identifier() resource.Identifier
func (*Subscription) Status ¶
func (s *Subscription) Status() resource.Status
type SyncEvent ¶
type SyncEvent struct {
LSN string // populated during Create
// contains filtered or unexported fields
}
SyncEvent inserts a WAL bookmark on a provider node via spock.sync_event(). The returned LSN is stored for the paired WaitForSyncEvent to read. This is an ephemeral resource: it re-executes on every run.
func NewSyncEvent ¶
func (*SyncEvent) Dependencies ¶
func (r *SyncEvent) Dependencies() []resource.Identifier
func (*SyncEvent) Identifier ¶
func (r *SyncEvent) Identifier() resource.Identifier
type WaitForSyncEvent ¶
type WaitForSyncEvent struct {
// contains filtered or unexported fields
}
WaitForSyncEvent polls on the subscriber until it receives the sync event from the provider. It reads the LSN from the paired SyncEvent via a struct pointer. This is an ephemeral resource: it re-executes on every run.
func NewWaitForSyncEvent ¶
func NewWaitForSyncEvent(providerName, subscriberName string, syncEvent *SyncEvent, conn *pgxpool.Pool) *WaitForSyncEvent
func (*WaitForSyncEvent) Dependencies ¶
func (r *WaitForSyncEvent) Dependencies() []resource.Identifier
func (*WaitForSyncEvent) Identifier ¶
func (r *WaitForSyncEvent) Identifier() resource.Identifier
func (*WaitForSyncEvent) Status ¶
func (r *WaitForSyncEvent) Status() resource.Status