dbresolver

package module
v0.0.0-...-d246b70 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 15, 2024 License: MIT Imports: 13 Imported by: 0

README

DBResolver

Switch between read and write databases. Coalesce requests.

Repo

Quick Start

Importing
go get github.com/go-batteries/dbresolver@latest 
Usage
import (
  "github.com/go-batteries/dbresolver"
  _ "github.com/mattn/go-sqlite3"
)

func setDatabaseDefaults(db *sql.DB) {
	db.SetMaxIdleConns(MAX_IDLE_CONNECTIONS)
	db.SetConnMaxLifetime(CONN_MAX_LIFETIME)
	db.SetMaxOpenConns(MAX_OPEN_CONNECTIONS)
	db.LogMode(true)
}

func Setup() *dbresolver.Database {
    masterDB, err := sql.Open("sqlite3", "./testdbs/users_write.db")
    if err != nil {
      log.Fatal("failed to connect to db", err)
    }

    setDatabaseDefaults(masterDB)

    replicaDBs := []*dbresolver.ResolverDB{}
    
    replica, err := sql.Open("sqlite3", "./testdbs/users_read_a.db")
    if err != nil {
      log.Fatal("failed to connect to db", err)
    }

    setDatabaseDefaults(replica)
    replicaDBs = append(replicaDBs, dbresolver.AsSyncReplica(replica, "users_read_replica1"))

    replica, err = sql.Open("sqlite3", "./testdbs/users_read_b.db")
    if err != nil {
      log.Fatal("failed to connect to db", err)
    }

    setDatabaseDefaults(replica)
    replicaDBs = append(replicaDBs, dbresolver.AsReplica(replica, "users_read_replica2"))


    return dbresolver.Register(dbresolver.DBConfig{
        Master:   dbresolver.AsMaster(masterDB, "users_wrtie"),
        Replicas: replicaDBs,
        // Policy: &dbresolver.RoundRobalancer{},
        // for existing database integration, default mode to write
        // DefaultMode: &dbresolver.DbWriteMode,
        // MaxIdleConnections: dbresolver.ToPtr(30),
        // MaxOpenConnections: ,
        // ConnectionMaxLifeTime: ,
    })
}

db := Setup()

db.QueryRow(`SELECT * FROM users`)
Switching data source

It is possible to provide the option to use read or write forcefully.

// Use write db
db.WithMode(dbresolver.DBWriteMode).QueryRow(`SELECT * FROM users`)

// Use read db
db.WithMode(dbresolver.DBReadMode).Exec(`DELETE FROM users`) // error
db.WithMode(dbresolver.DBWriteMode).Exec(`DELETE FROM users`) // works

// if you need to work with DML queries on read instance
// use the sql.DB object

replicaDB.DB.Exec(`DELETE FROM users`);

It is also possible to set the default mode to read/write mode.

dbresolver.DBConfig {
    DefaultMode: &dbresolver.DbWriteMode
}
Load Balancing

By default we have two balancers

// RandomBalancer
dbresolver.DBConfig{
    Policy: &dbresolver.RoundRobalancer{}
}

// RandomBalancer
dbresolver.DBConfig{
    Policy: &dbresolver.RandomBalancer{}
}

You can provide your own load balancer. The Balancer interface is defined as such

type Balancer interface {
    Get() int64
}

Documentation

Index

Constants

View Source
const (
	DEFAULT_MAX_IDLE_CONNECTIONS = 30
	DEFAULT_CONN_MAX_LIFETIME    = 10 * time.Minute
	DEFAULT_MAX_OPEN_CONNECTIONS = 10
)

Variables

View Source
var (
	ErrKoalesceTimeout   = errors.New("timeout")
	ErrKoalesceCancelled = errors.New("cancelled")
)
View Source
var (
	EventBeforeDBSelect string = "before::select_db"
	EventAfterDBSelect  string = "after:select_db"
	EventBeforeQueryRun string = "before::query_run"
)
View Source
var (
	ErrorInvalidDBMode = errors.New("db mode invalid for query")
	ErrorInvalidData   = errors.New("unexpected result type from query koalescer")
)

Functions

func HashValues

func HashValues(values ...interface{}) string

func SetConfigDefaults

func SetConfigDefaults(db *sql.DB, config *DBConfig)

func SetDBDefaults

func SetDBDefaults(db *sql.DB)

func ToKey

func ToKey(stmt string, values ...interface{}) string

func ToPtr

func ToPtr[E any](e E) *E

Types

type Balancer

type Balancer interface {
	Get() int64
}

type DBConfig

type DBConfig struct {
	Master                *ResolverDB
	Replicas              []*ResolverDB
	Policy                Balancer
	DefaultMode           *DbActionMode
	MaxIdleConnections    *int
	MaxOpenConnections    *int
	ConnectionMaxLifetime *time.Duration
}

func (*DBConfig) CheckHealth

func (cfg *DBConfig) CheckHealth(ctx context.Context) error

Optional CheckHealth to validate database connection health

type DataBaseOpts

type DataBaseOpts func(d *Database)

func WithHooks

func WithHooks(eventHandler hooks.EventEmitter) DataBaseOpts

func WithQueryQualescer

func WithQueryQualescer(koalescer *QueryKoalescer) DataBaseOpts

type Database

type Database struct {
	Config DBConfig
	Hooks  hooks.EventEmitter
	// contains filtered or unexported fields
}

func Register

func Register(config DBConfig, opts ...DataBaseOpts) *Database

func (*Database) Exec

func (d *Database) Exec(stmt string, values ...interface{}) (sql.Result, error)

func (*Database) ExecContext

func (d *Database) ExecContext(ctx context.Context, stmt string, values ...interface{}) (sql.Result, error)

func (*Database) Query

func (d *Database) Query(stmt string, values ...interface{}) (Rows, error)

func (*Database) QueryContext

func (d *Database) QueryContext(ctx context.Context, stmt string, values ...interface{}) (Rows, error)

func (*Database) QueryRow

func (d *Database) QueryRow(stmt string, values ...interface{}) (*Row, error)

func (*Database) QueryRowContext

func (d *Database) QueryRowContext(ctx context.Context, stmt string, values ...interface{}) (*Row, error)

func (*Database) WithMode

func (d *Database) WithMode(dbMode DbActionMode) *Database

type DbActionMode

type DbActionMode string
var (
	DbWriteMode DbActionMode = "write"
	DbReadMode  DbActionMode = "read"
)

type KoalesceEvictor

type KoalesceEvictor interface {
	HasEvicted() bool
}

type NoopEvictor

type NoopEvictor struct{}

func (*NoopEvictor) HasEvicted

func (*NoopEvictor) HasEvicted() bool

type QueryKoalescer

type QueryKoalescer struct {
	// contains filtered or unexported fields
}

func NewKoalescer

func NewKoalescer(evictor KoalesceEvictor) *QueryKoalescer

func (*QueryKoalescer) DoChan

func (ko *QueryKoalescer) DoChan(query string, fn func() (interface{}, error)) <-chan singleflight.Result

func (*QueryKoalescer) DoWithContext

func (ko *QueryKoalescer) DoWithContext(ctx context.Context, query string, fn func() (interface{}, error)) <-chan singleflight.Result

func (*QueryKoalescer) Evict

func (ko *QueryKoalescer) Evict(query string) bool

func (*QueryKoalescer) Forget

func (ko *QueryKoalescer) Forget(query string) error

func (*QueryKoalescer) ForgetWithContext

func (ko *QueryKoalescer) ForgetWithContext(ctx context.Context, query string) error

type RandomBalancer

type RandomBalancer struct {
	// contains filtered or unexported fields
}

func NewRandomBalancer

func NewRandomBalancer(resourceCount int) *RandomBalancer

func (*RandomBalancer) Get

func (r *RandomBalancer) Get() int64

type ResolverDB

type ResolverDB struct {
	*sql.DB

	Name     string
	IsMaster bool
	InSync   bool
	// contains filtered or unexported fields
}

func AsMaster

func AsMaster(db *sql.DB, name string) *ResolverDB

func AsReplica

func AsReplica(db *sql.DB, name string) *ResolverDB

func AsSyncReplica

func AsSyncReplica(db *sql.DB, name string) *ResolverDB

func NewResolveDB

func NewResolveDB(db *sql.DB, name string, isMaster, inSync bool) *ResolverDB

func (*ResolverDB) CheckHealth

func (rd *ResolverDB) CheckHealth(ctx context.Context) error

func (*ResolverDB) UnWrap

func (rd *ResolverDB) UnWrap() *sql.DB

type RoundRobalancer

type RoundRobalancer struct {
	// contains filtered or unexported fields
}

func NewRoundRobalancer

func NewRoundRobalancer(resourceCount int) *RoundRobalancer

func (*RoundRobalancer) Get

func (r *RoundRobalancer) Get() int64

type Row

type Row []interface{}

func ToRow

func ToRow(rows *sql.Rows) (*Row, error)

type Rows

type Rows []*Row

func ToRows

func ToRows(rows *sql.Rows) (Rows, error)

type TimeEvictor

type TimeEvictor struct {
	// contains filtered or unexported fields
}

func NewTimeEvictor

func NewTimeEvictor(duration time.Duration) *TimeEvictor

func (*TimeEvictor) HasEvicted

func (t *TimeEvictor) HasEvicted() bool

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL