cdc

package
v0.3.0
Published: Mar 29, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package cdc implements Change Data Capture modules, steps, and triggers.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewBackpressureStep

func NewBackpressureStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewBackpressureStep creates a new step.cdc_backpressure instance.

func NewMonitorStep

func NewMonitorStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewMonitorStep creates a new step.cdc_monitor instance.

func NewSchemaHistoryStep

func NewSchemaHistoryStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewSchemaHistoryStep creates a new step.cdc_schema_history instance.

func NewSnapshotStep

func NewSnapshotStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewSnapshotStep creates a new step.cdc_snapshot instance.

func NewSourceModule

func NewSourceModule(name string, config map[string]any) (sdk.ModuleInstance, error)

NewSourceModule creates a new CDC source module.

func NewStartStep

func NewStartStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewStartStep creates a new step.cdc_start instance.

func NewStatusStep

func NewStatusStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewStatusStep creates a new step.cdc_status instance.

func NewStopStep

func NewStopStep(name string, _ map[string]any) (sdk.StepInstance, error)

NewStopStep creates a new step.cdc_stop instance.

func NewTrigger

func NewTrigger(config map[string]any, cb sdk.TriggerCallback) (sdk.TriggerInstance, error)

NewTrigger creates a new CDC trigger instance.

func RegisterSource

func RegisterSource(sourceID string, p CDCProvider) error

RegisterSource registers a CDC provider in the global registry.

func UnregisterSource

func UnregisterSource(sourceID string)

UnregisterSource removes a source from the global registry.
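
The registry functions above (together with LookupSource, documented under CDCProvider) suggest a process-wide map keyed by source ID. The following is a minimal sketch of that pattern, not the package's actual implementation. It assumes a mutex-guarded map and that registering a duplicate ID is an error, neither of which this page confirms. The `provider` interface, `fakeProvider`, and `registry` names are hypothetical stand-ins.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// provider is a hypothetical stand-in for CDCProvider, reduced to one method.
type provider interface {
	Name() string
}

type fakeProvider struct{ name string }

func (f fakeProvider) Name() string { return f.name }

// registry sketches a source registry guarded by a RWMutex.
// The real package's internals are not shown on this page.
type registry struct {
	mu      sync.RWMutex
	sources map[string]provider
}

func newRegistry() *registry {
	return &registry{sources: make(map[string]provider)}
}

// register adds a provider. Rejecting duplicate IDs is an assumption.
func (r *registry) register(sourceID string, p provider) error {
	if sourceID == "" || p == nil {
		return errors.New("source ID and provider are required")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.sources[sourceID]; exists {
		return fmt.Errorf("source %q already registered", sourceID)
	}
	r.sources[sourceID] = p
	return nil
}

// lookup returns the provider for sourceID or an error if none is registered.
func (r *registry) lookup(sourceID string) (provider, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	p, ok := r.sources[sourceID]
	if !ok {
		return nil, fmt.Errorf("source %q not found", sourceID)
	}
	return p, nil
}

// unregister removes sourceID; removing an unknown ID is a no-op.
func (r *registry) unregister(sourceID string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.sources, sourceID)
}

func main() {
	r := newRegistry()
	if err := r.register("orders", fakeProvider{name: "memory"}); err != nil {
		fmt.Println("register:", err)
		return
	}
	p, err := r.lookup("orders")
	fmt.Println(p.Name(), err)
	r.unregister("orders")
	_, err = r.lookup("orders")
	fmt.Println(err)
}
```

The read lock in `lookup` lets many steps resolve a source concurrently while registration and removal stay exclusive.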

Types

type BackpressureMonitor

type BackpressureMonitor struct {
	ThresholdLagBytes   int64
	ThresholdLagSeconds int64
	WarningMultiplier   float64 // warn at this fraction of threshold (e.g. 0.8)
}

BackpressureMonitor evaluates CDC lag against configurable thresholds.

func (*BackpressureMonitor) Evaluate

func (m *BackpressureMonitor) Evaluate(sourceID string, lagBytes, lagSeconds int64) BackpressureStatus

Evaluate returns the backpressure status for the given lag values.
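
This page does not show how Evaluate maps lag values to a status, so the following is only a sketch of one plausible rule. It assumes that reaching either threshold is "critical", that reaching WarningMultiplier times either threshold is "warning", and that a non-positive threshold means "not configured". The struct definitions are local copies of the types documented here; `exceeds` and `evaluate` are hypothetical helpers.

```go
package main

import "fmt"

// Local copies of the documented structs so the sketch compiles on its own.
type BackpressureMonitor struct {
	ThresholdLagBytes   int64
	ThresholdLagSeconds int64
	WarningMultiplier   float64
}

type BackpressureStatus struct {
	SourceID   string
	LagBytes   int64
	LagSeconds int64
	Status     string // "healthy", "warning", "critical"
}

// exceeds reports whether lag is at or above threshold*factor.
// A non-positive threshold is treated as "not configured" (assumption).
func exceeds(lag, threshold int64, factor float64) bool {
	if threshold <= 0 {
		return false
	}
	return float64(lag) >= float64(threshold)*factor
}

// evaluate is a hypothetical version of the documented Evaluate method.
func evaluate(m BackpressureMonitor, sourceID string, lagBytes, lagSeconds int64) BackpressureStatus {
	status := "healthy"
	switch {
	case exceeds(lagBytes, m.ThresholdLagBytes, 1) ||
		exceeds(lagSeconds, m.ThresholdLagSeconds, 1):
		status = "critical"
	case exceeds(lagBytes, m.ThresholdLagBytes, m.WarningMultiplier) ||
		exceeds(lagSeconds, m.ThresholdLagSeconds, m.WarningMultiplier):
		status = "warning"
	}
	return BackpressureStatus{
		SourceID:   sourceID,
		LagBytes:   lagBytes,
		LagSeconds: lagSeconds,
		Status:     status,
	}
}

func main() {
	m := BackpressureMonitor{ThresholdLagBytes: 1000, ThresholdLagSeconds: 60, WarningMultiplier: 0.8}
	for _, lag := range [][2]int64{{100, 5}, {900, 5}, {100, 60}} {
		fmt.Println(evaluate(m, "orders", lag[0], lag[1]).Status)
	}
}
```

Under these assumptions the three calls in `main` report healthy, warning, and critical in that order.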

type BackpressureStatus

type BackpressureStatus struct {
	SourceID   string
	LagBytes   int64
	LagSeconds int64
	Status     string // "healthy", "warning", "critical"
}

BackpressureStatus holds the lag evaluation result.

type BentoProvider

type BentoProvider struct {
	// contains filtered or unexported fields
}

BentoProvider implements CDCProvider by generating Bento input YAML configs and delegating actual stream management to the workflow-plugin-bento engine module.

The data-engineering plugin no longer manages Bento stream lifecycle in-process. Instead, the engine config declares a bento.input module (referenced via options.bento_module) alongside the cdc.source module. The Bento module handles stream execution and publishes events to the engine's EventBus, which the cdc trigger consumes.

func (*BentoProvider) ConfigYAML

func (p *BentoProvider) ConfigYAML(sourceID string) (string, error)

ConfigYAML returns the generated Bento input YAML for a configured CDC source. This can be used to inspect what config was generated for the delegated module.

func (*BentoProvider) Connect

func (p *BentoProvider) Connect(_ context.Context, config SourceConfig) error

Connect generates the Bento CDC input YAML and stores it for delegation. Actual stream management is handled by the bento.input engine module referenced in config.Options["bento_module"].

func (*BentoProvider) Disconnect

func (p *BentoProvider) Disconnect(_ context.Context, sourceID string) error

Disconnect removes the CDC source configuration.

func (*BentoProvider) RegisterEventHandler

func (p *BentoProvider) RegisterEventHandler(sourceID string, h EventHandler) error

RegisterEventHandler stores the event handler for a CDC source. Events are delivered by the delegated bento.input engine module via the engine's EventBus.

func (*BentoProvider) SchemaHistory

func (p *BentoProvider) SchemaHistory(_ context.Context, sourceID string, _ string) ([]SchemaVersion, error)

SchemaHistory returns schema change history. Bento streams do not track DDL history natively, so this method always returns an empty history.

func (*BentoProvider) Snapshot

func (p *BentoProvider) Snapshot(_ context.Context, sourceID string, tables []string) error

Snapshot regenerates the CDC config for the given source, optionally overriding the table list. The updated YAML is available for the delegated bento.input module.

func (*BentoProvider) Status

func (p *BentoProvider) Status(_ context.Context, sourceID string) (*CDCStatus, error)

Status returns the current status of a CDC source.

type CDCProvider

type CDCProvider interface {
	// Connect establishes a connection and starts the CDC stream.
	Connect(ctx context.Context, config SourceConfig) error
	// Disconnect stops the CDC stream and releases resources.
	Disconnect(ctx context.Context, sourceID string) error
	// Status returns the current status of a CDC stream.
	Status(ctx context.Context, sourceID string) (*CDCStatus, error)
	// Snapshot triggers a full table snapshot for the given tables.
	Snapshot(ctx context.Context, sourceID string, tables []string) error
	// SchemaHistory returns the schema change history for a table.
	SchemaHistory(ctx context.Context, sourceID string, table string) ([]SchemaVersion, error)
	// RegisterEventHandler registers a callback for CDC events from a source stream.
	RegisterEventHandler(sourceID string, h EventHandler) error
}

CDCProvider defines the interface for Change Data Capture providers. Implementations: BentoProvider, DebeziumProvider, DMSProvider, MemoryProvider.

func LookupSource

func LookupSource(sourceID string) (CDCProvider, error)

LookupSource finds a running CDC source provider by ID.

type CDCStatus

type CDCStatus struct {
	SourceID   string `json:"source_id"              yaml:"source_id"`
	State      string `json:"state"                  yaml:"state"`
	Provider   string `json:"provider"               yaml:"provider"`
	LastEvent  string `json:"last_event"             yaml:"last_event"`
	Error      string `json:"error,omitempty"        yaml:"error,omitempty"`
	LagBytes   int64  `json:"lag_bytes,omitempty"    yaml:"lag_bytes,omitempty"`
	LagSeconds int64  `json:"lag_seconds,omitempty"  yaml:"lag_seconds,omitempty"`
}

CDCStatus describes the current state of a CDC stream.
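
The struct tags above determine how a status serializes. The sketch below uses a local copy of CDCStatus (yaml tags omitted for brevity) to show the effect of `omitempty`: the error and lag fields disappear when zero, while `last_event` is always emitted. The state value "running" and provider value "memory" are illustrative assumptions; this page does not list the valid values. `encodeStatus` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CDCStatus is a local copy of the documented struct, json tags only.
type CDCStatus struct {
	SourceID   string `json:"source_id"`
	State      string `json:"state"`
	Provider   string `json:"provider"`
	LastEvent  string `json:"last_event"`
	Error      string `json:"error,omitempty"`
	LagBytes   int64  `json:"lag_bytes,omitempty"`
	LagSeconds int64  `json:"lag_seconds,omitempty"`
}

// encodeStatus marshals a status to JSON, returning "" on failure.
func encodeStatus(s CDCStatus) string {
	b, err := json.Marshal(s)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	// Zero-valued error and lag fields are dropped from the output.
	fmt.Println(encodeStatus(CDCStatus{SourceID: "orders", State: "running", Provider: "memory"}))
	// Non-zero lag fields are included.
	fmt.Println(encodeStatus(CDCStatus{SourceID: "orders", State: "running", Provider: "memory", LagBytes: 2048}))
}
```

A consumer therefore cannot distinguish "zero lag" from "lag not reported" in the JSON form, which is worth keeping in mind when alerting on these fields.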

type DMSClient

type DMSClient interface {
	CreateReplicationTask(ctx context.Context, params *dms.CreateReplicationTaskInput, optFns ...func(*dms.Options)) (*dms.CreateReplicationTaskOutput, error)
	StartReplicationTask(ctx context.Context, params *dms.StartReplicationTaskInput, optFns ...func(*dms.Options)) (*dms.StartReplicationTaskOutput, error)
	StopReplicationTask(ctx context.Context, params *dms.StopReplicationTaskInput, optFns ...func(*dms.Options)) (*dms.StopReplicationTaskOutput, error)
	DescribeReplicationTasks(ctx context.Context, params *dms.DescribeReplicationTasksInput, optFns ...func(*dms.Options)) (*dms.DescribeReplicationTasksOutput, error)
	DeleteReplicationTask(ctx context.Context, params *dms.DeleteReplicationTaskInput, optFns ...func(*dms.Options)) (*dms.DeleteReplicationTaskOutput, error)
	DescribeTableStatistics(ctx context.Context, params *dms.DescribeTableStatisticsInput, optFns ...func(*dms.Options)) (*dms.DescribeTableStatisticsOutput, error)
}

DMSClient is the interface over AWS DMS SDK calls, enabling mock injection in tests.

type DMSConfig

type DMSConfig struct {
	SourceEndpointARN      string `json:"source_endpoint_arn"      yaml:"source_endpoint_arn"`
	TargetEndpointARN      string `json:"target_endpoint_arn"      yaml:"target_endpoint_arn"`
	ReplicationInstanceARN string `json:"replication_instance_arn" yaml:"replication_instance_arn"`
	MigrationType          string `json:"migration_type"           yaml:"migration_type"` // cdc, full-load-and-cdc
}

DMSConfig holds DMS-specific configuration for the provider.

type DMSProvider

type DMSProvider struct {
	// contains filtered or unexported fields
}

DMSProvider implements CDCProvider using AWS Database Migration Service. It creates and manages AWS DMS replication tasks via the AWS SDK. The SourceConfig.Connection field is not used; DMS config comes from DMSConfig.

func (*DMSProvider) Connect

func (p *DMSProvider) Connect(ctx context.Context, config SourceConfig) error

Connect creates an AWS DMS replication task and starts it.

func (*DMSProvider) Disconnect

func (p *DMSProvider) Disconnect(ctx context.Context, sourceID string) error

Disconnect stops and deletes the AWS DMS replication task.

func (*DMSProvider) PauseSource

func (p *DMSProvider) PauseSource(ctx context.Context, sourceID string) error

PauseSource stops the DMS replication task. For DMS, pausing is implemented as stopping the task.

func (*DMSProvider) RegisterEventHandler

func (p *DMSProvider) RegisterEventHandler(sourceID string, h EventHandler) error

RegisterEventHandler registers a callback for CDC events from an AWS DMS task.

func (*DMSProvider) ResumeSource

func (p *DMSProvider) ResumeSource(ctx context.Context, sourceID string) error

ResumeSource restarts the DMS replication task from where it stopped.

func (*DMSProvider) SchemaHistory

func (p *DMSProvider) SchemaHistory(ctx context.Context, sourceID string, table string) ([]SchemaVersion, error)

SchemaHistory returns schema change stats from AWS DMS table statistics. DMS tracks DDL counts per table via DescribeTableStatistics.

func (*DMSProvider) Snapshot

func (p *DMSProvider) Snapshot(ctx context.Context, sourceID string, tables []string) error

Snapshot triggers a full re-snapshot: it stops the task, then restarts it with the reload-target start type (a StartReplicationTaskType value, not a migration type).

func (*DMSProvider) Status

func (p *DMSProvider) Status(ctx context.Context, sourceID string) (*CDCStatus, error)

Status returns the current status of an AWS DMS replication task.

type DebeziumProvider

type DebeziumProvider struct {
	// contains filtered or unexported fields
}

DebeziumProvider implements CDCProvider via the Kafka Connect REST API. It creates and manages Debezium connectors on an external Kafka Connect cluster. The SourceConfig.Connection field must be the base URL of the Kafka Connect REST API (e.g. "http://localhost:8083").

func (*DebeziumProvider) Connect

func (p *DebeziumProvider) Connect(ctx context.Context, config SourceConfig) error

Connect creates a Debezium connector via POST /connectors on the Kafka Connect API. config.Connection must be the Kafka Connect base URL.

func (*DebeziumProvider) Disconnect

func (p *DebeziumProvider) Disconnect(ctx context.Context, sourceID string) error

Disconnect deletes the Debezium connector via DELETE /connectors/{name}.

func (*DebeziumProvider) PauseSource

func (p *DebeziumProvider) PauseSource(ctx context.Context, sourceID string) error

PauseSource pauses a Debezium connector via PUT /connectors/{name}/pause.

func (*DebeziumProvider) RegisterEventHandler

func (p *DebeziumProvider) RegisterEventHandler(sourceID string, h EventHandler) error

RegisterEventHandler registers a callback for CDC events from a Debezium connector.

func (*DebeziumProvider) ResumeSource

func (p *DebeziumProvider) ResumeSource(ctx context.Context, sourceID string) error

ResumeSource resumes a paused Debezium connector via PUT /connectors/{name}/resume.

func (*DebeziumProvider) SchemaHistory

func (p *DebeziumProvider) SchemaHistory(ctx context.Context, sourceID string, table string) ([]SchemaVersion, error)

SchemaHistory returns schema change history for a table. NOTE: The Kafka Connect REST API does not expose schema history directly. A full implementation would require consuming the Debezium schema history Kafka topic. This implementation verifies the connector is running and returns an empty history.

func (*DebeziumProvider) Snapshot

func (p *DebeziumProvider) Snapshot(ctx context.Context, sourceID string, tables []string) error

Snapshot triggers a full re-snapshot by updating the connector's snapshot.mode via PUT /connectors/{name}/config and then restarting the connector via POST /connectors/{name}/restart.

func (*DebeziumProvider) Status

func (p *DebeziumProvider) Status(ctx context.Context, sourceID string) (*CDCStatus, error)

Status returns the current connector state from GET /connectors/{name}/status.
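
The DebeziumProvider methods above each map to one Kafka Connect REST endpoint. The sketch below builds (but does not send) the requests for pause, resume, status, and delete using only the standard library. `connectorURL` and `newConnectorRequest` are hypothetical helpers, the connector name is a placeholder, and the real provider may construct its requests differently.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/url"
	"strings"
)

// connectorURL joins the Kafka Connect base URL with a connector path.
// An empty action addresses the connector itself (used for DELETE).
func connectorURL(baseURL, name, action string) string {
	u := strings.TrimRight(baseURL, "/") + "/connectors/" + url.PathEscape(name)
	if action != "" {
		u += "/" + action
	}
	return u
}

// newConnectorRequest builds a request for a connector lifecycle action.
func newConnectorRequest(ctx context.Context, method, baseURL, name, action string) (*http.Request, error) {
	return http.NewRequestWithContext(ctx, method, connectorURL(baseURL, name, action), nil)
}

func main() {
	ctx := context.Background()
	base := "http://localhost:8083" // example base URL from the DebeziumProvider docs
	steps := []struct{ method, action string }{
		{http.MethodPut, "pause"},
		{http.MethodPut, "resume"},
		{http.MethodGet, "status"},
		{http.MethodDelete, ""},
	}
	for _, s := range steps {
		req, err := newConnectorRequest(ctx, s.method, base, "orders-connector", s.action)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(req.Method, req.URL.String())
	}
}
```

Sending a request is then a matter of passing it to an `http.Client` and checking the response status code.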

type EventHandler

type EventHandler func(sourceID string, event map[string]any) error

EventHandler is called for each CDC event received from the provider. Implementations must be goroutine-safe.
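
Because handlers must be goroutine-safe, any shared state they touch needs synchronization. The sketch below shows one way to satisfy that requirement with a mutex-guarded counter. The EventHandler type is a local copy of the documented signature; `eventCounter` and `deliverConcurrently` are hypothetical, and the "seq" event key is made up for the demonstration.

```go
package main

import (
	"fmt"
	"sync"
)

// EventHandler is a local copy of the documented callback type.
type EventHandler func(sourceID string, event map[string]any) error

// eventCounter tallies events per source. The mutex makes Handle safe
// to call from many goroutines at once.
type eventCounter struct {
	mu     sync.Mutex
	counts map[string]int
}

func newEventCounter() *eventCounter {
	return &eventCounter{counts: make(map[string]int)}
}

// Handle has the EventHandler signature and can be passed as one.
func (c *eventCounter) Handle(sourceID string, _ map[string]any) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[sourceID]++
	return nil
}

// Count returns the number of events seen for sourceID.
func (c *eventCounter) Count(sourceID string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[sourceID]
}

// deliverConcurrently calls h from n goroutines and waits for all of them,
// imitating a provider that dispatches events in parallel.
func deliverConcurrently(h EventHandler, sourceID string, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(seq int) {
			defer wg.Done()
			_ = h(sourceID, map[string]any{"seq": seq})
		}(i)
	}
	wg.Wait()
}

func main() {
	c := newEventCounter()
	var h EventHandler = c.Handle
	deliverConcurrently(h, "orders", 100)
	fmt.Println(c.Count("orders"))
}
```

Without the mutex, the concurrent map writes in `Handle` would be a data race that `go test -race` reports.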

type MemoryProvider

type MemoryProvider struct {
	// contains filtered or unexported fields
}

MemoryProvider is an in-memory CDCProvider implementation intended for testing. It allows injecting synthetic CDC events via InjectEvent and verifying provider behavior without real database connections or external services.

func NewMemoryProvider

func NewMemoryProvider() *MemoryProvider

NewMemoryProvider creates a new MemoryProvider.

func (*MemoryProvider) AddSchemaVersion

func (p *MemoryProvider) AddSchemaVersion(sourceID string, sv SchemaVersion) error

AddSchemaVersion records a synthetic DDL change event for testing schema history.

func (*MemoryProvider) Connect

func (p *MemoryProvider) Connect(ctx context.Context, config SourceConfig) error

Connect registers an in-memory CDC source and starts its event dispatch goroutine.

func (*MemoryProvider) Disconnect

func (p *MemoryProvider) Disconnect(ctx context.Context, sourceID string) error

Disconnect stops the event dispatch goroutine and removes the source.

func (*MemoryProvider) InjectEvent

func (p *MemoryProvider) InjectEvent(sourceID string, event map[string]any) error

InjectEvent injects a synthetic CDC event into the named source's event stream. This is the primary API used by tests to simulate database change events.

func (*MemoryProvider) RegisterEventHandler

func (p *MemoryProvider) RegisterEventHandler(sourceID string, h EventHandler) error

RegisterEventHandler registers a callback for events from a named source.

func (*MemoryProvider) SchemaHistory

func (p *MemoryProvider) SchemaHistory(_ context.Context, sourceID string, table string) ([]SchemaVersion, error)

SchemaHistory returns the recorded DDL change history for a table.

func (*MemoryProvider) SetLag

func (p *MemoryProvider) SetLag(sourceID string, lagBytes, lagSeconds int64) error

SetLag configures simulated CDC lag on a running memory source (for backpressure tests).

func (*MemoryProvider) Snapshot

func (p *MemoryProvider) Snapshot(_ context.Context, sourceID string, tables []string) error

Snapshot triggers a synthetic snapshot by emitting a snapshot_started event.

func (*MemoryProvider) Status

func (p *MemoryProvider) Status(_ context.Context, sourceID string) (*CDCStatus, error)

Status returns the current status of a memory CDC source.

type SchemaVersion

type SchemaVersion struct {
	Table     string `json:"table"      yaml:"table"`
	Version   int64  `json:"version"    yaml:"version"`
	DDL       string `json:"ddl"        yaml:"ddl"`
	AppliedAt string `json:"applied_at" yaml:"applied_at"`
}

SchemaVersion describes a schema change event for a table.

type SourceConfig

type SourceConfig struct {
	Provider   string         `json:"provider"    yaml:"provider"`
	SourceID   string         `json:"source_id"   yaml:"source_id"`
	SourceType string         `json:"source_type" yaml:"source_type"`
	Connection string         `json:"connection"  yaml:"connection"`
	Tables     []string       `json:"tables"      yaml:"tables"`
	Options    map[string]any `json:"options"     yaml:"options"`
}

SourceConfig holds configuration for the cdc.source module.
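
As an illustration of the json tags above, the sketch below decodes a configuration document into a local copy of SourceConfig. Every value in `sampleConfig` is a placeholder: the accepted provider and source_type strings are not listed on this page, and only the `bento_module` option key is taken from the BentoProvider documentation. `parseSourceConfig` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SourceConfig is a local copy of the documented struct, json tags only.
type SourceConfig struct {
	Provider   string         `json:"provider"`
	SourceID   string         `json:"source_id"`
	SourceType string         `json:"source_type"`
	Connection string         `json:"connection"`
	Tables     []string       `json:"tables"`
	Options    map[string]any `json:"options"`
}

// sampleConfig uses placeholder values throughout (assumptions).
const sampleConfig = `{
  "provider": "bento",
  "source_id": "orders",
  "source_type": "postgres",
  "connection": "postgres://user:pass@localhost:5432/app",
  "tables": ["public.orders", "public.customers"],
  "options": {"bento_module": "bento-input"}
}`

// parseSourceConfig decodes a JSON document into a SourceConfig.
func parseSourceConfig(raw string) (SourceConfig, error) {
	var cfg SourceConfig
	err := json.Unmarshal([]byte(raw), &cfg)
	return cfg, err
}

func main() {
	cfg, err := parseSourceConfig(sampleConfig)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(cfg.SourceID, cfg.Tables, cfg.Options["bento_module"])
}
```

Options decodes into `map[string]any`, so callers need a type assertion before using a value such as `bento_module` as a string.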

type SourceModule

type SourceModule struct {
	// contains filtered or unexported fields
}

SourceModule is a CDC source module that streams change events from a database.

func (*SourceModule) Config

func (m *SourceModule) Config() SourceConfig

Config returns the source configuration.

func (*SourceModule) Init

func (m *SourceModule) Init() error

Init validates the module configuration.

func (*SourceModule) Provider

func (m *SourceModule) Provider() CDCProvider

Provider returns the underlying CDC provider (used by steps).

func (*SourceModule) Start

func (m *SourceModule) Start(ctx context.Context) error

Start initializes the CDC provider connection and registers the module in the global registry. The module mutex is NOT held during provider.Connect to avoid blocking Provider() and Config() accessors during potentially long network round-trips. m.provider and m.config are immutable after NewSourceModule, so no lock is needed to read them.
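
The locking discipline described above can be sketched as follows. This is an illustration of the idea, not the module's source: the provider field is set once at construction and read without a lock, and the mutex guards only a mutable `started` flag, which is itself an assumption about what the real mutex protects. `connector`, `slowConnector`, `module`, and `runDemo` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// connector is a hypothetical stand-in for the Connect part of CDCProvider.
type connector interface {
	Connect(ctx context.Context) error
}

// slowConnector simulates a long network round-trip.
type slowConnector struct{ delay time.Duration }

func (s slowConnector) Connect(ctx context.Context) error {
	select {
	case <-time.After(s.delay):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

type module struct {
	provider connector // immutable after newModule, so reads need no lock

	mu      sync.Mutex
	started bool // mutable state guarded by mu
}

func newModule(p connector) *module { return &module{provider: p} }

// Provider never blocks on mu because the field is never reassigned.
func (m *module) Provider() connector { return m.provider }

func (m *module) Start(ctx context.Context) error {
	m.mu.Lock()
	if m.started {
		m.mu.Unlock()
		return errors.New("already started")
	}
	m.mu.Unlock()

	// The slow call runs with the mutex released.
	if err := m.provider.Connect(ctx); err != nil {
		return err
	}

	m.mu.Lock()
	m.started = true
	m.mu.Unlock()
	return nil
}

func (m *module) Started() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.started
}

// runDemo starts a module backed by a slow connector and reports its state.
func runDemo() (bool, error) {
	m := newModule(slowConnector{delay: 10 * time.Millisecond})
	err := m.Start(context.Background())
	return m.Started(), err
}

func main() {
	fmt.Println(runDemo())
}
```

One trade-off of releasing the lock is that two concurrent Start calls in this sketch could both reach Connect; a real implementation would need an extra "starting" state or a `sync.Once` if that matters.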

func (*SourceModule) Stop

func (m *SourceModule) Stop(ctx context.Context) error

Stop shuts down the CDC provider connection and deregisters from the global registry. m.provider and m.config are immutable after NewSourceModule; no lock needed.

type ThrottleableProvider

type ThrottleableProvider interface {
	PauseSource(ctx context.Context, sourceID string) error
	ResumeSource(ctx context.Context, sourceID string) error
}

ThrottleableProvider is an optional extension of CDCProvider for providers that support pausing and resuming the CDC stream.
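
Since ThrottleableProvider is optional, callers have to discover it at run time. On this page only DMSProvider and DebeziumProvider list PauseSource and ResumeSource. The sketch below shows the usual Go type-assertion pattern for such an optional interface. The interface is a local copy of the documented one; `pausable`, `plain`, `pauseIfSupported`, `tryPause`, and `errNotThrottleable` are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ThrottleableProvider is a local copy of the documented interface.
type ThrottleableProvider interface {
	PauseSource(ctx context.Context, sourceID string) error
	ResumeSource(ctx context.Context, sourceID string) error
}

var errNotThrottleable = errors.New("provider does not support pause/resume")

// pauseIfSupported pauses the source when p implements ThrottleableProvider.
// p is typed as any here; in real code it would be a CDCProvider.
func pauseIfSupported(ctx context.Context, p any, sourceID string) error {
	tp, ok := p.(ThrottleableProvider)
	if !ok {
		return errNotThrottleable
	}
	return tp.PauseSource(ctx, sourceID)
}

// pausable is a hypothetical provider that records paused sources.
type pausable struct{ paused map[string]bool }

func (p *pausable) PauseSource(_ context.Context, sourceID string) error {
	p.paused[sourceID] = true
	return nil
}

func (p *pausable) ResumeSource(_ context.Context, sourceID string) error {
	p.paused[sourceID] = false
	return nil
}

// plain is a hypothetical provider without pause support.
type plain struct{}

// tryPause is a small convenience wrapper for the demonstration.
func tryPause(p any) error {
	return pauseIfSupported(context.Background(), p, "orders")
}

func main() {
	p := &pausable{paused: map[string]bool{}}
	fmt.Println(tryPause(p), p.paused["orders"])
	fmt.Println(tryPause(plain{}))
}
```

Returning a sentinel error lets a backpressure step decide whether an unsupported provider is a failure or merely something to log.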

type Trigger

type Trigger struct {
	// contains filtered or unexported fields
}

Trigger implements sdk.TriggerInstance for trigger.cdc. It registers an EventHandler on the named cdc.source provider and fires the workflow callback for each matching CDC change event.

The trigger filters events by table name and action (INSERT/UPDATE/DELETE) when configured. If tables or actions are empty, all events are forwarded.
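
The filtering rule above can be sketched as a pure function. The event shape is not documented here, so the sketch assumes each event carries string values under the keys "table" and "action", and it assumes case-insensitive matching. TriggerConfig is a local copy of the struct documented further down; `allowed` and `matches` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strings"
)

// TriggerConfig is a local copy of the documented struct, tags omitted.
type TriggerConfig struct {
	SourceID string
	Tables   []string
	Actions  []string
}

// allowed reports whether value passes the filter list.
// An empty list means "no filter", matching the documented behavior.
func allowed(list []string, value string) bool {
	if len(list) == 0 {
		return true
	}
	for _, item := range list {
		if strings.EqualFold(item, value) { // case-insensitivity is an assumption
			return true
		}
	}
	return false
}

// matches decides whether an event should be forwarded to the callback.
// The "table" and "action" keys are assumed, not taken from the package.
func matches(cfg TriggerConfig, event map[string]any) bool {
	table, _ := event["table"].(string)
	action, _ := event["action"].(string)
	return allowed(cfg.Tables, table) && allowed(cfg.Actions, action)
}

func main() {
	cfg := TriggerConfig{
		SourceID: "orders",
		Tables:   []string{"orders"},
		Actions:  []string{"INSERT", "UPDATE"},
	}
	fmt.Println(matches(cfg, map[string]any{"table": "orders", "action": "INSERT"}))
	fmt.Println(matches(cfg, map[string]any{"table": "orders", "action": "DELETE"}))
	fmt.Println(matches(TriggerConfig{}, map[string]any{"table": "audit", "action": "DELETE"}))
}
```

Keeping the predicate free of side effects makes it easy to unit-test independently of any provider.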

func (*Trigger) Start

func (t *Trigger) Start(ctx context.Context) error

Start registers an event handler on the running cdc.source module and begins forwarding matching CDC events to the workflow callback.

func (*Trigger) Stop

func (t *Trigger) Stop(ctx context.Context) error

Stop deregisters the event handler and halts event forwarding.

type TriggerConfig

type TriggerConfig struct {
	// SourceID references a running cdc.source module by source_id.
	SourceID string `json:"source_id" yaml:"source_id"`
	// Tables filters events to the given table names. Empty = all tables.
	Tables []string `json:"tables" yaml:"tables"`
	// Actions filters events by DML operation type (INSERT, UPDATE, DELETE). Empty = all.
	Actions []string `json:"actions" yaml:"actions"`
}

TriggerConfig holds configuration for the CDC trigger.
