ha

package module
v0.0.21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2025 License: Apache-2.0 Imports: 25 Imported by: 6

README

go-ha

Go database/sql base driver providing high availability for SQLite databases.

Features

  • High availability support for SQLite databases.
  • Replication: Synchronize data across nodes using NATS.
  • Customize the replication strategy
  • Leaderless clusters: Read/Write from/to any node. Last-writer wins by default, but you can customize conflict resolutions by implementing ChangeSetInterceptor.
  • Embedded or External NATS: Choose between an embedded NATS server or an external one for replication.
  • Easy to integrate with existing Go projects.

Drivers

Options

Option Description Default
asyncPublisher Enables asynchronous publishing of replication events. false
asyncPublisherOutboxDir Directory to store outbox files for asynchronous publishing.
cdcID Change Data Captures ID [database filename]
deliverPolicy Specifies the delivery policy for replication events. Options include all, last, etc. all
disableCDCSubscriber Disables the Change Data Capture (CDC) subscriber for replication. false
disableCDCPublisher Disables the Change Data Capture (CDC) publisher for replication. false
disableDBSnapshotter Disables the database snapshotter used for initial synchronization. false
disableDDLSync Disables the synchronization of DDL (Data Definition Language) changes across nodes. false
name Specifies the name of the node in the cluster.
natsConfigFile Path to the configuration file for the embedded NATS server. Overrides others NATS configurations.
natsName Sets the name of the embedded NATS server.
natsPort Configures the port for the embedded NATS server. 4222
natsStoreDir Directory to store data for the embedded NATS server.
publisherTimeout Timeout duration for publishing replication events. 5s
replicationStream Name of the replication stream used for synchronizing data.
replicationURL URL used for connecting to the replication stream.
replicas Number of replicas to maintain for high availability. 1
rowIdentify Row identify strategy: rowid or full rowid
snapshotInterval Interval for taking database snapshots. 1m
streamMaxAge Maximum age of messages in the replication stream before they are removed. 24h

Projects using go-ha

  • HA: Highly available leaderless SQLite cluster with HTTP and PostgreSQL Wire Protocol
  • PocketBase HA: Highly available leaderless PocketBase cluster

Contributing

Contributions are welcome! Please open an issue or submit a pull request.

License

This project is licensed under the MIT License. See the LICENSE file for details.

Documentation

Index

Constants

View Source
const (
	TypeExplain            = "EXPLAIN"
	TypeSelect             = "SELECT"
	TypeInsert             = "INSERT"
	TypeUpdate             = "UPDATE"
	TypeDelete             = "DELETE"
	TypeCreateTable        = "CREATE TABLE"
	TypeCreateIndex        = "CREATE INDEX"
	TypeCreateView         = "CREATE VIEW"
	TypeCreateTrigger      = "CREATE TRIGGER"
	TypeCreateVirtualTable = "CREATE VIRTUAL TABLE"
	TypeAlterTable         = "ALTER TABLE"
	TypeVacuum             = "VACUUM"
	TypeDrop               = "DROP"
	TypeAnalyze            = "ANALYZE"
	TypeBegin              = "BEGIN"
	TypeCommit             = "COMMIT"
	TypeRollback           = "ROLLBACK"
	TypeSavepoint          = "SAVEPOINT"
	TypeRelease            = "RELEASE"
	TypeOther              = "OTHER"
)
View Source
const DefaultStream = "ha_replication"

Variables

View Source
var (
	ErrSubscriberNotConfigured  = errors.New("subscriber not configured")
	ErrSnapshotterNotConfigured = errors.New("snapshotter not configured")
)
View Source
var (
	ErrInvalidSQL = fmt.Errorf("invalid SQL")
)

Functions

func LatestSnapshot added in v0.0.4

func LatestSnapshot(ctx context.Context, dsn string, options ...Option) (sequence uint64, reader io.ReadCloser, err error)

func Shutdown added in v0.0.16

func Shutdown()

Types

type AsyncNATSPublisher added in v0.0.15

type AsyncNATSPublisher struct {
	*NATSPublisher
	// contains filtered or unexported fields
}

func NewAsyncNATSPublisher added in v0.0.15

func NewAsyncNATSPublisher(nc *nats.Conn, subject string, timeout time.Duration, streamConfig *jetstream.StreamConfig, db *sql.DB) (*AsyncNATSPublisher, error)

func (*AsyncNATSPublisher) Close added in v0.0.15

func (p *AsyncNATSPublisher) Close() error

func (*AsyncNATSPublisher) Publish added in v0.0.15

func (p *AsyncNATSPublisher) Publish(cs *ChangeSet) error

type BackupFn added in v0.0.10

type BackupFn func(context.Context, *sql.DB, io.Writer) error

type CDCPublisher

type CDCPublisher interface {
	Publish(cs *ChangeSet) error
}

type CDCSubscriber added in v0.0.6

type CDCSubscriber interface {
	SetDB(*sql.DB)
	Start() error
	LatestSeq() uint64
	RemoveConsumer(ctx context.Context, name string) error
	DeliveredInfo(ctx context.Context, name string) (any, error)
}

type Change

type Change struct {
	Database  string   `json:"database,omitempty"`
	Table     string   `json:"table,omitempty"`
	Columns   []string `json:"columns,omitempty"`
	Operation string   `json:"operation"` // "INSERT", "UPDATE", "DELETE", "SQL", "CUSTOM"
	OldRowID  int64    `json:"old_rowid,omitempty"`
	NewRowID  int64    `json:"new_rowid,omitempty"`
	OldValues []any    `json:"old_values,omitempty"`
	NewValues []any    `json:"new_values,omitempty"`
	Command   string   `json:"command,omitempty"`
	Args      []any    `json:"args,omitempty"`
}

type ChangeSet

type ChangeSet struct {
	Node      string   `json:"node"`
	ProcessID int64    `json:"process_id"`
	Filename  string   `json:"filename"`
	Changes   []Change `json:"changes"`
	Timestamp int64    `json:"timestamp_ns"`
	Subject   string   `json:"-"`
	StreamSeq uint64   `json:"-"`
	// contains filtered or unexported fields
}

func NewChangeSet

func NewChangeSet(node string, filename string) *ChangeSet

func (*ChangeSet) AddChange

func (cs *ChangeSet) AddChange(change Change)

func (*ChangeSet) Apply

func (cs *ChangeSet) Apply(db *sql.DB) (err error)

func (*ChangeSet) Clear

func (cs *ChangeSet) Clear()

func (*ChangeSet) Send

func (cs *ChangeSet) Send(pub CDCPublisher) error

func (*ChangeSet) SetApplyStrategy added in v0.0.21

func (cs *ChangeSet) SetApplyStrategy(fn applyStrategyFn)

func (*ChangeSet) SetConnProvider added in v0.0.10

func (cs *ChangeSet) SetConnProvider(connProvider ConnHooksProvider)

func (*ChangeSet) SetInterceptor added in v0.0.6

func (cs *ChangeSet) SetInterceptor(interceptor ChangeSetInterceptor)

type ChangeSetInterceptor added in v0.0.6

type ChangeSetInterceptor interface {
	BeforeApply(*ChangeSet, *sql.Conn) (skip bool, err error)
	AfterApply(*ChangeSet, *sql.Conn, error) error
}

type ChangeSetSerializer added in v0.0.6

type ChangeSetSerializer func(*ChangeSet) ([]byte, error)

type ConnHooksFactory added in v0.0.10

type ConnHooksFactory func(nodeName string, filename string, disableDDLSync bool, publisher CDCPublisher) ConnHooksProvider

type ConnHooksProvider added in v0.0.10

type ConnHooksProvider interface {
	RegisterHooks(driver.Conn) (driver.Conn, error)
	DisableHooks(*sql.Conn) error
	EnableHooks(*sql.Conn) error
}

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

func GetConnector added in v0.0.12

func GetConnector(dsn string) *Connector

func NewConnector

func NewConnector(dsn string, driver driver.Driver, connHooksFactory ConnHooksFactory, backupFn BackupFn, options ...Option) (*Connector, error)

func (*Connector) Close

func (c *Connector) Close()

func (*Connector) Connect

func (c *Connector) Connect(ctx context.Context) (driver.Conn, error)

func (*Connector) DeliveredInfo

func (c *Connector) DeliveredInfo(ctx context.Context, name string) (any, error)

func (*Connector) Driver

func (c *Connector) Driver() driver.Driver

func (*Connector) LatestSeq

func (c *Connector) LatestSeq() uint64

func (*Connector) LatestSnapshot

func (c *Connector) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)

func (*Connector) NodeName added in v0.0.10

func (c *Connector) NodeName() string

func (*Connector) Publisher added in v0.0.10

func (c *Connector) Publisher() CDCPublisher

func (*Connector) RemoveConsumer

func (c *Connector) RemoveConsumer(ctx context.Context, name string) error

func (*Connector) Snapshotter added in v0.0.13

func (c *Connector) Snapshotter() DBSnapshotter

func (*Connector) Subscriber added in v0.0.13

func (c *Connector) Subscriber() CDCSubscriber

func (*Connector) TakeSnapshot

func (c *Connector) TakeSnapshot(ctx context.Context) (sequence uint64, err error)

type DBSnapshotter added in v0.0.6

type DBSnapshotter interface {
	SetDB(*sql.DB)
	Start()
	TakeSnapshot(ctx context.Context) (sequence uint64, err error)
	LatestSnapshot(ctx context.Context) (sequence uint64, reader io.ReadCloser, err error)
}

type DriverProvider added in v0.0.10

type DriverProvider interface {
	driver.Driver
	ConnWithoutHooks() (*sql.Conn, error)
	EnableHooks(conn *sql.Conn)
	OnConnect(c driver.Conn) (driver.Conn, error)
}

type EmbeddedNatsConfig

type EmbeddedNatsConfig struct {
	Name       string
	Port       int
	StoreDir   string
	User       string
	Pass       string
	File       string
	EnableLogs bool
}

type JSONPublisher added in v0.0.6

type JSONPublisher struct {
	// contains filtered or unexported fields
}

func NewJSONPublisher added in v0.0.6

func NewJSONPublisher(w io.Writer) *JSONPublisher

func (*JSONPublisher) Publish added in v0.0.6

func (p *JSONPublisher) Publish(cs *ChangeSet) error

type NATSPublisher added in v0.0.6

type NATSPublisher struct {
	// contains filtered or unexported fields
}

func NewNATSPublisher added in v0.0.6

func NewNATSPublisher(nc *nats.Conn, subject string, timeout time.Duration, streamConfig *jetstream.StreamConfig) (*NATSPublisher, error)

func (*NATSPublisher) Publish added in v0.0.6

func (p *NATSPublisher) Publish(cs *ChangeSet) error

type NATSSnapshotter added in v0.0.6

type NATSSnapshotter struct {
	// contains filtered or unexported fields
}

func NewNATSSnapshotter added in v0.0.6

func NewNATSSnapshotter(ctx context.Context, nc *nats.Conn, replicas int, stream string, db *sql.DB, backupFn BackupFn, interval time.Duration, sequenceProvider SequenceProvider, objectName string) (*NATSSnapshotter, error)

func (*NATSSnapshotter) LatestSnapshot added in v0.0.6

func (s *NATSSnapshotter) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)

func (*NATSSnapshotter) LatestSnapshotSequence added in v0.0.6

func (s *NATSSnapshotter) LatestSnapshotSequence(ctx context.Context) (uint64, error)

func (*NATSSnapshotter) SetDB added in v0.0.13

func (s *NATSSnapshotter) SetDB(db *sql.DB)

func (*NATSSnapshotter) Start added in v0.0.13

func (s *NATSSnapshotter) Start()

func (*NATSSnapshotter) TakeSnapshot added in v0.0.6

func (s *NATSSnapshotter) TakeSnapshot(ctx context.Context) (sequence uint64, err error)

type NATSSubscriber added in v0.0.6

type NATSSubscriber struct {
	// contains filtered or unexported fields
}

func NewNATSSubscriber added in v0.0.6

func NewNATSSubscriber(cfg NATSSubscriberConfig) (*NATSSubscriber, error)

func (*NATSSubscriber) DeliveredInfo added in v0.0.6

func (s *NATSSubscriber) DeliveredInfo(ctx context.Context, name string) (any, error)

func (*NATSSubscriber) LatestSeq added in v0.0.6

func (s *NATSSubscriber) LatestSeq() uint64

func (*NATSSubscriber) RemoveConsumer added in v0.0.6

func (s *NATSSubscriber) RemoveConsumer(ctx context.Context, name string) error

func (*NATSSubscriber) SetDB added in v0.0.13

func (s *NATSSubscriber) SetDB(db *sql.DB)

func (*NATSSubscriber) Start added in v0.0.6

func (s *NATSSubscriber) Start() error

type NATSSubscriberConfig added in v0.0.21

type NATSSubscriberConfig struct {
	Node         string
	Durable      string
	NatsConn     *nats.Conn
	Stream       string
	Subject      string
	Policy       string
	DB           *sql.DB
	ConnProvider ConnHooksProvider
	Interceptor  ChangeSetInterceptor
	RowIdentify  RowIdentify
}

type NoopPublisher added in v0.0.6

type NoopPublisher struct{}

func NewNoopPublisher added in v0.0.6

func NewNoopPublisher() *NoopPublisher

func (*NoopPublisher) Publish added in v0.0.6

func (p *NoopPublisher) Publish(cs *ChangeSet) error

type NoopSnapshotter added in v0.0.6

type NoopSnapshotter struct {
	// contains filtered or unexported fields
}

func NewNoopSnapshotter added in v0.0.6

func NewNoopSnapshotter() *NoopSnapshotter

func (*NoopSnapshotter) LatestSnapshot added in v0.0.6

func (s *NoopSnapshotter) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)

func (*NoopSnapshotter) SetDB added in v0.0.13

func (s *NoopSnapshotter) SetDB(_ *sql.DB)

func (*NoopSnapshotter) Start added in v0.0.13

func (s *NoopSnapshotter) Start()

func (*NoopSnapshotter) TakeSnapshot added in v0.0.6

func (s *NoopSnapshotter) TakeSnapshot(ctx context.Context) (sequence uint64, err error)

type NoopSubscriber added in v0.0.6

type NoopSubscriber struct{}

func NewNoopSubscriber added in v0.0.6

func NewNoopSubscriber() *NoopSubscriber

func (*NoopSubscriber) DeliveredInfo added in v0.0.6

func (*NoopSubscriber) DeliveredInfo(ctx context.Context, name string) (any, error)

func (*NoopSubscriber) LatestSeq added in v0.0.6

func (*NoopSubscriber) LatestSeq() uint64

func (*NoopSubscriber) RemoveConsumer added in v0.0.6

func (*NoopSubscriber) RemoveConsumer(ctx context.Context, name string) error

func (*NoopSubscriber) SetDB added in v0.0.13

func (*NoopSubscriber) SetDB(db *sql.DB)

func (*NoopSubscriber) Start added in v0.0.6

func (*NoopSubscriber) Start() error

type Option

type Option func(*Connector)

func NameToOptions added in v0.0.10

func NameToOptions(name string) (string, []Option, error)

func WithAsyncPublisher added in v0.0.15

func WithAsyncPublisher() Option

func WithAsyncPublisherOutboxDir added in v0.0.15

func WithAsyncPublisherOutboxDir(dir string) Option

func WithCDCID added in v0.0.18

func WithCDCID(id string) Option

func WithCDCPublisher

func WithCDCPublisher(pub CDCPublisher) Option

func WithCDCSubscriber added in v0.0.6

func WithCDCSubscriber(sub CDCSubscriber) Option

func WithChangeSetInterceptor added in v0.0.6

func WithChangeSetInterceptor(interceptor ChangeSetInterceptor) Option

func WithDBSnapshotter added in v0.0.6

func WithDBSnapshotter(snap DBSnapshotter) Option

func WithDeliverPolicy

func WithDeliverPolicy(deliverPolicy string) Option

func WithDisableDDLSync added in v0.0.4

func WithDisableDDLSync() Option

func WithEmbeddedNatsConfig

func WithEmbeddedNatsConfig(cfg *EmbeddedNatsConfig) Option

func WithExtensions

func WithExtensions(extensions ...string) Option

func WithName

func WithName(name string) Option

func WithNatsOptions

func WithNatsOptions(options ...nats.Option) Option

func WithPublisherTimeout

func WithPublisherTimeout(timeout time.Duration) Option

func WithReplicas

func WithReplicas(replicas int) Option

func WithReplicationStream added in v0.0.9

func WithReplicationStream(stream string) Option

func WithReplicationURL

func WithReplicationURL(url string) Option

func WithRowIdentify added in v0.0.21

func WithRowIdentify(i RowIdentify) Option

func WithSnapshotInterval

func WithSnapshotInterval(interval time.Duration) Option

func WithStreamMaxAge

func WithStreamMaxAge(maxAge time.Duration) Option

func WithWaitFor added in v0.0.4

func WithWaitFor(ch chan struct{}) Option

type RowIdentify added in v0.0.21

type RowIdentify string
const (
	Rowid RowIdentify = "rowid"
	Full  RowIdentify = "full"
)

type SequenceProvider

type SequenceProvider interface {
	LatestSeq() uint64
}

type Statement

type Statement struct {
	// contains filtered or unexported fields
}

func Parse

func Parse(ctx context.Context, sql string) ([]*Statement, error)

func ParseStatement added in v0.0.4

func ParseStatement(ctx context.Context, sql string) (*Statement, error)

func UnverifiedStatement added in v0.0.12

func UnverifiedStatement(source string, hasDistinct bool,
	hasReturning bool, typ string, parameters []string, columns []string,
	ddl bool, hasIfExists bool, hasModifier bool,
	modifiesDatabase bool) *Statement

func (*Statement) Begin

func (s *Statement) Begin() bool

func (*Statement) Columns

func (s *Statement) Columns() []string

func (*Statement) Commit

func (s *Statement) Commit() bool

func (*Statement) DDL

func (s *Statement) DDL() bool

func (*Statement) HasDistinct

func (s *Statement) HasDistinct() bool

func (*Statement) HasReturning

func (s *Statement) HasReturning() bool

func (*Statement) IsCreateTable

func (s *Statement) IsCreateTable() bool

func (*Statement) IsDelete

func (s *Statement) IsDelete() bool

func (*Statement) IsExplain

func (s *Statement) IsExplain() bool

func (*Statement) IsInsert

func (s *Statement) IsInsert() bool

func (*Statement) IsSelect

func (s *Statement) IsSelect() bool

func (*Statement) IsUpdate

func (s *Statement) IsUpdate() bool

func (*Statement) ModifiesDatabase added in v0.0.12

func (s *Statement) ModifiesDatabase() bool

func (*Statement) Parameters

func (s *Statement) Parameters() []string

func (*Statement) Rollback

func (s *Statement) Rollback() bool

func (*Statement) Source

func (s *Statement) Source() string

func (*Statement) SourceWithIfExists added in v0.0.10

func (s *Statement) SourceWithIfExists() string

func (*Statement) Type

func (s *Statement) Type() string

type WriterPublisher added in v0.0.6

type WriterPublisher struct {
	// contains filtered or unexported fields
}

func NewWriterPublisher added in v0.0.6

func NewWriterPublisher(w io.Writer, serializer ChangeSetSerializer) *WriterPublisher

func (*WriterPublisher) Publish added in v0.0.6

func (p *WriterPublisher) Publish(cs *ChangeSet) error

Directories

Path Synopsis
_examples
node1 command
node2 command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL