spock

package
v0.2.1-0...-1b331bb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2026 License: PostgreSQL Imports: 11 Imported by: 0

Documentation

Overview

internal/spock/desired.go

internal/spock/disabled_subscription.go

internal/spock/lag_tracker_commit_ts.go

internal/spock/names.go

internal/spock/node.go

internal/spock/reconciler.go

internal/spock/refresh.go

internal/spock/replication_slot.go

internal/spock/replication_slot_advance.go

internal/spock/replication_slot_create.go

internal/spock/repset.go

internal/spock/reset.go

internal/spock/subscription.go

internal/spock/sync_event.go

internal/spock/user.go

internal/spock/wait_for_sync_event.go

Index

Constants

View Source
const (
	ResourceTypeUser                          = "spock.user"
	ResourceTypeNode                          = "spock.node"
	ResourceTypeReplicationSlot               = "spock.replicationslot"
	ResourceTypeSubscription                  = "spock.subscription"
	ResourceTypeReplicationSlotCreate         = "spock.replication_slot_create"
	ResourceTypeSyncEvent                     = "spock.sync_event"
	ResourceTypeWaitForSyncEvent              = "spock.wait_for_sync_event"
	ResourceTypeLagTrackerCommitTS            = "spock.lag_tracker_commit_ts"
	ResourceTypeReplicationSlotAdvanceFromCTS = "spock.replication_slot_advance_from_cts"
	ResourceTypeDisabledSubscription          = "spock.disabled_subscription"
)

Resource type constants for Spock resources.

Variables

This section is empty.

Functions

func ComputeDesired

func ComputeDesired(cfg *config.Config, conns map[string]*pgxpool.Pool) map[resource.Identifier]resource.Resource

ComputeDesired builds the full resource graph from node config.

func RefreshActual

func RefreshActual(
	ctx context.Context,
	cfg *config.Config,
	conns map[string]*pgxpool.Pool,
	desired map[resource.Identifier]resource.Resource,
) (map[resource.Identifier]resource.Resource, error)

RefreshActual refreshes all resources in the desired set to populate their Status, discovers orphan nodes, subscriptions, and replication slots from PostgreSQL catalogs, then cross-references subscriptions with their provider-side slots. Returns the combined "actual" map of everything that exists.

func ResetBootstrappedNodes

func ResetBootstrappedNodes(ctx context.Context, cfg *config.Config, conns map[string]*pgxpool.Pool) error

ResetBootstrappedNodes resets Spock on nodes bootstrapped via CNPG restore. These nodes have stale spock catalog state from the backup source.

func ResetSpock

func ResetSpock(ctx context.Context, cfg *config.Config, conns map[string]*pgxpool.Pool) error

ResetSpock drops and recreates Spock on every node connection. This follows the Control Plane pattern: back up the repsets, drop Spock, reinitialize it with the correct config, then restore the repsets.

Types

type DisabledSubscription

type DisabledSubscription struct {
	// contains filtered or unexported fields
}

DisabledSubscription creates a disabled subscription from a peer provider to the new node during populate. This registers the subscription in Spock metadata early so the lag tracker can populate as data flows. The end-state Subscription resource later detects the disabled state and enables it via Update. Ephemeral resource — re-executes every run.

func NewDisabledSubscription

func NewDisabledSubscription(src, dst config.Node, dbName, pgedgeUser string, conn *pgxpool.Pool) *DisabledSubscription

func (*DisabledSubscription) Create

func (s *DisabledSubscription) Create(ctx context.Context) error

func (*DisabledSubscription) Delete

func (*DisabledSubscription) Dependencies

func (s *DisabledSubscription) Dependencies() []resource.Identifier

func (*DisabledSubscription) Identifier

func (s *DisabledSubscription) Identifier() resource.Identifier

func (*DisabledSubscription) Refresh

func (s *DisabledSubscription) Refresh(_ context.Context) error

Refresh always returns not-exists — ephemeral resource re-executes every run.

func (*DisabledSubscription) Status

func (s *DisabledSubscription) Status() resource.Status

func (*DisabledSubscription) Update

type LagTrackerCommitTimestamp

type LagTrackerCommitTimestamp struct {
	CommitTS *time.Time // populated during Create
	// contains filtered or unexported fields
}

LagTrackerCommitTimestamp reads the commit timestamp from spock.lag_tracker on the new node. That timestamp captures how far peer data has been replicated. Ephemeral resource — re-executes every run.

func NewLagTrackerCommitTimestamp

func NewLagTrackerCommitTimestamp(originName, receiverName string, conn *pgxpool.Pool, extraDeps ...resource.Identifier) *LagTrackerCommitTimestamp

func (*LagTrackerCommitTimestamp) Create

func (*LagTrackerCommitTimestamp) Delete

func (*LagTrackerCommitTimestamp) Dependencies

func (r *LagTrackerCommitTimestamp) Dependencies() []resource.Identifier

func (*LagTrackerCommitTimestamp) Identifier

func (*LagTrackerCommitTimestamp) Refresh

func (*LagTrackerCommitTimestamp) Status

func (*LagTrackerCommitTimestamp) Update

type PgEdgeUser

type PgEdgeUser struct {
	// contains filtered or unexported fields
}

PgEdgeUser ensures the pgedge replication role exists on a node.

func NewPgEdgeUser

func NewPgEdgeUser(node config.Node, dbName, pgedgeUser string, conn *pgxpool.Pool) *PgEdgeUser

func (*PgEdgeUser) Create

func (u *PgEdgeUser) Create(ctx context.Context) error

func (*PgEdgeUser) Delete

func (u *PgEdgeUser) Delete(ctx context.Context) error

func (*PgEdgeUser) Dependencies

func (u *PgEdgeUser) Dependencies() []resource.Identifier

func (*PgEdgeUser) Identifier

func (u *PgEdgeUser) Identifier() resource.Identifier

func (*PgEdgeUser) Refresh

func (u *PgEdgeUser) Refresh(ctx context.Context) error

func (*PgEdgeUser) Status

func (u *PgEdgeUser) Status() resource.Status

func (*PgEdgeUser) Update

func (u *PgEdgeUser) Update(_ context.Context) error

type ReplicationSlot

type ReplicationSlot struct {
	// contains filtered or unexported fields
}

ReplicationSlot manages a provider-side logical replication slot. Executes on the provider node's connection.

func NewReplicationSlot

func NewReplicationSlot(providerName, subscriberName, dbName string, conn *pgxpool.Pool) *ReplicationSlot

func (*ReplicationSlot) Create

func (r *ReplicationSlot) Create(ctx context.Context) error

Create is a no-op — Spock's sub_create creates the slot automatically.

func (*ReplicationSlot) Delete

func (r *ReplicationSlot) Delete(ctx context.Context) error

Delete terminates any active walsender and drops the replication slot.

func (*ReplicationSlot) Dependencies

func (r *ReplicationSlot) Dependencies() []resource.Identifier

func (*ReplicationSlot) Identifier

func (r *ReplicationSlot) Identifier() resource.Identifier

func (*ReplicationSlot) Refresh

func (r *ReplicationSlot) Refresh(ctx context.Context) error

func (*ReplicationSlot) Status

func (r *ReplicationSlot) Status() resource.Status

func (*ReplicationSlot) Update

func (r *ReplicationSlot) Update(_ context.Context) error

type ReplicationSlotAdvanceFromCTS

type ReplicationSlotAdvanceFromCTS struct {
	// contains filtered or unexported fields
}

ReplicationSlotAdvanceFromCTS advances a peer's replication slot to skip WAL already synced via the source. Reads the commit timestamp from the paired LagTrackerCommitTimestamp via struct pointer. Ephemeral resource — re-executes every run.

func NewReplicationSlotAdvanceFromCTS

func NewReplicationSlotAdvanceFromCTS(providerName, subscriberName, dbName string, lagTracker *LagTrackerCommitTimestamp, conn *pgxpool.Pool) *ReplicationSlotAdvanceFromCTS

func (*ReplicationSlotAdvanceFromCTS) Create

func (*ReplicationSlotAdvanceFromCTS) Delete

func (*ReplicationSlotAdvanceFromCTS) Dependencies

func (*ReplicationSlotAdvanceFromCTS) Identifier

func (*ReplicationSlotAdvanceFromCTS) Refresh

func (*ReplicationSlotAdvanceFromCTS) Status

func (*ReplicationSlotAdvanceFromCTS) Update

type ReplicationSlotCreate

type ReplicationSlotCreate struct {
	// contains filtered or unexported fields
}

ReplicationSlotCreate pre-creates a logical replication slot on a peer provider for a new subscriber. This is an ephemeral populate resource — Refresh always returns not-exists so it re-executes on every run.

func NewReplicationSlotCreate

func NewReplicationSlotCreate(providerName, subscriberName, dbName string, conn *pgxpool.Pool) *ReplicationSlotCreate

func (*ReplicationSlotCreate) Create

func (r *ReplicationSlotCreate) Create(ctx context.Context) error

func (*ReplicationSlotCreate) Delete

Delete is a no-op — the end-state ReplicationSlot resource manages slot lifecycle.

func (*ReplicationSlotCreate) Dependencies

func (r *ReplicationSlotCreate) Dependencies() []resource.Identifier

func (*ReplicationSlotCreate) Identifier

func (r *ReplicationSlotCreate) Identifier() resource.Identifier

func (*ReplicationSlotCreate) Refresh

Refresh always returns not-exists — ephemeral resource re-executes every run.

func (*ReplicationSlotCreate) Status

func (*ReplicationSlotCreate) Update

type SpockNode

type SpockNode struct {
	// contains filtered or unexported fields
}

SpockNode manages a Spock node on a PostgreSQL instance.

func NewSpockNode

func NewSpockNode(node config.Node, dbName, pgedgeUser string, conn *pgxpool.Pool) *SpockNode

func (*SpockNode) Create

func (n *SpockNode) Create(ctx context.Context) error

func (*SpockNode) Delete

func (n *SpockNode) Delete(ctx context.Context) error

func (*SpockNode) Dependencies

func (n *SpockNode) Dependencies() []resource.Identifier

func (*SpockNode) Identifier

func (n *SpockNode) Identifier() resource.Identifier

func (*SpockNode) Refresh

func (n *SpockNode) Refresh(ctx context.Context) error

func (*SpockNode) Status

func (n *SpockNode) Status() resource.Status

func (*SpockNode) Update

func (n *SpockNode) Update(_ context.Context) error

type SpockReconciler

type SpockReconciler struct {
	// contains filtered or unexported fields
}

SpockReconciler implements resource.Reconciler for Spock replication resources.

func NewReconciler

func NewReconciler(cfg *config.Config, conns map[string]*pgxpool.Pool) *SpockReconciler

NewReconciler creates a SpockReconciler from config and database connections.

func (*SpockReconciler) ComputeDesired

func (r *SpockReconciler) ComputeDesired() map[resource.Identifier]resource.Resource

func (*SpockReconciler) RefreshActual

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription manages a Spock subscription between two nodes. Executes on the subscriber (dst) node's connection.

func NewSubscription

func NewSubscription(src, dst config.Node, dbName, pgedgeUser string, sync bool, conn *pgxpool.Pool, extraDeps ...resource.Identifier) *Subscription

func (*Subscription) Create

func (s *Subscription) Create(ctx context.Context) error

func (*Subscription) Delete

func (s *Subscription) Delete(ctx context.Context) error

func (*Subscription) Dependencies

func (s *Subscription) Dependencies() []resource.Identifier

func (*Subscription) Identifier

func (s *Subscription) Identifier() resource.Identifier

func (*Subscription) Refresh

func (s *Subscription) Refresh(ctx context.Context) error

func (*Subscription) Status

func (s *Subscription) Status() resource.Status

func (*Subscription) Update

func (s *Subscription) Update(ctx context.Context) error

type SyncEvent

type SyncEvent struct {
	LSN string // populated during Create
	// contains filtered or unexported fields
}

SyncEvent inserts a WAL bookmark on a provider node via spock.sync_event(). The returned LSN is stored for the paired WaitForSyncEvent to read. Ephemeral resource — re-executes every run.

func NewSyncEvent

func NewSyncEvent(providerName, subscriberName string, conn *pgxpool.Pool, extraDeps ...resource.Identifier) *SyncEvent

func (*SyncEvent) Create

func (r *SyncEvent) Create(ctx context.Context) error

func (*SyncEvent) Delete

func (r *SyncEvent) Delete(_ context.Context) error

func (*SyncEvent) Dependencies

func (r *SyncEvent) Dependencies() []resource.Identifier

func (*SyncEvent) Identifier

func (r *SyncEvent) Identifier() resource.Identifier

func (*SyncEvent) Refresh

func (r *SyncEvent) Refresh(_ context.Context) error

func (*SyncEvent) Status

func (r *SyncEvent) Status() resource.Status

func (*SyncEvent) Update

func (r *SyncEvent) Update(_ context.Context) error

type WaitForSyncEvent

type WaitForSyncEvent struct {
	// contains filtered or unexported fields
}

WaitForSyncEvent polls on the subscriber until it receives the sync event from the provider. Reads the LSN from the paired SyncEvent via struct pointer. Ephemeral resource — re-executes every run.

func NewWaitForSyncEvent

func NewWaitForSyncEvent(providerName, subscriberName string, syncEvent *SyncEvent, conn *pgxpool.Pool) *WaitForSyncEvent

func (*WaitForSyncEvent) Create

func (r *WaitForSyncEvent) Create(ctx context.Context) error

func (*WaitForSyncEvent) Delete

func (r *WaitForSyncEvent) Delete(_ context.Context) error

func (*WaitForSyncEvent) Dependencies

func (r *WaitForSyncEvent) Dependencies() []resource.Identifier

func (*WaitForSyncEvent) Identifier

func (r *WaitForSyncEvent) Identifier() resource.Identifier

func (*WaitForSyncEvent) Refresh

func (r *WaitForSyncEvent) Refresh(_ context.Context) error

func (*WaitForSyncEvent) Status

func (r *WaitForSyncEvent) Status() resource.Status

func (*WaitForSyncEvent) Update

func (r *WaitForSyncEvent) Update(_ context.Context) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL