Documentation ¶
Index ¶
- Constants
- Variables
- func ConnectHandler(opts ...connect.HandlerOption) (path string, handler http.Handler)
- func CrossShardQuery(ctx context.Context, stmt *Statement, args []driver.NamedValue, ...) (driver.Rows, error)
- func LatestSnapshot(ctx context.Context, dsn string, options ...Option) (sequence uint64, reader io.ReadCloser, err error)
- func ListDSN() []string
- func ListReplicationIDs() []string
- func Shutdown()
- type AsyncNATSPublisher
- type BackupFn
- type CDCPublisher
- type Change
- type ChangeSet
- func (cs *ChangeSet) AddChange(change Change)
- func (cs *ChangeSet) Apply(db *sql.DB) (err error)
- func (cs *ChangeSet) Clear()
- func (cs *ChangeSet) DebeziumData() []DebeziumData
- func (cs *ChangeSet) Send(pub Publisher) error
- func (cs *ChangeSet) SetApplyStrategy(fn applyStrategyFn)
- func (cs *ChangeSet) SetConnProvider(connProvider ConnHooksProvider)
- func (cs *ChangeSet) SetInterceptor(interceptor ChangeSetInterceptor)
- type ChangeSetInterceptor
- type ChangeSetSerializer
- type ConnHooksConfig
- type ConnHooksFactory
- type ConnHooksProvider
- type Connector
- func (c *Connector) Backup(ctx context.Context, writer io.Writer) error
- func (c *Connector) CDCPublisher() CDCPublisher
- func (c *Connector) Close()
- func (c *Connector) Connect(ctx context.Context) (driver.Conn, error)
- func (c *Connector) ConsistentReader(timeout time.Duration, methods ...string) func(http.Handler) http.Handler
- func (c *Connector) ConsistentReaderFunc(h http.HandlerFunc, timeout time.Duration, methods ...string) http.HandlerFunc
- func (c *Connector) DB() *sql.DB
- func (c *Connector) DeliveredInfo(ctx context.Context, name string) (any, error)
- func (c *Connector) Driver() driver.Driver
- func (c *Connector) ForwardToLeader(timeout time.Duration, methods ...string) func(http.Handler) http.Handler
- func (c *Connector) ForwardToLeaderFunc(h http.HandlerFunc, timeout time.Duration, methods ...string) http.HandlerFunc
- func (c *Connector) LatestSeq() uint64
- func (c *Connector) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)
- func (c *Connector) LeaderProvider() LeaderProvider
- func (c *Connector) NodeName() string
- func (c *Connector) PubSeq() uint64
- func (c *Connector) Publisher() Publisher
- func (c *Connector) RemoveConsumer(ctx context.Context, name string) error
- func (c *Connector) ResponseWriter(w http.ResponseWriter) http.ResponseWriter
- func (c *Connector) Snapshotter() DBSnapshotter
- func (c *Connector) Start(db *sql.DB) error
- func (c *Connector) Subscriber() Subscriber
- func (c *Connector) TakeSnapshot(ctx context.Context) (sequence uint64, err error)
- type DBSnapshotter
- type DebeziumData
- type DebeziumPayload
- type DebeziumSource
- type DebeziumTransaction
- type DynamicLeader
- type EmbeddedNatsConfig
- type JSONPublisher
- type LeaderProvider
- type NATSPublisher
- type NATSSnapshotter
- func (s *NATSSnapshotter) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)
- func (s *NATSSnapshotter) LatestSnapshotSequence(ctx context.Context) (uint64, error)
- func (s *NATSSnapshotter) SetDB(db *sql.DB)
- func (s *NATSSnapshotter) Start()
- func (s *NATSSnapshotter) TakeSnapshot(ctx context.Context) (sequence uint64, err error)
- type NATSSubscriber
- type NATSSubscriberConfig
- type NoopPublisher
- type NoopSnapshotter
- type NoopSubscriber
- type Option
- func NameToOptions(name string) (string, []Option, error)
- func WithAsyncPublisher() Option
- func WithAsyncPublisherOutboxDir(dir string) Option
- func WithAutoStart(enabled bool) Option
- func WithCDCPublisher(p CDCPublisher) Option
- func WithChangeSetInterceptor(interceptor ChangeSetInterceptor) Option
- func WithClusterSize(size int) Option
- func WithDBSnapshotter(snap DBSnapshotter) Option
- func WithDeliverPolicy(deliverPolicy string) Option
- func WithDisableDDLSync() Option
- func WithEmbeddedNatsConfig(cfg *EmbeddedNatsConfig) Option
- func WithExtensions(extensions ...string) Option
- func WithGrpcPort(port int) Option
- func WithGrpcTimeout(timeout time.Duration) Option
- func WithGrpcToken(token string) Option
- func WithLeaderElectionLocalTarget(localEndpoint string) Option
- func WithLeaderProvider(p LeaderProvider) Option
- func WithName(name string) Option
- func WithNatsOptions(options ...nats.Option) Option
- func WithPublisherTimeout(timeout time.Duration) Option
- func WithQueryRouter(re *regexp.Regexp) Option
- func WithReplicas(replicas int) Option
- func WithReplicationID(id string) Option
- func WithReplicationPublisher(pub Publisher) Option
- func WithReplicationStream(stream string) Option
- func WithReplicationSubscriber(sub Subscriber) Option
- func WithReplicationURL(url string) Option
- func WithRowIdentify(i RowIdentify) Option
- func WithSnapshotInterval(interval time.Duration) Option
- func WithStreamMaxAge(maxAge time.Duration) Option
- func WithWaitFor(ch chan struct{}) Option
- type Publisher
- type RowIdentify
- type SequenceProvider
- type Statement
- func (s *Statement) AggregateFunctions() map[int]*sql.Call
- func (s *Statement) Begin() bool
- func (s *Statement) Columns() []string
- func (s *Statement) Commit() bool
- func (s *Statement) DDL() bool
- func (s *Statement) HasDistinct() bool
- func (s *Statement) HasReturning() bool
- func (s *Statement) IsCreateTable() bool
- func (s *Statement) IsDelete() bool
- func (s *Statement) IsExplain() bool
- func (s *Statement) IsInsert() bool
- func (s *Statement) IsSelect() bool
- func (s *Statement) IsUpdate() bool
- func (s *Statement) Limit() int
- func (s *Statement) ModifiesDatabase() bool
- func (s *Statement) OrderBy() []string
- func (s *Statement) Parameters() []string
- func (s *Statement) RewriteQueryToAggregate() (query string, aggregateFunctions map[int]*sql.Call, newColumns map[int]int, ...)
- func (s *Statement) Rollback() bool
- func (s *Statement) Source() string
- func (s *Statement) SourceWithIfExists() string
- func (s *Statement) Type() string
- func (s *Statement) Visit(n sql.Node) (w sql.Visitor, node sql.Node, err error)
- func (s *Statement) VisitEnd(n sql.Node) (sql.Node, error)
- type StaticLeader
- type Subscriber
- type TxSeqTracker
- type TxSeqTrackerProvider
- type WriterPublisher
Constants ¶
View Source
const (
TypeExplain = "EXPLAIN"
TypeSelect = "SELECT"
TypeInsert = "INSERT"
TypeUpdate = "UPDATE"
TypeDelete = "DELETE"
TypeCreateTable = "CREATE TABLE"
TypeCreateIndex = "CREATE INDEX"
TypeCreateView = "CREATE VIEW"
TypeCreateTrigger = "CREATE TRIGGER"
TypeCreateVirtualTable = "CREATE VIRTUAL TABLE"
TypeAlterTable = "ALTER TABLE"
TypeVacuum = "VACUUM"
TypeDrop = "DROP"
TypeAnalyze = "ANALYZE"
TypeBegin = "BEGIN"
TypeCommit = "COMMIT"
TypeRollback = "ROLLBACK"
TypeSavepoint = "SAVEPOINT"
TypeRelease = "RELEASE"
TypePragma = "PRAGMA"
TypeOther = "OTHER"
)
View Source
const DefaultStream = "ha_replication"
View Source
const TXCookieName = "_txseq"
Variables ¶
View Source
var (
ErrInvalidSQL = fmt.Errorf("invalid SQL")
)
View Source
var ErrSnapshotterNotConfigured = errors.New("snapshotter not configured")
Functions ¶
func ConnectHandler ¶ added in v0.8.0
func ConnectHandler(opts ...connect.HandlerOption) (path string, handler http.Handler)
func CrossShardQuery ¶ added in v0.5.8
func LatestSnapshot ¶ added in v0.0.4
func ListReplicationIDs ¶ added in v0.4.5
func ListReplicationIDs() []string
Types ¶
type AsyncNATSPublisher ¶ added in v0.0.15
type AsyncNATSPublisher struct {
*NATSPublisher
// contains filtered or unexported fields
}
func NewAsyncNATSPublisher ¶ added in v0.0.15
func (*AsyncNATSPublisher) Close ¶ added in v0.0.15
func (p *AsyncNATSPublisher) Close() error
func (*AsyncNATSPublisher) Publish ¶ added in v0.0.15
func (p *AsyncNATSPublisher) Publish(cs *ChangeSet) error
func (*AsyncNATSPublisher) Sequence ¶ added in v0.0.20
func (p *AsyncNATSPublisher) Sequence() uint64
type CDCPublisher ¶
type CDCPublisher interface {
Publish(data []DebeziumData) error
}
type Change ¶
type Change struct {
Database string `json:"database,omitempty"`
Table string `json:"table,omitempty"`
Columns []string `json:"columns,omitempty"`
PKColumns []string `json:"pk_columns,omitempty"`
Operation string `json:"operation"` // "INSERT", "UPDATE", "DELETE", "SQL", "CUSTOM"
OldRowID int64 `json:"old_rowid,omitempty"`
NewRowID int64 `json:"new_rowid,omitempty"`
OldValues []any `json:"old_values,omitempty"`
NewValues []any `json:"new_values,omitempty"`
Command string `json:"command,omitempty"`
Args []any `json:"args,omitempty"`
TsNs int64 `json:"ts_ns,omitempty"`
}
func (Change) PKColumnsNames ¶ added in v0.2.0
func (Change) PKNewValues ¶ added in v0.2.1
func (Change) PKOldValues ¶ added in v0.2.0
type ChangeSet ¶
type ChangeSet struct {
Node string `json:"node"`
ProcessID int64 `json:"process_id"`
Filename string `json:"filename"`
Changes []Change `json:"changes"`
Timestamp int64 `json:"timestamp_ns"`
Subject string `json:"-"`
StreamSeq uint64 `json:"-"`
// contains filtered or unexported fields
}
func NewChangeSet ¶
func (*ChangeSet) DebeziumData ¶ added in v0.3.0
func (cs *ChangeSet) DebeziumData() []DebeziumData
func (*ChangeSet) SetApplyStrategy ¶ added in v0.0.21
func (cs *ChangeSet) SetApplyStrategy(fn applyStrategyFn)
func (*ChangeSet) SetConnProvider ¶ added in v0.0.10
func (cs *ChangeSet) SetConnProvider(connProvider ConnHooksProvider)
func (*ChangeSet) SetInterceptor ¶ added in v0.0.6
func (cs *ChangeSet) SetInterceptor(interceptor ChangeSetInterceptor)
type ChangeSetInterceptor ¶ added in v0.0.6
type ChangeSetSerializer ¶ added in v0.0.6
type ConnHooksConfig ¶ added in v0.4.5
type ConnHooksConfig struct {
NodeName string
ReplicationID string
DisableDDLSync bool
Publisher Publisher
CDC CDCPublisher
TxSeqTrackerProvider TxSeqTrackerProvider
Leader LeaderProvider
GrpcTimeout time.Duration
GrpcToken string
QueryRouter *regexp.Regexp
}
type ConnHooksFactory ¶ added in v0.0.10
type ConnHooksFactory func(cfg ConnHooksConfig) ConnHooksProvider
type ConnHooksProvider ¶ added in v0.0.10
type Connector ¶
type Connector struct {
// contains filtered or unexported fields
}
func LookupConnector ¶ added in v0.1.0
func LookupConnectorByReplicationID ¶ added in v0.4.5
func NewConnector ¶
func (*Connector) CDCPublisher ¶ added in v0.3.0
func (c *Connector) CDCPublisher() CDCPublisher
func (*Connector) ConsistentReader ¶ added in v0.1.0
func (*Connector) ConsistentReaderFunc ¶ added in v0.1.0
func (c *Connector) ConsistentReaderFunc(h http.HandlerFunc, timeout time.Duration, methods ...string) http.HandlerFunc
func (*Connector) DeliveredInfo ¶
func (*Connector) ForwardToLeader ¶ added in v0.1.0
func (*Connector) ForwardToLeaderFunc ¶ added in v0.1.0
func (c *Connector) ForwardToLeaderFunc(h http.HandlerFunc, timeout time.Duration, methods ...string) http.HandlerFunc
func (*Connector) LatestSnapshot ¶
func (*Connector) LeaderProvider ¶ added in v0.1.1
func (c *Connector) LeaderProvider() LeaderProvider
func (*Connector) RemoveConsumer ¶
func (*Connector) ResponseWriter ¶ added in v0.1.3
func (c *Connector) ResponseWriter(w http.ResponseWriter) http.ResponseWriter
func (*Connector) Snapshotter ¶ added in v0.0.13
func (c *Connector) Snapshotter() DBSnapshotter
func (*Connector) Subscriber ¶ added in v0.0.13
func (c *Connector) Subscriber() Subscriber
type DBSnapshotter ¶ added in v0.0.6
type DebeziumData ¶ added in v0.3.0
type DebeziumData struct {
Schema any `json:"schema"`
Payload DebeziumPayload `json:"payload"`
Transaction DebeziumTransaction `json:"transaction"`
}
type DebeziumPayload ¶ added in v0.3.0
type DebeziumSource ¶ added in v0.3.0
type DebeziumTransaction ¶ added in v0.3.0
type DebeziumTransaction struct {
ID string `json:"id"`
}
type DynamicLeader ¶ added in v0.1.1
type DynamicLeader struct {
// contains filtered or unexported fields
}
func (*DynamicLeader) IsLeader ¶ added in v0.1.1
func (d *DynamicLeader) IsLeader() bool
func (*DynamicLeader) Ready ¶ added in v0.1.1
func (d *DynamicLeader) Ready() chan struct{}
func (*DynamicLeader) RedirectTarget ¶ added in v0.1.1
func (d *DynamicLeader) RedirectTarget() string
type EmbeddedNatsConfig ¶
type JSONPublisher ¶ added in v0.0.6
type JSONPublisher struct {
// contains filtered or unexported fields
}
func NewJSONPublisher ¶ added in v0.0.6
func NewJSONPublisher(w io.Writer) *JSONPublisher
func (*JSONPublisher) Publish ¶ added in v0.0.6
func (p *JSONPublisher) Publish(cs *ChangeSet) error
func (*JSONPublisher) Sequence ¶ added in v0.0.20
func (p *JSONPublisher) Sequence() uint64
type LeaderProvider ¶ added in v0.1.0
type NATSPublisher ¶ added in v0.0.6
type NATSPublisher struct {
// contains filtered or unexported fields
}
func NewNATSPublisher ¶ added in v0.0.6
func NewNATSPublisher(nc *nats.Conn, subject string, timeout time.Duration, streamConfig *jetstream.StreamConfig) (*NATSPublisher, error)
func (*NATSPublisher) Publish ¶ added in v0.0.6
func (p *NATSPublisher) Publish(cs *ChangeSet) error
func (*NATSPublisher) Sequence ¶ added in v0.0.20
func (p *NATSPublisher) Sequence() uint64
type NATSSnapshotter ¶ added in v0.0.6
type NATSSnapshotter struct {
// contains filtered or unexported fields
}
func NewNATSSnapshotter ¶ added in v0.0.6
func (*NATSSnapshotter) LatestSnapshot ¶ added in v0.0.6
func (s *NATSSnapshotter) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)
func (*NATSSnapshotter) LatestSnapshotSequence ¶ added in v0.0.6
func (s *NATSSnapshotter) LatestSnapshotSequence(ctx context.Context) (uint64, error)
func (*NATSSnapshotter) SetDB ¶ added in v0.0.13
func (s *NATSSnapshotter) SetDB(db *sql.DB)
func (*NATSSnapshotter) Start ¶ added in v0.0.13
func (s *NATSSnapshotter) Start()
func (*NATSSnapshotter) TakeSnapshot ¶ added in v0.0.6
func (s *NATSSnapshotter) TakeSnapshot(ctx context.Context) (sequence uint64, err error)
type NATSSubscriber ¶ added in v0.0.6
type NATSSubscriber struct {
// contains filtered or unexported fields
}
func NewNATSSubscriber ¶ added in v0.0.6
func NewNATSSubscriber(cfg NATSSubscriberConfig) (*NATSSubscriber, error)
func (*NATSSubscriber) DeliveredInfo ¶ added in v0.0.6
func (*NATSSubscriber) LatestSeq ¶ added in v0.0.6
func (s *NATSSubscriber) LatestSeq() uint64
func (*NATSSubscriber) RemoveConsumer ¶ added in v0.0.6
func (s *NATSSubscriber) RemoveConsumer(ctx context.Context, name string) error
func (*NATSSubscriber) SetDB ¶ added in v0.0.13
func (s *NATSSubscriber) SetDB(db *sql.DB)
func (*NATSSubscriber) Start ¶ added in v0.0.6
func (s *NATSSubscriber) Start() error
type NATSSubscriberConfig ¶ added in v0.0.21
type NATSSubscriberConfig struct {
Node string
Durable string
NatsConn *nats.Conn
Stream string
Subject string
Policy string
DB *sql.DB
ConnProvider ConnHooksProvider
Interceptor ChangeSetInterceptor
RowIdentify RowIdentify
}
type NoopPublisher ¶ added in v0.0.6
type NoopPublisher struct{}
func NewNoopPublisher ¶ added in v0.0.6
func NewNoopPublisher() *NoopPublisher
func (*NoopPublisher) Publish ¶ added in v0.0.6
func (p *NoopPublisher) Publish(cs *ChangeSet) error
func (*NoopPublisher) Sequence ¶ added in v0.0.20
func (p *NoopPublisher) Sequence() uint64
type NoopSnapshotter ¶ added in v0.0.6
type NoopSnapshotter struct {
// contains filtered or unexported fields
}
func NewNoopSnapshotter ¶ added in v0.0.6
func NewNoopSnapshotter() *NoopSnapshotter
func (*NoopSnapshotter) LatestSnapshot ¶ added in v0.0.6
func (s *NoopSnapshotter) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)
func (*NoopSnapshotter) SetDB ¶ added in v0.0.13
func (s *NoopSnapshotter) SetDB(_ *sql.DB)
func (*NoopSnapshotter) Start ¶ added in v0.0.13
func (s *NoopSnapshotter) Start()
func (*NoopSnapshotter) TakeSnapshot ¶ added in v0.0.6
func (s *NoopSnapshotter) TakeSnapshot(ctx context.Context) (sequence uint64, err error)
type NoopSubscriber ¶ added in v0.0.6
type NoopSubscriber struct{}
func NewNoopSubscriber ¶ added in v0.0.6
func NewNoopSubscriber() *NoopSubscriber
func (*NoopSubscriber) DeliveredInfo ¶ added in v0.0.6
func (*NoopSubscriber) LatestSeq ¶ added in v0.0.6
func (*NoopSubscriber) LatestSeq() uint64
func (*NoopSubscriber) RemoveConsumer ¶ added in v0.0.6
func (*NoopSubscriber) RemoveConsumer(ctx context.Context, name string) error
func (*NoopSubscriber) SetDB ¶ added in v0.0.13
func (*NoopSubscriber) SetDB(db *sql.DB)
func (*NoopSubscriber) Start ¶ added in v0.0.6
func (*NoopSubscriber) Start() error
type Option ¶
type Option func(*Connector)
func WithAsyncPublisher ¶ added in v0.0.15
func WithAsyncPublisher() Option
func WithAsyncPublisherOutboxDir ¶ added in v0.0.15
func WithAutoStart ¶ added in v0.1.0
func WithCDCPublisher ¶
func WithCDCPublisher(p CDCPublisher) Option
func WithChangeSetInterceptor ¶ added in v0.0.6
func WithChangeSetInterceptor(interceptor ChangeSetInterceptor) Option
func WithClusterSize ¶ added in v0.1.1
func WithDBSnapshotter ¶ added in v0.0.6
func WithDBSnapshotter(snap DBSnapshotter) Option
func WithDeliverPolicy ¶
func WithDisableDDLSync ¶ added in v0.0.4
func WithDisableDDLSync() Option
func WithEmbeddedNatsConfig ¶
func WithEmbeddedNatsConfig(cfg *EmbeddedNatsConfig) Option
func WithExtensions ¶
func WithGrpcPort ¶ added in v0.4.5
func WithGrpcTimeout ¶ added in v0.4.7
func WithGrpcToken ¶ added in v0.7.0
func WithLeaderElectionLocalTarget ¶ added in v0.1.1
func WithLeaderProvider ¶ added in v0.1.1
func WithLeaderProvider(p LeaderProvider) Option
func WithNatsOptions ¶
func WithPublisherTimeout ¶
func WithQueryRouter ¶ added in v0.5.5
func WithReplicas ¶
func WithReplicationID ¶ added in v0.3.0
func WithReplicationPublisher ¶ added in v0.3.0
func WithReplicationStream ¶ added in v0.0.9
func WithReplicationSubscriber ¶ added in v0.3.0
func WithReplicationSubscriber(sub Subscriber) Option
func WithReplicationURL ¶
func WithRowIdentify ¶ added in v0.0.21
func WithRowIdentify(i RowIdentify) Option
func WithSnapshotInterval ¶
func WithStreamMaxAge ¶
func WithWaitFor ¶ added in v0.0.4
func WithWaitFor(ch chan struct{}) Option
type RowIdentify ¶ added in v0.0.21
type RowIdentify string
const (
PK RowIdentify = "pk"
Rowid RowIdentify = "rowid"
Full RowIdentify = "full"
)
type SequenceProvider ¶
type SequenceProvider interface {
LatestSeq() uint64
}
type Statement ¶
type Statement struct {
// contains filtered or unexported fields
}
func ParseStatement ¶ added in v0.0.4
func UnverifiedStatement ¶ added in v0.0.12
func (*Statement) AggregateFunctions ¶ added in v0.6.5
func (*Statement) HasDistinct ¶
func (*Statement) HasReturning ¶
func (*Statement) IsCreateTable ¶
func (*Statement) ModifiesDatabase ¶ added in v0.0.12
func (*Statement) Parameters ¶
func (*Statement) RewriteQueryToAggregate ¶ added in v0.6.5
func (*Statement) SourceWithIfExists ¶ added in v0.0.10
type StaticLeader ¶ added in v0.1.0
type StaticLeader struct {
Target string
}
func (*StaticLeader) IsLeader ¶ added in v0.1.0
func (s *StaticLeader) IsLeader() bool
func (*StaticLeader) Ready ¶ added in v0.1.1
func (s *StaticLeader) Ready() chan struct{}
func (*StaticLeader) RedirectTarget ¶ added in v0.1.0
func (s *StaticLeader) RedirectTarget() string
type Subscriber ¶ added in v0.3.0
type TxSeqTracker ¶ added in v0.4.8
type TxSeqTracker interface {
LatestSeq() uint64
}
type TxSeqTrackerProvider ¶ added in v0.4.8
type TxSeqTrackerProvider func() TxSeqTracker
type WriterPublisher ¶ added in v0.0.6
type WriterPublisher struct {
// contains filtered or unexported fields
}
func NewWriterPublisher ¶ added in v0.0.6
func NewWriterPublisher(w io.Writer, serializer ChangeSetSerializer) *WriterPublisher
func (*WriterPublisher) Publish ¶ added in v0.0.6
func (p *WriterPublisher) Publish(cs *ChangeSet) error
func (*WriterPublisher) Sequence ¶ added in v0.0.20
func (p *WriterPublisher) Sequence() uint64
Source Files ¶
Click to show internal directories.
Click to hide internal directories.