ha

package module
v0.8.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 3, 2026 License: Apache-2.0 Imports: 33 Imported by: 6

README

go-ha

Go database/sql base driver providing high availability for SQLite databases.

Features

  • High availability support for SQLite databases.
  • Replication: Synchronize data across nodes using NATS.
  • Customize the replication strategy
  • Leaderless clusters: Read/Write from/to any node. Last-writer wins by default, but you can customize conflict resolutions by implementing ChangeSetInterceptor.
  • Leader-based cluster: Write operations are redirected to the leader to prevent conflicts entirely.
  • Embedded or External NATS: Choose between an embedded NATS server or an external one for replication.
  • Easy to integrate with existing Go projects.

Drivers

Options

DSN Param Description Default
asyncPublisher Enables asynchronous publishing of replication events. false
asyncPublisherOutboxDir Directory to store outbox files for asynchronous publishing.
autoStart Automatically starts the subscriber and snapshotter when the node is initialized. true
replicationID Replication ID [database filename]
deliverPolicy Specifies the delivery policy for replication events. Options include all, last, etc. all
disableSubscriber Disables the subscriber for replication. false
disablePublisher Disables the publisher for replication. false
disableDBSnapshotter Disables the database snapshotter used for initial synchronization. false
disableDDLSync Disables the synchronization of DDL (Data Definition Language) changes across nodes. false
grpcPort TCP port for the gRPC server
grpcTimeout Timeout for the gRPC operations 5s
grpcToken Token to protect gRPC server
leaderProvider Defines the strategy for determining a leader node in the cluster. This is useful for redirecting HTTP requests. Examples include dynamic:http://host:port or static:http://host:port.
name Name of the node in the cluster.
natsConfigFile Path to the configuration file for the embedded NATS server. Overrides others NATS configurations.
natsName Sets the name of the embedded NATS server.
natsPort Configures the port for the embedded NATS server. 4222
natsStoreDir Directory to store data for the embedded NATS server.
publisherTimeout Timeout duration for publishing replication events. 15s
replicationStream Name of the replication stream used for synchronizing data.
replicationURL URL used for connecting to the replication stream.
replicas Number of replicas to maintain for high availability. 1
rowIdentify Row identify strategy: pk, rowid or full pk
snapshotInterval Interval for taking database snapshots. 1m
streamMaxAge Maximum age of messages in the replication stream before they are removed.

Projects using go-ha

  • HA: Highly available leaderless SQLite cluster with HTTP and PostgreSQL Wire Protocol
  • PocketBase HA: Highly available leaderless PocketBase cluster
  • sqlc-http: Generate net/http go server from SQL

Contributing

Contributions are welcome! Please open an issue or submit a pull request.

License

This project is licensed under the MIT License. See the LICENSE file for details.

Documentation

Index

Constants

View Source
const (
	TypeExplain            = "EXPLAIN"
	TypeSelect             = "SELECT"
	TypeInsert             = "INSERT"
	TypeUpdate             = "UPDATE"
	TypeDelete             = "DELETE"
	TypeCreateTable        = "CREATE TABLE"
	TypeCreateIndex        = "CREATE INDEX"
	TypeCreateView         = "CREATE VIEW"
	TypeCreateTrigger      = "CREATE TRIGGER"
	TypeCreateVirtualTable = "CREATE VIRTUAL TABLE"
	TypeAlterTable         = "ALTER TABLE"
	TypeVacuum             = "VACUUM"
	TypeDrop               = "DROP"
	TypeAnalyze            = "ANALYZE"
	TypeBegin              = "BEGIN"
	TypeCommit             = "COMMIT"
	TypeRollback           = "ROLLBACK"
	TypeSavepoint          = "SAVEPOINT"
	TypeRelease            = "RELEASE"
	TypePragma             = "PRAGMA"
	TypeOther              = "OTHER"
)
View Source
const DefaultStream = "ha_replication"
View Source
const TXCookieName = "_txseq"

Variables

View Source
var (
	ErrInvalidSQL = fmt.Errorf("invalid SQL")
)
View Source
var ErrSnapshotterNotConfigured = errors.New("snapshotter not configured")

Functions

func ConnectHandler added in v0.8.0

func ConnectHandler(opts ...connect.HandlerOption) (path string, handler http.Handler)

func CrossShardQuery added in v0.5.8

func CrossShardQuery(ctx context.Context, stmt *Statement, args []driver.NamedValue, queryRouter *regexp.Regexp, driverConn driverConnFunc) (driver.Rows, error)

func LatestSnapshot added in v0.0.4

func LatestSnapshot(ctx context.Context, dsn string, options ...Option) (sequence uint64, reader io.ReadCloser, err error)

func ListDSN added in v0.1.3

func ListDSN() []string

func ListReplicationIDs added in v0.4.5

func ListReplicationIDs() []string

func Shutdown added in v0.0.16

func Shutdown()

Types

type AsyncNATSPublisher added in v0.0.15

type AsyncNATSPublisher struct {
	*NATSPublisher
	// contains filtered or unexported fields
}

func NewAsyncNATSPublisher added in v0.0.15

func NewAsyncNATSPublisher(nc *nats.Conn, subject string, timeout time.Duration, streamConfig *jetstream.StreamConfig, db *sql.DB) (*AsyncNATSPublisher, error)

func (*AsyncNATSPublisher) Close added in v0.0.15

func (p *AsyncNATSPublisher) Close() error

func (*AsyncNATSPublisher) Publish added in v0.0.15

func (p *AsyncNATSPublisher) Publish(cs *ChangeSet) error

func (*AsyncNATSPublisher) Sequence added in v0.0.20

func (p *AsyncNATSPublisher) Sequence() uint64

type BackupFn added in v0.0.10

type BackupFn func(context.Context, *sql.DB, io.Writer) error

type CDCPublisher

type CDCPublisher interface {
	Publish(data []DebeziumData) error
}

type Change

type Change struct {
	Database  string   `json:"database,omitempty"`
	Table     string   `json:"table,omitempty"`
	Columns   []string `json:"columns,omitempty"`
	PKColumns []string `json:"pk_columns,omitempty"`
	Operation string   `json:"operation"` // "INSERT", "UPDATE", "DELETE", "SQL", "CUSTOM"
	OldRowID  int64    `json:"old_rowid,omitempty"`
	NewRowID  int64    `json:"new_rowid,omitempty"`
	OldValues []any    `json:"old_values,omitempty"`
	NewValues []any    `json:"new_values,omitempty"`
	Command   string   `json:"command,omitempty"`
	Args      []any    `json:"args,omitempty"`
	TsNs      int64    `json:"ts_ns,omitempty"`
}

func (Change) PKColumnsNames added in v0.2.0

func (c Change) PKColumnsNames() []string

func (Change) PKNewValues added in v0.2.1

func (c Change) PKNewValues() []any

func (Change) PKOldValues added in v0.2.0

func (c Change) PKOldValues() []any

type ChangeSet

type ChangeSet struct {
	Node      string   `json:"node"`
	ProcessID int64    `json:"process_id"`
	Filename  string   `json:"filename"`
	Changes   []Change `json:"changes"`
	Timestamp int64    `json:"timestamp_ns"`
	Subject   string   `json:"-"`
	StreamSeq uint64   `json:"-"`
	// contains filtered or unexported fields
}

func NewChangeSet

func NewChangeSet(node string, replicationID string) *ChangeSet

func (*ChangeSet) AddChange

func (cs *ChangeSet) AddChange(change Change)

func (*ChangeSet) Apply

func (cs *ChangeSet) Apply(db *sql.DB) (err error)

func (*ChangeSet) Clear

func (cs *ChangeSet) Clear()

func (*ChangeSet) DebeziumData added in v0.3.0

func (cs *ChangeSet) DebeziumData() []DebeziumData

func (*ChangeSet) Send

func (cs *ChangeSet) Send(pub Publisher) error

func (*ChangeSet) SetApplyStrategy added in v0.0.21

func (cs *ChangeSet) SetApplyStrategy(fn applyStrategyFn)

func (*ChangeSet) SetConnProvider added in v0.0.10

func (cs *ChangeSet) SetConnProvider(connProvider ConnHooksProvider)

func (*ChangeSet) SetInterceptor added in v0.0.6

func (cs *ChangeSet) SetInterceptor(interceptor ChangeSetInterceptor)

type ChangeSetInterceptor added in v0.0.6

type ChangeSetInterceptor interface {
	BeforeApply(*ChangeSet, *sql.Conn) (skip bool, err error)
	AfterApply(*ChangeSet, *sql.Conn, error) error
}

type ChangeSetSerializer added in v0.0.6

type ChangeSetSerializer func(*ChangeSet) ([]byte, error)

type ConnHooksConfig added in v0.4.5

type ConnHooksConfig struct {
	NodeName             string
	ReplicationID        string
	DisableDDLSync       bool
	Publisher            Publisher
	CDC                  CDCPublisher
	TxSeqTrackerProvider TxSeqTrackerProvider
	Leader               LeaderProvider
	GrpcTimeout          time.Duration
	GrpcToken            string
	QueryRouter          *regexp.Regexp
}

type ConnHooksFactory added in v0.0.10

type ConnHooksFactory func(cfg ConnHooksConfig) ConnHooksProvider

type ConnHooksProvider added in v0.0.10

type ConnHooksProvider interface {
	RegisterHooks(driver.Conn) (driver.Conn, error)
	DisableHooks(*sql.Conn) error
	EnableHooks(*sql.Conn) error
}

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

func LookupConnector added in v0.1.0

func LookupConnector(dsn string) (*Connector, bool)

func LookupConnectorByReplicationID added in v0.4.5

func LookupConnectorByReplicationID(id string) (*Connector, bool)

func NewConnector

func NewConnector(dsn string, drv driver.Driver, connHooksFactory ConnHooksFactory, backupFn BackupFn, options ...Option) (*Connector, error)

func (*Connector) Backup added in v0.8.0

func (c *Connector) Backup(ctx context.Context, writer io.Writer) error

func (*Connector) CDCPublisher added in v0.3.0

func (c *Connector) CDCPublisher() CDCPublisher

func (*Connector) Close

func (c *Connector) Close()

func (*Connector) Connect

func (c *Connector) Connect(ctx context.Context) (driver.Conn, error)

func (*Connector) ConsistentReader added in v0.1.0

func (c *Connector) ConsistentReader(timeout time.Duration, methods ...string) func(http.Handler) http.Handler

func (*Connector) ConsistentReaderFunc added in v0.1.0

func (c *Connector) ConsistentReaderFunc(h http.HandlerFunc, timeout time.Duration, methods ...string) http.HandlerFunc

func (*Connector) DB added in v0.4.5

func (c *Connector) DB() *sql.DB

func (*Connector) DeliveredInfo

func (c *Connector) DeliveredInfo(ctx context.Context, name string) (any, error)

func (*Connector) Driver

func (c *Connector) Driver() driver.Driver

func (*Connector) ForwardToLeader added in v0.1.0

func (c *Connector) ForwardToLeader(timeout time.Duration, methods ...string) func(http.Handler) http.Handler

func (*Connector) ForwardToLeaderFunc added in v0.1.0

func (c *Connector) ForwardToLeaderFunc(h http.HandlerFunc, timeout time.Duration, methods ...string) http.HandlerFunc

func (*Connector) LatestSeq

func (c *Connector) LatestSeq() uint64

func (*Connector) LatestSnapshot

func (c *Connector) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)

func (*Connector) LeaderProvider added in v0.1.1

func (c *Connector) LeaderProvider() LeaderProvider

func (*Connector) NodeName added in v0.0.10

func (c *Connector) NodeName() string

func (*Connector) PubSeq added in v0.4.5

func (c *Connector) PubSeq() uint64

func (*Connector) Publisher added in v0.0.10

func (c *Connector) Publisher() Publisher

func (*Connector) RemoveConsumer

func (c *Connector) RemoveConsumer(ctx context.Context, name string) error

func (*Connector) ResponseWriter added in v0.1.3

func (c *Connector) ResponseWriter(w http.ResponseWriter) http.ResponseWriter

func (*Connector) Snapshotter added in v0.0.13

func (c *Connector) Snapshotter() DBSnapshotter

func (*Connector) Start added in v0.1.1

func (c *Connector) Start(db *sql.DB) error

func (*Connector) Subscriber added in v0.0.13

func (c *Connector) Subscriber() Subscriber

func (*Connector) TakeSnapshot

func (c *Connector) TakeSnapshot(ctx context.Context) (sequence uint64, err error)

type DBSnapshotter added in v0.0.6

type DBSnapshotter interface {
	SetDB(*sql.DB)
	Start()
	TakeSnapshot(ctx context.Context) (sequence uint64, err error)
	LatestSnapshot(ctx context.Context) (sequence uint64, reader io.ReadCloser, err error)
}

type DebeziumData added in v0.3.0

type DebeziumData struct {
	Schema      any                 `json:"schema"`
	Payload     DebeziumPayload     `json:"payload"`
	Transaction DebeziumTransaction `json:"transaction"`
}

type DebeziumPayload added in v0.3.0

type DebeziumPayload struct {
	Before map[string]any `json:"before"`
	After  map[string]any `json:"after"`
	Source DebeziumSource `json:"source"`
	Op     string         `json:"op"`
	TsNs   int64          `json:"ts_ns"`
}

type DebeziumSource added in v0.3.0

type DebeziumSource struct {
	Version   string `json:"version"`
	Connector string `json:"connector"`
	Name      string `json:"name"`
	ServerID  int64  `json:"server_id"`
	TsNs      int64  `json:"ts_ns"`
	DB        string `json:"db"`
	Table     string `json:"table"`
}

type DebeziumTransaction added in v0.3.0

type DebeziumTransaction struct {
	ID string `json:"id"`
}

type DynamicLeader added in v0.1.1

type DynamicLeader struct {
	// contains filtered or unexported fields
}

func (*DynamicLeader) IsLeader added in v0.1.1

func (d *DynamicLeader) IsLeader() bool

func (*DynamicLeader) Ready added in v0.1.1

func (d *DynamicLeader) Ready() chan struct{}

func (*DynamicLeader) RedirectTarget added in v0.1.1

func (d *DynamicLeader) RedirectTarget() string

type EmbeddedNatsConfig

type EmbeddedNatsConfig struct {
	Name       string
	Port       int
	StoreDir   string
	User       string
	Pass       string
	File       string
	EnableLogs bool
}

type JSONPublisher added in v0.0.6

type JSONPublisher struct {
	// contains filtered or unexported fields
}

func NewJSONPublisher added in v0.0.6

func NewJSONPublisher(w io.Writer) *JSONPublisher

func (*JSONPublisher) Publish added in v0.0.6

func (p *JSONPublisher) Publish(cs *ChangeSet) error

func (*JSONPublisher) Sequence added in v0.0.20

func (p *JSONPublisher) Sequence() uint64

type LeaderProvider added in v0.1.0

type LeaderProvider interface {
	IsLeader() bool
	Ready() chan struct{}
	RedirectTarget() string
}

type NATSPublisher added in v0.0.6

type NATSPublisher struct {
	// contains filtered or unexported fields
}

func NewNATSPublisher added in v0.0.6

func NewNATSPublisher(nc *nats.Conn, subject string, timeout time.Duration, streamConfig *jetstream.StreamConfig) (*NATSPublisher, error)

func (*NATSPublisher) Publish added in v0.0.6

func (p *NATSPublisher) Publish(cs *ChangeSet) error

func (*NATSPublisher) Sequence added in v0.0.20

func (p *NATSPublisher) Sequence() uint64

type NATSSnapshotter added in v0.0.6

type NATSSnapshotter struct {
	// contains filtered or unexported fields
}

func NewNATSSnapshotter added in v0.0.6

func NewNATSSnapshotter(ctx context.Context, nc *nats.Conn, replicas int, stream string, db *sql.DB, backupFn BackupFn, interval time.Duration, sequenceProvider SequenceProvider, objectName string) (*NATSSnapshotter, error)

func (*NATSSnapshotter) LatestSnapshot added in v0.0.6

func (s *NATSSnapshotter) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)

func (*NATSSnapshotter) LatestSnapshotSequence added in v0.0.6

func (s *NATSSnapshotter) LatestSnapshotSequence(ctx context.Context) (uint64, error)

func (*NATSSnapshotter) SetDB added in v0.0.13

func (s *NATSSnapshotter) SetDB(db *sql.DB)

func (*NATSSnapshotter) Start added in v0.0.13

func (s *NATSSnapshotter) Start()

func (*NATSSnapshotter) TakeSnapshot added in v0.0.6

func (s *NATSSnapshotter) TakeSnapshot(ctx context.Context) (sequence uint64, err error)

type NATSSubscriber added in v0.0.6

type NATSSubscriber struct {
	// contains filtered or unexported fields
}

func NewNATSSubscriber added in v0.0.6

func NewNATSSubscriber(cfg NATSSubscriberConfig) (*NATSSubscriber, error)

func (*NATSSubscriber) DeliveredInfo added in v0.0.6

func (s *NATSSubscriber) DeliveredInfo(ctx context.Context, name string) (any, error)

func (*NATSSubscriber) LatestSeq added in v0.0.6

func (s *NATSSubscriber) LatestSeq() uint64

func (*NATSSubscriber) RemoveConsumer added in v0.0.6

func (s *NATSSubscriber) RemoveConsumer(ctx context.Context, name string) error

func (*NATSSubscriber) SetDB added in v0.0.13

func (s *NATSSubscriber) SetDB(db *sql.DB)

func (*NATSSubscriber) Start added in v0.0.6

func (s *NATSSubscriber) Start() error

type NATSSubscriberConfig added in v0.0.21

type NATSSubscriberConfig struct {
	Node         string
	Durable      string
	NatsConn     *nats.Conn
	Stream       string
	Subject      string
	Policy       string
	DB           *sql.DB
	ConnProvider ConnHooksProvider
	Interceptor  ChangeSetInterceptor
	RowIdentify  RowIdentify
}

type NoopPublisher added in v0.0.6

type NoopPublisher struct{}

func NewNoopPublisher added in v0.0.6

func NewNoopPublisher() *NoopPublisher

func (*NoopPublisher) Publish added in v0.0.6

func (p *NoopPublisher) Publish(cs *ChangeSet) error

func (*NoopPublisher) Sequence added in v0.0.20

func (p *NoopPublisher) Sequence() uint64

type NoopSnapshotter added in v0.0.6

type NoopSnapshotter struct {
	// contains filtered or unexported fields
}

func NewNoopSnapshotter added in v0.0.6

func NewNoopSnapshotter() *NoopSnapshotter

func (*NoopSnapshotter) LatestSnapshot added in v0.0.6

func (s *NoopSnapshotter) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)

func (*NoopSnapshotter) SetDB added in v0.0.13

func (s *NoopSnapshotter) SetDB(_ *sql.DB)

func (*NoopSnapshotter) Start added in v0.0.13

func (s *NoopSnapshotter) Start()

func (*NoopSnapshotter) TakeSnapshot added in v0.0.6

func (s *NoopSnapshotter) TakeSnapshot(ctx context.Context) (sequence uint64, err error)

type NoopSubscriber added in v0.0.6

type NoopSubscriber struct{}

func NewNoopSubscriber added in v0.0.6

func NewNoopSubscriber() *NoopSubscriber

func (*NoopSubscriber) DeliveredInfo added in v0.0.6

func (*NoopSubscriber) DeliveredInfo(ctx context.Context, name string) (any, error)

func (*NoopSubscriber) LatestSeq added in v0.0.6

func (*NoopSubscriber) LatestSeq() uint64

func (*NoopSubscriber) RemoveConsumer added in v0.0.6

func (*NoopSubscriber) RemoveConsumer(ctx context.Context, name string) error

func (*NoopSubscriber) SetDB added in v0.0.13

func (*NoopSubscriber) SetDB(db *sql.DB)

func (*NoopSubscriber) Start added in v0.0.6

func (*NoopSubscriber) Start() error

type Option

type Option func(*Connector)

func NameToOptions added in v0.0.10

func NameToOptions(name string) (string, []Option, error)

func WithAsyncPublisher added in v0.0.15

func WithAsyncPublisher() Option

func WithAsyncPublisherOutboxDir added in v0.0.15

func WithAsyncPublisherOutboxDir(dir string) Option

func WithAutoStart added in v0.1.0

func WithAutoStart(enabled bool) Option

func WithCDCPublisher

func WithCDCPublisher(p CDCPublisher) Option

func WithChangeSetInterceptor added in v0.0.6

func WithChangeSetInterceptor(interceptor ChangeSetInterceptor) Option

func WithClusterSize added in v0.1.1

func WithClusterSize(size int) Option

func WithDBSnapshotter added in v0.0.6

func WithDBSnapshotter(snap DBSnapshotter) Option

func WithDeliverPolicy

func WithDeliverPolicy(deliverPolicy string) Option

func WithDisableDDLSync added in v0.0.4

func WithDisableDDLSync() Option

func WithEmbeddedNatsConfig

func WithEmbeddedNatsConfig(cfg *EmbeddedNatsConfig) Option

func WithExtensions

func WithExtensions(extensions ...string) Option

func WithGrpcPort added in v0.4.5

func WithGrpcPort(port int) Option

func WithGrpcTimeout added in v0.4.7

func WithGrpcTimeout(timeout time.Duration) Option

func WithGrpcToken added in v0.7.0

func WithGrpcToken(token string) Option

func WithLeaderElectionLocalTarget added in v0.1.1

func WithLeaderElectionLocalTarget(localEndpoint string) Option

func WithLeaderProvider added in v0.1.1

func WithLeaderProvider(p LeaderProvider) Option

func WithName

func WithName(name string) Option

func WithNatsOptions

func WithNatsOptions(options ...nats.Option) Option

func WithPublisherTimeout

func WithPublisherTimeout(timeout time.Duration) Option

func WithQueryRouter added in v0.5.5

func WithQueryRouter(re *regexp.Regexp) Option

func WithReplicas

func WithReplicas(replicas int) Option

func WithReplicationID added in v0.3.0

func WithReplicationID(id string) Option

func WithReplicationPublisher added in v0.3.0

func WithReplicationPublisher(pub Publisher) Option

func WithReplicationStream added in v0.0.9

func WithReplicationStream(stream string) Option

func WithReplicationSubscriber added in v0.3.0

func WithReplicationSubscriber(sub Subscriber) Option

func WithReplicationURL

func WithReplicationURL(url string) Option

func WithRowIdentify added in v0.0.21

func WithRowIdentify(i RowIdentify) Option

func WithSnapshotInterval

func WithSnapshotInterval(interval time.Duration) Option

func WithStreamMaxAge

func WithStreamMaxAge(maxAge time.Duration) Option

func WithWaitFor added in v0.0.4

func WithWaitFor(ch chan struct{}) Option

type Publisher added in v0.3.0

type Publisher interface {
	Publish(cs *ChangeSet) error
	Sequence() uint64
}

type RowIdentify added in v0.0.21

type RowIdentify string
const (
	PK    RowIdentify = "pk"
	Rowid RowIdentify = "rowid"
	Full  RowIdentify = "full"
)

type SequenceProvider

type SequenceProvider interface {
	LatestSeq() uint64
}

type Statement

type Statement struct {
	// contains filtered or unexported fields
}

func Parse

func Parse(ctx context.Context, sql string) ([]*Statement, error)

func ParseStatement added in v0.0.4

func ParseStatement(ctx context.Context, source string) (*Statement, error)

func UnverifiedStatement added in v0.0.12

func UnverifiedStatement(source string, hasDistinct bool,
	hasReturning bool, typ string, parameters []string, columns []string,
	ddl bool, hasIfExists bool, hasModifier bool,
	modifiesDatabase bool, aggregateFunctions map[int]*sql.Call) *Statement

func (*Statement) AggregateFunctions added in v0.6.5

func (s *Statement) AggregateFunctions() map[int]*sql.Call

func (*Statement) Begin

func (s *Statement) Begin() bool

func (*Statement) Columns

func (s *Statement) Columns() []string

func (*Statement) Commit

func (s *Statement) Commit() bool

func (*Statement) DDL

func (s *Statement) DDL() bool

func (*Statement) HasDistinct

func (s *Statement) HasDistinct() bool

func (*Statement) HasReturning

func (s *Statement) HasReturning() bool

func (*Statement) IsCreateTable

func (s *Statement) IsCreateTable() bool

func (*Statement) IsDelete

func (s *Statement) IsDelete() bool

func (*Statement) IsExplain

func (s *Statement) IsExplain() bool

func (*Statement) IsInsert

func (s *Statement) IsInsert() bool

func (*Statement) IsSelect

func (s *Statement) IsSelect() bool

func (*Statement) IsUpdate

func (s *Statement) IsUpdate() bool

func (*Statement) Limit added in v0.5.9

func (s *Statement) Limit() int

func (*Statement) ModifiesDatabase added in v0.0.12

func (s *Statement) ModifiesDatabase() bool

func (*Statement) OrderBy added in v0.5.6

func (s *Statement) OrderBy() []string

func (*Statement) Parameters

func (s *Statement) Parameters() []string

func (*Statement) RewriteQueryToAggregate added in v0.6.5

func (s *Statement) RewriteQueryToAggregate() (query string, aggregateFunctions map[int]*sql.Call, newColumns map[int]int, err error)

func (*Statement) Rollback

func (s *Statement) Rollback() bool

func (*Statement) Source

func (s *Statement) Source() string

func (*Statement) SourceWithIfExists added in v0.0.10

func (s *Statement) SourceWithIfExists() string

func (*Statement) Type

func (s *Statement) Type() string

func (*Statement) Visit added in v0.6.0

func (s *Statement) Visit(n sql.Node) (w sql.Visitor, node sql.Node, err error)

func (*Statement) VisitEnd added in v0.6.0

func (s *Statement) VisitEnd(n sql.Node) (sql.Node, error)

type StaticLeader added in v0.1.0

type StaticLeader struct {
	Target string
}

func (*StaticLeader) IsLeader added in v0.1.0

func (s *StaticLeader) IsLeader() bool

func (*StaticLeader) Ready added in v0.1.1

func (s *StaticLeader) Ready() chan struct{}

func (*StaticLeader) RedirectTarget added in v0.1.0

func (s *StaticLeader) RedirectTarget() string

type Subscriber added in v0.3.0

type Subscriber interface {
	TxSeqTracker
	SetDB(*sql.DB)
	Start() error
	RemoveConsumer(ctx context.Context, name string) error
	DeliveredInfo(ctx context.Context, name string) (any, error)
}

type TxSeqTracker added in v0.4.8

type TxSeqTracker interface {
	LatestSeq() uint64
}

type TxSeqTrackerProvider added in v0.4.8

type TxSeqTrackerProvider func() TxSeqTracker

type WriterPublisher added in v0.0.6

type WriterPublisher struct {
	// contains filtered or unexported fields
}

func NewWriterPublisher added in v0.0.6

func NewWriterPublisher(w io.Writer, serializer ChangeSetSerializer) *WriterPublisher

func (*WriterPublisher) Publish added in v0.0.6

func (p *WriterPublisher) Publish(cs *ChangeSet) error

func (*WriterPublisher) Sequence added in v0.0.20

func (p *WriterPublisher) Sequence() uint64

Directories

Path Synopsis
api

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL