ha

package module
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 10, 2025 License: Apache-2.0 Imports: 24 Imported by: 6

README

go-ha

Go database/sql driver

Documentation

Index

Constants

View Source
const (
	TypeExplain            = "EXPLAIN"
	TypeSelect             = "SELECT"
	TypeInsert             = "INSERT"
	TypeUpdate             = "UPDATE"
	TypeDelete             = "DELETE"
	TypeCreateTable        = "CREATE TABLE"
	TypeCreateIndex        = "CREATE INDEX"
	TypeCreateView         = "CREATE VIEW"
	TypeCreateTrigger      = "CREATE TRIGGER"
	TypeCreateVirtualTable = "CREATE VIRTUAL TABLE"
	TypeAlterTable         = "ALTER TABLE"
	TypeVacuum             = "VACUUM"
	TypeDrop               = "DROP"
	TypeAnalyze            = "ANALYZE"
	TypeBegin              = "BEGIN"
	TypeCommit             = "COMMIT"
	TypeRollback           = "ROLLBACK"
	TypeSavepoint          = "SAVEPOINT"
	TypeRelease            = "RELEASE"
	TypeOther              = "OTHER"
)
View Source
const DefaultStream = "ha_replication"

Variables

View Source
var (
	ErrInvalidSQL = fmt.Errorf("invalid SQL")
)
View Source
var ErrNatsNotConfigured = errors.New("NATS not configured")

Functions

func Backup

func Backup(ctx context.Context, db *sql.DB, w io.Writer) error

func LatestSnapshot added in v0.0.4

func LatestSnapshot(ctx context.Context, options ...Option) (sequence uint64, reader io.ReadCloser, err error)

Types

type CDCPublisher

type CDCPublisher interface {
	Publish(cs *ChangeSet) error
}

type Change

type Change struct {
	Database  string   `json:"database,omitempty"`
	Table     string   `json:"table,omitempty"`
	Columns   []string `json:"columns,omitempty"`
	Operation string   `json:"operation"` // "INSERT", "UPDATE", "DELETE", "SQL"
	OldRowID  int64    `json:"old_rowid,omitempty"`
	NewRowID  int64    `json:"new_rowid,omitempty"`
	OldValues []any    `json:"old_values,omitempty"`
	NewValues []any    `json:"new_values,omitempty"`
	SQL       string   `json:"sql,omitempty"`
	SQLArgs   []any    `json:"sql_args,omitempty"`
}

type ChangeSet

type ChangeSet struct {
	Node      string   `json:"node"`
	ProcessID int64    `json:"process_id"`
	Filename  string   `json:"filename"`
	Changes   []Change `json:"changes"`
	Timestamp int64    `json:"timestamp_ns"`
	StreamSeq uint64   `json:"-"`
	// contains filtered or unexported fields
}

func NewChangeSet

func NewChangeSet(node string, filename string, publisher CDCPublisher) *ChangeSet

func (*ChangeSet) AddChange

func (cs *ChangeSet) AddChange(change Change)

func (*ChangeSet) Apply

func (cs *ChangeSet) Apply(db *sql.DB) error

func (*ChangeSet) Clear

func (cs *ChangeSet) Clear()

func (*ChangeSet) Send

func (cs *ChangeSet) Send(pub CDCPublisher) error

type Conn

type Conn struct {
	*sqlite3.SQLiteConn
	// contains filtered or unexported fields
}

func (*Conn) Exec

func (c *Conn) Exec(query string, args []driver.Value) (driver.Result, error)

func (*Conn) ExecContext

func (c *Conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error)

type ConnectHookFn added in v0.0.2

type ConnectHookFn func(conn *sqlite3.SQLiteConn) error

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

func NewConnector

func NewConnector(dsn string, options ...Option) (*Connector, error)

func (*Connector) Close

func (c *Connector) Close()

func (*Connector) Connect

func (c *Connector) Connect(ctx context.Context) (driver.Conn, error)

func (*Connector) DeliveredInfo

func (c *Connector) DeliveredInfo(ctx context.Context, name string) ([]*jetstream.ConsumerInfo, error)

func (*Connector) Driver

func (c *Connector) Driver() driver.Driver

func (*Connector) LatestSeq

func (c *Connector) LatestSeq() uint64

func (*Connector) LatestSnapshot

func (c *Connector) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)

func (*Connector) RemoveConsumer

func (c *Connector) RemoveConsumer(ctx context.Context, name string) error

func (*Connector) TakeSnapshot

func (c *Connector) TakeSnapshot(ctx context.Context, db *sql.DB) (sequence uint64, err error)

type Driver added in v0.0.2

type Driver struct {
	Extensions  []string
	ConnectHook ConnectHookFn
	Options     []Option
}

func (*Driver) Open added in v0.0.2

func (d *Driver) Open(name string) (driver.Conn, error)

func (*Driver) OpenConnector added in v0.0.2

func (d *Driver) OpenConnector(name string) (driver.Connector, error)

type EmbeddedNatsConfig

type EmbeddedNatsConfig struct {
	Name       string
	Port       int
	StoreDir   string
	User       string
	Pass       string
	File       string
	EnableLogs bool
}

type Option

type Option func(*Connector)

func WithCDCPublisher

func WithCDCPublisher(pub CDCPublisher) Option

func WithConnectHook added in v0.0.2

func WithConnectHook(fn ConnectHookFn) Option

func WithDeliverPolicy

func WithDeliverPolicy(deliverPolicy string) Option

func WithDisableDDLSync added in v0.0.4

func WithDisableDDLSync() Option

func WithEmbeddedNatsConfig

func WithEmbeddedNatsConfig(cfg *EmbeddedNatsConfig) Option

func WithExtensions

func WithExtensions(extensions ...string) Option

func WithName

func WithName(name string) Option

func WithNatsOptions

func WithNatsOptions(options ...nats.Option) Option

func WithPublisherTimeout

func WithPublisherTimeout(timeout time.Duration) Option

func WithReplicas

func WithReplicas(replicas int) Option

func WithReplicationSubject

func WithReplicationSubject(subject string) Option

func WithReplicationURL

func WithReplicationURL(url string) Option

func WithSnapshotInterval

func WithSnapshotInterval(interval time.Duration) Option

func WithStreamMaxAge

func WithStreamMaxAge(maxAge time.Duration) Option

func WithWaitFor added in v0.0.4

func WithWaitFor(ch chan struct{}) Option

type SequenceProvider

type SequenceProvider interface {
	LatestSeq() uint64
}

type Statement

type Statement struct {
	// contains filtered or unexported fields
}

func Parse

func Parse(ctx context.Context, sql string) ([]*Statement, error)

func ParseStatement added in v0.0.4

func ParseStatement(ctx context.Context, sql string) (*Statement, error)

func (*Statement) Begin

func (s *Statement) Begin() bool

func (*Statement) Columns

func (s *Statement) Columns() []string

func (*Statement) Commit

func (s *Statement) Commit() bool

func (*Statement) DDL

func (s *Statement) DDL() bool

func (*Statement) HasDistinct

func (s *Statement) HasDistinct() bool

func (*Statement) HasReturning

func (s *Statement) HasReturning() bool

func (*Statement) IsCreateTable

func (s *Statement) IsCreateTable() bool

func (*Statement) IsDelete

func (s *Statement) IsDelete() bool

func (*Statement) IsExplain

func (s *Statement) IsExplain() bool

func (*Statement) IsInsert

func (s *Statement) IsInsert() bool

func (*Statement) IsSelect

func (s *Statement) IsSelect() bool

func (*Statement) IsUpdate

func (s *Statement) IsUpdate() bool

func (*Statement) Parameters

func (s *Statement) Parameters() []string

func (*Statement) Rollback

func (s *Statement) Rollback() bool

func (*Statement) Source

func (s *Statement) Source() string

func (*Statement) Type

func (s *Statement) Type() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL