Documentation
¶
Index ¶
- Constants
- Variables
- func LatestSnapshot(ctx context.Context, dsn string, options ...Option) (sequence uint64, reader io.ReadCloser, err error)
- func Shutdown()
- type AsyncNATSPublisher
- type BackupFn
- type CDCPublisher
- type CDCSubscriber
- type Change
- type ChangeSet
- func (cs *ChangeSet) AddChange(change Change)
- func (cs *ChangeSet) Apply(db *sql.DB) (err error)
- func (cs *ChangeSet) Clear()
- func (cs *ChangeSet) Send(pub CDCPublisher) error
- func (cs *ChangeSet) SetApplyStrategy(fn applyStrategyFn)
- func (cs *ChangeSet) SetConnProvider(connProvider ConnHooksProvider)
- func (cs *ChangeSet) SetInterceptor(interceptor ChangeSetInterceptor)
- type ChangeSetInterceptor
- type ChangeSetSerializer
- type ConnHooksFactory
- type ConnHooksProvider
- type Connector
- func (c *Connector) Close()
- func (c *Connector) Connect(ctx context.Context) (driver.Conn, error)
- func (c *Connector) DeliveredInfo(ctx context.Context, name string) (any, error)
- func (c *Connector) Driver() driver.Driver
- func (c *Connector) LatestSeq() uint64
- func (c *Connector) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)
- func (c *Connector) NodeName() string
- func (c *Connector) Publisher() CDCPublisher
- func (c *Connector) RemoveConsumer(ctx context.Context, name string) error
- func (c *Connector) Snapshotter() DBSnapshotter
- func (c *Connector) Subscriber() CDCSubscriber
- func (c *Connector) TakeSnapshot(ctx context.Context) (sequence uint64, err error)
- type DBSnapshotter
- type DriverProvider
- type EmbeddedNatsConfig
- type JSONPublisher
- type NATSPublisher
- type NATSSnapshotter
- func (s *NATSSnapshotter) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)
- func (s *NATSSnapshotter) LatestSnapshotSequence(ctx context.Context) (uint64, error)
- func (s *NATSSnapshotter) SetDB(db *sql.DB)
- func (s *NATSSnapshotter) Start()
- func (s *NATSSnapshotter) TakeSnapshot(ctx context.Context) (sequence uint64, err error)
- type NATSSubscriber
- type NATSSubscriberConfig
- type NoopPublisher
- type NoopSnapshotter
- type NoopSubscriber
- type Option
- func NameToOptions(name string) (string, []Option, error)
- func WithAsyncPublisher() Option
- func WithAsyncPublisherOutboxDir(dir string) Option
- func WithCDCID(id string) Option
- func WithCDCPublisher(pub CDCPublisher) Option
- func WithCDCSubscriber(sub CDCSubscriber) Option
- func WithChangeSetInterceptor(interceptor ChangeSetInterceptor) Option
- func WithDBSnapshotter(snap DBSnapshotter) Option
- func WithDeliverPolicy(deliverPolicy string) Option
- func WithDisableDDLSync() Option
- func WithEmbeddedNatsConfig(cfg *EmbeddedNatsConfig) Option
- func WithExtensions(extensions ...string) Option
- func WithName(name string) Option
- func WithNatsOptions(options ...nats.Option) Option
- func WithPublisherTimeout(timeout time.Duration) Option
- func WithReplicas(replicas int) Option
- func WithReplicationStream(stream string) Option
- func WithReplicationURL(url string) Option
- func WithRowIdentify(i RowIdentify) Option
- func WithSnapshotInterval(interval time.Duration) Option
- func WithStreamMaxAge(maxAge time.Duration) Option
- func WithWaitFor(ch chan struct{}) Option
- type RowIdentify
- type SequenceProvider
- type Statement
- func (s *Statement) Begin() bool
- func (s *Statement) Columns() []string
- func (s *Statement) Commit() bool
- func (s *Statement) DDL() bool
- func (s *Statement) HasDistinct() bool
- func (s *Statement) HasReturning() bool
- func (s *Statement) IsCreateTable() bool
- func (s *Statement) IsDelete() bool
- func (s *Statement) IsExplain() bool
- func (s *Statement) IsInsert() bool
- func (s *Statement) IsSelect() bool
- func (s *Statement) IsUpdate() bool
- func (s *Statement) ModifiesDatabase() bool
- func (s *Statement) Parameters() []string
- func (s *Statement) Rollback() bool
- func (s *Statement) Source() string
- func (s *Statement) SourceWithIfExists() string
- func (s *Statement) Type() string
- type WriterPublisher
Constants ¶
View Source
const ( TypeExplain = "EXPLAIN" TypeSelect = "SELECT" TypeInsert = "INSERT" TypeUpdate = "UPDATE" TypeDelete = "DELETE" TypeCreateTable = "CREATE TABLE" TypeCreateIndex = "CREATE INDEX" TypeCreateView = "CREATE VIEW" TypeCreateTrigger = "CREATE TRIGGER" TypeCreateVirtualTable = "CREATE VIRTUAL TABLE" TypeAlterTable = "ALTER TABLE" TypeVacuum = "VACUUM" TypeDrop = "DROP" TypeAnalyze = "ANALYZE" TypeBegin = "BEGIN" TypeCommit = "COMMIT" TypeRollback = "ROLLBACK" TypeSavepoint = "SAVEPOINT" TypeRelease = "RELEASE" TypeOther = "OTHER" )
View Source
const DefaultStream = "ha_replication"
Variables ¶
View Source
var ( ErrSubscriberNotConfigured = errors.New("subscriber not configured") ErrSnapshotterNotConfigured = errors.New("snapshotter not configured") )
View Source
var (
ErrInvalidSQL = fmt.Errorf("invalid SQL")
)
Functions ¶
func LatestSnapshot ¶ added in v0.0.4
Types ¶
type AsyncNATSPublisher ¶ added in v0.0.15
type AsyncNATSPublisher struct {
*NATSPublisher
// contains filtered or unexported fields
}
func NewAsyncNATSPublisher ¶ added in v0.0.15
func (*AsyncNATSPublisher) Close ¶ added in v0.0.15
func (p *AsyncNATSPublisher) Close() error
func (*AsyncNATSPublisher) Publish ¶ added in v0.0.15
func (p *AsyncNATSPublisher) Publish(cs *ChangeSet) error
type CDCPublisher ¶
type CDCSubscriber ¶ added in v0.0.6
type Change ¶
type Change struct {
Database string `json:"database,omitempty"`
Table string `json:"table,omitempty"`
Columns []string `json:"columns,omitempty"`
Operation string `json:"operation"` // "INSERT", "UPDATE", "DELETE", "SQL", "CUSTOM"
OldRowID int64 `json:"old_rowid,omitempty"`
NewRowID int64 `json:"new_rowid,omitempty"`
OldValues []any `json:"old_values,omitempty"`
NewValues []any `json:"new_values,omitempty"`
Command string `json:"command,omitempty"`
Args []any `json:"args,omitempty"`
}
type ChangeSet ¶
type ChangeSet struct {
Node string `json:"node"`
ProcessID int64 `json:"process_id"`
Filename string `json:"filename"`
Changes []Change `json:"changes"`
Timestamp int64 `json:"timestamp_ns"`
Subject string `json:"-"`
StreamSeq uint64 `json:"-"`
// contains filtered or unexported fields
}
func NewChangeSet ¶
func (*ChangeSet) Send ¶
func (cs *ChangeSet) Send(pub CDCPublisher) error
func (*ChangeSet) SetApplyStrategy ¶ added in v0.0.21
func (cs *ChangeSet) SetApplyStrategy(fn applyStrategyFn)
func (*ChangeSet) SetConnProvider ¶ added in v0.0.10
func (cs *ChangeSet) SetConnProvider(connProvider ConnHooksProvider)
func (*ChangeSet) SetInterceptor ¶ added in v0.0.6
func (cs *ChangeSet) SetInterceptor(interceptor ChangeSetInterceptor)
type ChangeSetInterceptor ¶ added in v0.0.6
type ChangeSetSerializer ¶ added in v0.0.6
type ConnHooksFactory ¶ added in v0.0.10
type ConnHooksFactory func(nodeName string, filename string, disableDDLSync bool, publisher CDCPublisher) ConnHooksProvider
type ConnHooksProvider ¶ added in v0.0.10
type Connector ¶
type Connector struct {
// contains filtered or unexported fields
}
func GetConnector ¶ added in v0.0.12
func NewConnector ¶
func (*Connector) DeliveredInfo ¶
func (*Connector) LatestSnapshot ¶
func (*Connector) Publisher ¶ added in v0.0.10
func (c *Connector) Publisher() CDCPublisher
func (*Connector) RemoveConsumer ¶
func (*Connector) Snapshotter ¶ added in v0.0.13
func (c *Connector) Snapshotter() DBSnapshotter
func (*Connector) Subscriber ¶ added in v0.0.13
func (c *Connector) Subscriber() CDCSubscriber
type DBSnapshotter ¶ added in v0.0.6
type DriverProvider ¶ added in v0.0.10
type EmbeddedNatsConfig ¶
type JSONPublisher ¶ added in v0.0.6
type JSONPublisher struct {
// contains filtered or unexported fields
}
func NewJSONPublisher ¶ added in v0.0.6
func NewJSONPublisher(w io.Writer) *JSONPublisher
func (*JSONPublisher) Publish ¶ added in v0.0.6
func (p *JSONPublisher) Publish(cs *ChangeSet) error
type NATSPublisher ¶ added in v0.0.6
type NATSPublisher struct {
// contains filtered or unexported fields
}
func NewNATSPublisher ¶ added in v0.0.6
func NewNATSPublisher(nc *nats.Conn, subject string, timeout time.Duration, streamConfig *jetstream.StreamConfig) (*NATSPublisher, error)
func (*NATSPublisher) Publish ¶ added in v0.0.6
func (p *NATSPublisher) Publish(cs *ChangeSet) error
type NATSSnapshotter ¶ added in v0.0.6
type NATSSnapshotter struct {
// contains filtered or unexported fields
}
func NewNATSSnapshotter ¶ added in v0.0.6
func (*NATSSnapshotter) LatestSnapshot ¶ added in v0.0.6
func (s *NATSSnapshotter) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)
func (*NATSSnapshotter) LatestSnapshotSequence ¶ added in v0.0.6
func (s *NATSSnapshotter) LatestSnapshotSequence(ctx context.Context) (uint64, error)
func (*NATSSnapshotter) SetDB ¶ added in v0.0.13
func (s *NATSSnapshotter) SetDB(db *sql.DB)
func (*NATSSnapshotter) Start ¶ added in v0.0.13
func (s *NATSSnapshotter) Start()
func (*NATSSnapshotter) TakeSnapshot ¶ added in v0.0.6
func (s *NATSSnapshotter) TakeSnapshot(ctx context.Context) (sequence uint64, err error)
type NATSSubscriber ¶ added in v0.0.6
type NATSSubscriber struct {
// contains filtered or unexported fields
}
func NewNATSSubscriber ¶ added in v0.0.6
func NewNATSSubscriber(cfg NATSSubscriberConfig) (*NATSSubscriber, error)
func (*NATSSubscriber) DeliveredInfo ¶ added in v0.0.6
func (*NATSSubscriber) LatestSeq ¶ added in v0.0.6
func (s *NATSSubscriber) LatestSeq() uint64
func (*NATSSubscriber) RemoveConsumer ¶ added in v0.0.6
func (s *NATSSubscriber) RemoveConsumer(ctx context.Context, name string) error
func (*NATSSubscriber) SetDB ¶ added in v0.0.13
func (s *NATSSubscriber) SetDB(db *sql.DB)
func (*NATSSubscriber) Start ¶ added in v0.0.6
func (s *NATSSubscriber) Start() error
type NATSSubscriberConfig ¶ added in v0.0.21
type NATSSubscriberConfig struct {
Node string
Durable string
NatsConn *nats.Conn
Stream string
Subject string
Policy string
DB *sql.DB
ConnProvider ConnHooksProvider
Interceptor ChangeSetInterceptor
RowIdentify RowIdentify
}
type NoopPublisher ¶ added in v0.0.6
type NoopPublisher struct{}
func NewNoopPublisher ¶ added in v0.0.6
func NewNoopPublisher() *NoopPublisher
func (*NoopPublisher) Publish ¶ added in v0.0.6
func (p *NoopPublisher) Publish(cs *ChangeSet) error
type NoopSnapshotter ¶ added in v0.0.6
type NoopSnapshotter struct {
// contains filtered or unexported fields
}
func NewNoopSnapshotter ¶ added in v0.0.6
func NewNoopSnapshotter() *NoopSnapshotter
func (*NoopSnapshotter) LatestSnapshot ¶ added in v0.0.6
func (s *NoopSnapshotter) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)
func (*NoopSnapshotter) SetDB ¶ added in v0.0.13
func (s *NoopSnapshotter) SetDB(_ *sql.DB)
func (*NoopSnapshotter) Start ¶ added in v0.0.13
func (s *NoopSnapshotter) Start()
func (*NoopSnapshotter) TakeSnapshot ¶ added in v0.0.6
func (s *NoopSnapshotter) TakeSnapshot(ctx context.Context) (sequence uint64, err error)
type NoopSubscriber ¶ added in v0.0.6
type NoopSubscriber struct{}
func NewNoopSubscriber ¶ added in v0.0.6
func NewNoopSubscriber() *NoopSubscriber
func (*NoopSubscriber) DeliveredInfo ¶ added in v0.0.6
func (*NoopSubscriber) LatestSeq ¶ added in v0.0.6
func (*NoopSubscriber) LatestSeq() uint64
func (*NoopSubscriber) RemoveConsumer ¶ added in v0.0.6
func (*NoopSubscriber) RemoveConsumer(ctx context.Context, name string) error
func (*NoopSubscriber) SetDB ¶ added in v0.0.13
func (*NoopSubscriber) SetDB(db *sql.DB)
func (*NoopSubscriber) Start ¶ added in v0.0.6
func (*NoopSubscriber) Start() error
type Option ¶
type Option func(*Connector)
func WithAsyncPublisher ¶ added in v0.0.15
func WithAsyncPublisher() Option
func WithAsyncPublisherOutboxDir ¶ added in v0.0.15
func WithCDCPublisher ¶
func WithCDCPublisher(pub CDCPublisher) Option
func WithCDCSubscriber ¶ added in v0.0.6
func WithCDCSubscriber(sub CDCSubscriber) Option
func WithChangeSetInterceptor ¶ added in v0.0.6
func WithChangeSetInterceptor(interceptor ChangeSetInterceptor) Option
func WithDBSnapshotter ¶ added in v0.0.6
func WithDBSnapshotter(snap DBSnapshotter) Option
func WithDeliverPolicy ¶
func WithDisableDDLSync ¶ added in v0.0.4
func WithDisableDDLSync() Option
func WithEmbeddedNatsConfig ¶
func WithEmbeddedNatsConfig(cfg *EmbeddedNatsConfig) Option
func WithExtensions ¶
func WithNatsOptions ¶
func WithPublisherTimeout ¶
func WithReplicas ¶
func WithReplicationStream ¶ added in v0.0.9
func WithReplicationURL ¶
func WithRowIdentify ¶ added in v0.0.21
func WithRowIdentify(i RowIdentify) Option
func WithSnapshotInterval ¶
func WithStreamMaxAge ¶
func WithWaitFor ¶ added in v0.0.4
func WithWaitFor(ch chan struct{}) Option
type RowIdentify ¶ added in v0.0.21
type RowIdentify string
const ( Rowid RowIdentify = "rowid" Full RowIdentify = "full" )
type SequenceProvider ¶
type SequenceProvider interface {
LatestSeq() uint64
}
type Statement ¶
type Statement struct {
// contains filtered or unexported fields
}
func ParseStatement ¶ added in v0.0.4
func UnverifiedStatement ¶ added in v0.0.12
func (*Statement) HasDistinct ¶
func (*Statement) HasReturning ¶
func (*Statement) IsCreateTable ¶
func (*Statement) ModifiesDatabase ¶ added in v0.0.12
func (*Statement) Parameters ¶
func (*Statement) SourceWithIfExists ¶ added in v0.0.10
type WriterPublisher ¶ added in v0.0.6
type WriterPublisher struct {
// contains filtered or unexported fields
}
func NewWriterPublisher ¶ added in v0.0.6
func NewWriterPublisher(w io.Writer, serializer ChangeSetSerializer) *WriterPublisher
func (*WriterPublisher) Publish ¶ added in v0.0.6
func (p *WriterPublisher) Publish(cs *ChangeSet) error
Source Files
¶
Click to show internal directories.
Click to hide internal directories.