mysql

package module
v0.0.9
Published: Aug 25, 2025 License: MIT Imports: 34 Imported by: 3

README

cool-mysql

cool-mysql is a small library that wraps Go's database/sql with MySQL-oriented helpers. It keeps the underlying interfaces intact while providing conveniences that save you time when writing data access code.

Features

  • Dual pools for reads and writes
  • Named template parameters using @@name tokens
  • Automatic retries with exponential backoff
  • Pluggable caching (Redis, Memcached, or in-memory weak pointers) with optional distributed locks
  • Insert/Upsert helpers that chunk large sets to respect max_allowed_packet
  • Go template syntax in queries for conditional logic
  • Flexible selection into structs, slices, maps, channels or functions
  • Select single values (e.g. string, time.Time)
  • JSON columns can unmarshal directly into struct fields
  • Channels supported for selecting and inserting
  • Optional query logging and transaction helpers
  • Pluggable logging using log/slog by default with a Zap adapter

Installation

go get github.com/StirlingMarketingGroup/cool-mysql

Quick Start

package main

import (
    "log"
    "time"

    mysql "github.com/StirlingMarketingGroup/cool-mysql"
)

type User struct {
    ID   int    `mysql:"id"`
    Name string `mysql:"name"`
}

func main() {
    db, err := mysql.New(
        "writeUser", "writePass", "mydb", "127.0.0.1", 3306,
        "readUser", "readPass", "mydb", "127.0.0.1", 3306,
        "utf8mb4_unicode_ci", time.Local,
    )
    if err != nil {
        log.Fatal(err)
    }

    var users []User
    err = db.Select(&users,
        "SELECT id, name FROM users WHERE created_at > @@since",
        time.Minute, // cache TTL when caching is configured
        mysql.Params{"since": time.Now().Add(-24 * time.Hour)},
    )
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("loaded %d users", len(users))
}

Configuration

cool-mysql can be configured using environment variables:

Variable                          Default          Description
COOL_MAX_EXECUTION_TIME_TIME      27 (seconds)     Maximum query execution time (90% of 30 seconds)
COOL_REDIS_LOCK_RETRY_DELAY       0.020 (seconds)  Delay between Redis lock retry attempts
COOL_MYSQL_MAX_QUERY_LOG_LENGTH   4096 (bytes)     Maximum length of queries in error logs

Example:

export COOL_MAX_EXECUTION_TIME_TIME=60  # 60 second timeout
export COOL_REDIS_LOCK_RETRY_DELAY=0.050  # 50ms retry delay
export COOL_MYSQL_MAX_QUERY_LOG_LENGTH=8192  # 8KB log limit
Enabling caching
// use Redis
r := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
db.EnableRedis(r)

// or Memcached
db.EnableMemcache(memcache.New("localhost:11211"))

// or a simple in-memory cache using weak pointers
db.UseCache(mysql.NewWeakCache())

// caches can be stacked
db.UseCache(mysql.NewMultiCache(mysql.NewWeakCache(), mysql.NewRedisCache(r)))

Usage

Selecting into structs
type Profile struct {
    Likes []string `json:"likes"`
}

type User struct {
    ID      int
    Name    string
    Profile Profile `db:"profile_json"`
}

var u User
err := db.Select(&u,
    "SELECT id, name, profile_json FROM users WHERE id=@@id",
    0,
    mysql.Params{"id": 1},
)
if err != nil {
    // if no row is returned, err == sql.ErrNoRows
    log.Fatal(err)
}

Selecting into a slice never returns sql.ErrNoRows if empty:

var all []User
err := db.Select(&all, "SELECT * FROM users WHERE active=1", 0)
if err != nil {
    log.Fatal(err)
}
log.Println(len(all))
Selecting into single values
var name string
err := db.Select(&name, "SELECT name FROM users WHERE id=@@id", 0, 5) // single param value
Selecting into channels
userCh := make(chan User)
go func() {
    defer close(userCh)
    if err := db.Select(userCh, "SELECT id, name FROM users", 0); err != nil {
        log.Fatal(err)
    }
}()
for u := range userCh {
    log.Printf("%d: %s", u.ID, u.Name)
}
Selecting with a function receiver
err = db.Select(func(u User) {
    log.Printf("found %s", u.Name)
}, "SELECT id, name FROM users WHERE active=1", 0)
Additional query methods

Count records efficiently:

count, err := db.Count("SELECT COUNT(*) FROM users WHERE active = @@active", 0, mysql.Params{"active": 1})

Check existence:

exists, err := db.Exists("SELECT 1 FROM users WHERE email = @@email", 0, mysql.Params{"email": "user@example.com"})
// Use ExistsWrites() to query the write connection
existsOnWrite, err := db.ExistsWrites("SELECT 1 FROM users WHERE email = @@email", 0, mysql.Params{"email": "user@example.com"})

Query against write connection:

var users []User
err := db.SelectWrites(&users, "SELECT id, name FROM users WHERE id = @@id", 0, mysql.Params{"id": 123})

Direct JSON results:

var result json.RawMessage
err := db.SelectJSON(&result, "SELECT JSON_OBJECT('id', id, 'name', name) FROM users WHERE id = @@id", 0, mysql.Params{"id": 123})

Execute with detailed results:

result, err := db.ExecResult("UPDATE users SET name = @@name WHERE id = @@id", mysql.Params{"name": "Alice", "id": 123})
if err != nil {
    log.Fatal(err)
}
rowsAffected, _ := result.RowsAffected()
lastInsertID, _ := result.LastInsertId()

Context-aware operations: All major functions have Context variants (SelectContext, InsertContext, UpsertContext, etc.) for cancellation and timeout support:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

var users []User
err := db.SelectContext(ctx, &users, "SELECT id, name FROM users", 0)

Raw SQL strings: Use mysql.Raw for literal SQL that shouldn't be escaped:

err := db.Select(&users, 
    "SELECT id, name FROM users WHERE created_at > @@date AND @@condition", 
    0, 
    mysql.Params{
        "date": time.Now().Add(-24*time.Hour),
        "condition": mysql.Raw("status = 'active'"), // not escaped
    },
)
Conditional queries with templates
var since *time.Time
query := `SELECT id, name FROM users WHERE 1=1 {{ if .since }}AND created_at > @@since{{ end }}`
err = db.Select(&users, query, 0, mysql.Params{"since": since})
Insert helper
newUser := User{ID: 123, Name: "Alice"}
err = db.Insert("users", newUser) // query is built automatically

The source can also be a channel of structs for batch inserts.

ch := make(chan User)
go func() {
    for _, u := range users {
        ch <- u
    }
    close(ch)
}()
if err := db.Insert("users", ch); err != nil { // batch insert
    log.Fatal(err)
}
Upsert helper
up := User{ID: 123, Name: "Alice"}
err = db.Upsert(
    "users",            // table name only
    []string{"id"},    // unique columns
    []string{"name"},  // columns to update on conflict
    "",                // additional WHERE clause
    up,
)
Struct tags

Fields in a struct can include a mysql tag to control how they map to the database. The tag name overrides the column name used by the insert and upsert helpers and when scanning query results.

Available options:

  • defaultzero – write default(column_name) instead of the zero value during inserts and parameter interpolation
  • insertDefault – alias for defaultzero (same behavior)
  • omitempty – alias for defaultzero (same behavior)
  • "-" – skip this field entirely (not included in inserts, selects, or parameter interpolation)

Hex encoding support: Column names can include hex-encoded characters using 0x notation (e.g., 0x2c for comma, 0x20 for space).

type Person struct {
    ID       int       `mysql:"id"`
    Name     string    `mysql:"name,defaultzero"`
    Email    string    `mysql:"email,omitempty"`        // same as defaultzero
    Internal string    `mysql:"-"`                      // completely ignored
    Created  time.Time `mysql:"created_at,insertDefault"` // same as defaultzero
    Special  string    `mysql:"column0x2cname"`         // becomes "column,name"
}

db.Insert("people", Person{}) 
// name, email, created_at become default(`name`), default(`email`), default(`created_at`)
// Internal field is completely ignored

_, _, _ = db.InterpolateParams(
    "SELECT * FROM people WHERE name = @@Name",
    Person{},
) // produces: SELECT * FROM people WHERE name = default(`name`)

tmpl := `SELECT * FROM people {{ if .Name }}WHERE name=@@Name{{ end }}`

Important notes:

  • When using template syntax, the struct field name (.Name above) is used for lookups, not the column name from the mysql tag
  • All three options (defaultzero, insertDefault, omitempty) have identical behavior
  • The "-" option completely excludes the field from all database operations
Transactions
tx, commit, cancel, err := mysql.GetOrCreateTxFromContext(ctx)
defer cancel()
if err != nil {
    return fmt.Errorf("failed to create transaction: %w", err)
}
ctx = mysql.NewContextWithTx(ctx, tx)

// do DB work with tx in context

if err := commit(); err != nil {
    return fmt.Errorf("failed to commit tx: %w", err)
}

Advanced Features

Context Management

cool-mysql provides utilities for managing database connections and transactions through context:

// Create a new context with a database connection
ctx := mysql.NewContext(context.Background(), db)

// Retrieve the database from context
dbFromCtx := mysql.FromContext(ctx)

// Transaction management with context
tx, commit, cancel, err := mysql.GetOrCreateTxFromContext(ctx)
if err != nil {
    return err
}
defer cancel()

// Use the transaction
ctx = mysql.NewContextWithTx(ctx, tx)
err = db.SelectContext(ctx, &users, "SELECT * FROM users WHERE active = 1", 0)
if err != nil {
    return err
}

// Commit the transaction
if err := commit(); err != nil {
    return err
}
Interfaces and Custom Types

Zeroer Interface: Custom zero-value detection

type CustomTime struct {
    time.Time
}

func (ct CustomTime) IsZero() bool {
    return ct.Time.IsZero() || ct.Year() < 1900
}

// Use in struct with defaultzero tag
type Event struct {
    ID   int        `mysql:"id"`
    Date CustomTime `mysql:"created_at,defaultzero"`
}

Valueser Interface: Custom value conversion

type Status int

const (
    StatusInactive Status = 0
    StatusActive   Status = 1
)

// MySQLValues implements the Valueser interface (driver.Value is from "database/sql/driver")
func (s Status) MySQLValues() ([]driver.Value, error) {
    return []driver.Value{int64(s)}, nil
}

// Use in parameters or struct fields
type User struct {
    ID     int    `mysql:"id"`
    Status Status `mysql:"status"`
}
Advanced Caching

MultiCache: Stack multiple cache layers

// Combine in-memory and Redis caching
weak := mysql.NewWeakCache()
redisCache := mysql.NewRedisCache(redisClient)
multi := mysql.NewMultiCache(weak, redisCache)

db.UseCache(multi)

Cache with distributed locking:

db.EnableRedis(redisClient)
// Queries will use distributed locks to prevent cache stampedes
err := db.Select(&users, "SELECT * FROM users WHERE popular = 1", 
    5*time.Minute, // cache TTL
)
Row Types and Converters
// MapRow - convert query results to maps
var mapRows []mysql.MapRow
err := db.Select(&mapRows, "SELECT id, name, email FROM users", 0)

// SliceRow - convert to slices
var sliceRows []mysql.SliceRow
err = db.Select(&sliceRows, "SELECT id, name, email FROM users", 0)

// SelectRows returns the result set as Rows (a slice of MapRow)
rows, err := db.SelectRows("SELECT id, name FROM large_table", 0)
if err != nil {
    log.Fatal(err)
}
for _, row := range rows {
    // each row is a map[string]any keyed by column name
    log.Println(row["id"], row["name"])
}

Performance & Best Practices

Connection Pooling

cool-mysql uses separate connection pools for read and write operations:

// Reads use the read pool (optimized for read-heavy workloads)
var users []User
err := db.Select(&users, "SELECT * FROM users", cacheTTL)

// Writes use the write pool (ensures consistency)
err := db.Insert("users", newUser)

// Force use of write pool for reads (when read consistency is critical)
err := db.SelectWrites(&users, "SELECT * FROM users WHERE just_created = 1", 0)
Large Dataset Handling

Chunked inserts automatically respect MySQL's max_allowed_packet:

// Automatically chunks large slices
largeUserSlice := make([]User, 10000)
err := db.Insert("users", largeUserSlice) // Inserts in optimal chunks

// Channel-based streaming inserts
userCh := make(chan User, 100)
go func() {
    defer close(userCh)
    for _, user := range largeUserSlice {
        userCh <- user
    }
}()
err := db.Insert("users", userCh) // Processes in batches

Streaming selects for large result sets:

// Use channels for memory-efficient processing
userCh := make(chan User, 100)
go func() {
    defer close(userCh)
    err := db.Select(userCh, "SELECT * FROM users", 0)
    if err != nil {
        log.Fatal(err)
    }
}()

for user := range userCh {
    // Process each user without loading all into memory
    processUser(user)
}
Query Optimization

Effective caching strategies:

// Short TTL for frequently changing data
err := db.Select(&activeUsers, "SELECT * FROM users WHERE active = 1", 
    30*time.Second)

// Long TTL for relatively static data
err := db.Select(&countries, "SELECT * FROM countries", 
    24*time.Hour)

// No caching for real-time data
err := db.Select(&currentBalance, "SELECT balance FROM accounts WHERE id = @@id",
    0, userID) // TTL = 0 means no caching

Template optimization:

// Use templates for dynamic queries to reduce query plan cache pollution
query := `
SELECT * FROM users 
WHERE 1=1
{{ if .ActiveOnly }}AND active = 1{{ end }}
{{ if .Department }}AND department = @@Department{{ end }}
`

params := struct {
    ActiveOnly bool
    Department string
}{
    ActiveOnly: true,
    Department: "engineering",
}

err := db.Select(&users, query, cacheTTL, params)
Best Practices
  1. Use appropriate TTL values:

    • Static data: hours to days
    • Semi-static data: minutes to hours
    • Dynamic data: seconds to minutes
    • Real-time data: no caching (TTL = 0)
  2. Leverage read/write separation:

    • Use regular Select() for most reads
    • Use SelectWrites() only when read-after-write consistency is critical
  3. Handle large datasets efficiently:

    • Use channels for streaming large result sets
    • Let the library handle insert chunking automatically
    • Consider using Count() instead of SELECT COUNT(*)
  4. Optimize for your caching setup:

    • Use MultiCache to combine fast local cache with shared Redis cache
    • Configure appropriate Redis lock retry delays for your workload
    • Monitor cache hit rates and adjust TTLs accordingly

Error Handling & Reliability

cool-mysql includes comprehensive error handling and automatic retry mechanisms:

Automatic Retries

The library automatically retries operations that fail due to transient MySQL errors:

Retry-eligible MySQL error codes:

  • 1213 - Deadlock found when trying to get lock
  • 1205 - Lock wait timeout exceeded
  • 2006 - MySQL server has gone away
  • 2013 - Lost connection to MySQL server during query

Retry behavior:

  • Uses exponential backoff with jitter
  • Maximum retry attempts determined by context timeout
  • Delays start at ~20ms and increase exponentially
// Operations will automatically retry on transient errors
err := db.Select(&users, "SELECT * FROM users", 0)
// If this fails with a deadlock (1213), it will retry automatically
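
Because the retry budget is bounded by the context, a context deadline is the simplest way to cap how long an operation (including its retries) may run. A minimal sketch, reusing the User type from above:

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// retries stop once the context deadline is exceeded
var users []User
if err := db.SelectContext(ctx, &users, "SELECT id, name FROM users", 0); err != nil {
    log.Printf("select failed (after any retries): %v", err)
}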
Custom Error Handling
// Check for specific error types
var user User
err := db.Select(&user, "SELECT * FROM users WHERE id = @@id", 0, 999)
if errors.Is(err, sql.ErrNoRows) {
    log.Println("No user found")
} else if err != nil {
    log.Printf("Database error: %v", err)
}
Transaction Retry Pattern
func performComplexOperation(ctx context.Context, db *mysql.Database) error {
    tx, cancel, err := db.BeginTxContext(ctx)
    if err != nil {
        return err
    }
    defer cancel()

    // statements run through cool-mysql are retried automatically on the
    // transient errors listed above
    if err := tx.ExecContext(ctx, "UPDATE accounts SET balance = balance - 100 WHERE id = @@id", mysql.Params{"id": 1}); err != nil {
        return err
    }
    if err := tx.ExecContext(ctx, "UPDATE accounts SET balance = balance + 100 WHERE id = @@id", mysql.Params{"id": 2}); err != nil {
        return err
    }
    return tx.Commit()
}

License

This project is licensed under the MIT License.

Documentation

Index

Constants

This section is empty.

Variables

var BuiltInParams = Params{
	"MaxTime": MaxTime,
}

var ErrCacheMiss = errors.New("cache miss")

ErrCacheMiss is returned by Cache implementations when a key is not found.

var ErrDestType = fmt.Errorf("cool-mysql: select destination must be a channel or a pointer to something")

var ErrNoColumnNames = fmt.Errorf("no column names given")

var ErrNoTableName = errors.New("no table name found")

var ErrUnexportedField = fmt.Errorf("cool-mysql: struct has unexported fields and cannot be used with channels")

var MaxConnectionTime = MaxExecutionTime

var MaxExecutionTime = time.Duration(getenvInt64("COOL_MAX_EXECUTION_TIME_TIME", int64(float64(30)*.9))) * time.Second

MaxExecutionTime is the total time we would like our queries to be able to execute. Since we are using 30 second limited AWS Lambda functions, we'll default this time to 90% of 30 seconds (27 seconds), with the goal of letting our process clean up and correctly log any failed queries

var MaxTime = time.Unix((1<<31)-1, 999999999)

var QueryErrorLoggingLength = getenvInt("COOL_MYSQL_MAX_QUERY_LOG_LENGTH", 1<<12) // 4kB

QueryErrorLoggingLength is the size of the query characters that are logged when an error occurs

var RedisLockRetryDelay = time.Duration(getenvFloat("COOL_REDIS_LOCK_RETRY_DELAY", .020)) * time.Second

Functions

func Bool

func Bool(v any) bool

func Bytes

func Bytes(v any) []byte

func Float32

func Float32(v any) float32

func Float64

func Float64(v any) float64

func Int

func Int(v any) int

func Int16

func Int16(v any) int16

func Int32

func Int32(v any) int32

func Int64

func Int64(v any) int64

func Int8

func Int8(v any) int8

func Marshal

func Marshal(x any, valuerFuncs map[reflect.Type]reflect.Value) ([]byte, error)

func NewContext

func NewContext(ctx context.Context, db *Database) context.Context

NewContext returns a new context.Context with the given *Database

func NewContextWithFunc

func NewContextWithFunc(ctx context.Context, f func() *Database) context.Context

NewContextWithFunc returns a new context.Context with the given func() *Database. This can be useful if you only want to initialize the database when it is actually needed. Combine with sync.OnceValue to ensure the database is only initialized once.

Example:

ctx := NewContextWithFunc(context.Background(), sync.OnceValue(func() *Database {
	db, err := NewDatabase()
	if err != nil {
		panic(err)
	}
	return db
}))

func NewContextWithTx

func NewContextWithTx(ctx context.Context, tx *Tx) context.Context

NewContextWithTx returns a new context.Context with the given *Tx

func String

func String(v any) string

func StructFieldIndexes

func StructFieldIndexes(t reflect.Type) [][]int

StructFieldIndexes recursively gets all the struct field indexes, including the indexes from embedded structs

func Time

func Time(v any) time.Time

func Uint

func Uint(v any) uint

func Uint16

func Uint16(v any) uint16

func Uint32

func Uint32(v any) uint32

func Uint64

func Uint64(v any) uint64

func Uint8

func Uint8(v any) uint8

func Value

func Value[T any](v any) T

Types

type Cache added in v0.0.5

type Cache interface {
	Get(ctx context.Context, key string) ([]byte, error)
	Set(ctx context.Context, key string, value []byte, ttl time.Duration) error
}

Cache defines basic get/set operations for query caching.
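
Any value with these two methods can be passed to (*Database).UseCache. Example (illustrative only; a hypothetical mapCache that ignores TTLs and is not safe for concurrent use):

type mapCache struct{ data map[string][]byte }

func (c mapCache) Get(ctx context.Context, key string) ([]byte, error) {
    if v, ok := c.data[key]; ok {
        return v, nil
    }
    return nil, ErrCacheMiss // report a miss so the query runs against MySQL
}

func (c mapCache) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error {
    c.data[key] = value
    return nil
}

// register it with the database
db.UseCache(mapCache{data: map[string][]byte{}})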

type Database

type Database struct {
	Writes handlerWithContext
	Reads  *sql.DB

	WritesDSN string
	ReadsDSN  string

	Log              LogFunc
	Finished         FinishedFunc
	HandleCacheError HandleCacheError

	MaxInsertSize *synct[int]

	// DisableForeignKeyChecks only affects foreign keys for transactions
	DisableForeignKeyChecks bool

	Logger                      Logger
	DisableUnusedColumnWarnings bool
	// contains filtered or unexported fields
}

Database is a cool MySQL connection

func FromContext

func FromContext(ctx context.Context) *Database

FromContext returns a *Database from a context.Context or nil if none is present.

func New

func New(wUser, wPass, wSchema, wHost string, wPort int,
	rUser, rPass, rSchema, rHost string, rPort int,
	collation string, timeZone *time.Location) (db *Database, err error)

New creates a new Database

func NewFromConn

func NewFromConn(writesConn, readsConn *sql.DB) (*Database, error)

NewFromConn creates a new Database given existing *sql.DB connections. It will query the writesConn for @@max_allowed_packet to set MaxInsertSize. If readsConn == writesConn, both Reads and Writes share the same pool.

func NewFromDSN

func NewFromDSN(writes, reads string) (db *Database, err error)

NewFromDSN creates a new Database from config DSN strings for both connections

func NewLocalWriter

func NewLocalWriter(path string) (*Database, error)

func NewWriter

func NewWriter(w io.Writer) (*Database, error)

func (*Database) AddTemplateFuncs

func (db *Database) AddTemplateFuncs(funcs template.FuncMap)

AddTemplateFuncs adds template functions to the database

func (*Database) AddValuerFuncs

func (db *Database) AddValuerFuncs(funcs ...any)

func (*Database) BeginReadsTx

func (db *Database) BeginReadsTx() (tx *Tx, cancel func() error, err error)

BeginReadsTx begins and returns a new transaction on the reads connection

func (*Database) BeginReadsTxContext

func (db *Database) BeginReadsTxContext(ctx context.Context) (tx *Tx, cancel func() error, err error)

BeginReadsTxContext begins and returns a new transaction on the reads connection

func (*Database) BeginTx

func (db *Database) BeginTx() (tx *Tx, cancel func() error, err error)

BeginTx begins and returns a new transaction on the writes connection

func (*Database) BeginTxContext

func (db *Database) BeginTxContext(ctx context.Context) (tx *Tx, cancel func() error, err error)

BeginTxContext begins and returns a new transaction on the writes connection

func (*Database) Clone

func (db *Database) Clone() *Database

Clone returns a copy of the db with the same connections but with an empty query log

func (*Database) Count

func (db *Database) Count(query string, cache time.Duration, params ...any) (int, error)

Count efficiently counts the number of rows a query returns

func (*Database) DefaultInsertOptions

func (db *Database) DefaultInsertOptions() *Inserter

func (*Database) Die

func (db *Database) Die()

Die will dump the next query and then exit

func (*Database) EnableMemcache added in v0.0.5

func (db *Database) EnableMemcache(mc *memcache.Client) *Database

EnableMemcache configures memcached as the cache backend.

func (*Database) EnableRedis

func (db *Database) EnableRedis(redisClient redis.UniversalClient) *Database

EnableRedis enables the Redis cache for select queries that specify a cache time, using the given client

func (*Database) Exec

func (db *Database) Exec(query string, params ...any) error

Exec executes a query and nothing more

func (*Database) ExecContext

func (db *Database) ExecContext(ctx context.Context, query string, params ...any) error

ExecContext executes a query and nothing more

func (*Database) ExecContextResult

func (db *Database) ExecContextResult(ctx context.Context, query string, params ...any) (sql.Result, error)

ExecContextResult executes a query and nothing more

func (*Database) ExecResult

func (db *Database) ExecResult(query string, params ...any) (sql.Result, error)

ExecResult executes a query and nothing more

func (*Database) Exists

func (db *Database) Exists(query string, cache time.Duration, params ...any) (bool, error)

Exists efficiently checks if there are any rows in the given query using the `Reads` connection

func (*Database) ExistsContext

func (db *Database) ExistsContext(ctx context.Context, query string, cache time.Duration, params ...any) (bool, error)

ExistsContext efficiently checks if there are any rows in the given query using the `Reads` connection

func (*Database) ExistsWrites

func (db *Database) ExistsWrites(query string, cache time.Duration, params ...any) (bool, error)

ExistsWrites efficiently checks if there are any rows in the given query using the `Writes` connection

func (*Database) ExistsWritesContext

func (db *Database) ExistsWritesContext(ctx context.Context, query string, cache time.Duration, params ...any) (bool, error)

ExistsWritesContext efficiently checks if there are any rows in the given query using the `Writes` connection

func (*Database) I

func (db *Database) I() *Inserter

func (*Database) Insert

func (db *Database) Insert(insert string, source any) error

func (*Database) InsertContext

func (db *Database) InsertContext(ctx context.Context, insert string, source any) error

func (*Database) InsertReads

func (db *Database) InsertReads(insert string, source any) error

func (*Database) InsertReadsContext

func (db *Database) InsertReadsContext(ctx context.Context, insert string, source any) error

func (*Database) InterpolateParams

func (db *Database) InterpolateParams(query string, params ...any) (replacedQuery string, normalizedParams Params, err error)

func (*Database) Reconnect

func (db *Database) Reconnect() error

Reconnect creates new connection(s) for writes and reads and replaces the existing connections with the new ones

func (*Database) Select

func (db *Database) Select(dest any, q string, cache time.Duration, params ...any) error

func (*Database) SelectContext

func (db *Database) SelectContext(ctx context.Context, dest any, q string, cache time.Duration, params ...any) error

func (*Database) SelectJSON

func (db *Database) SelectJSON(dest any, query string, cache time.Duration, params ...any) error

func (*Database) SelectJSONContext

func (db *Database) SelectJSONContext(ctx context.Context, dest any, query string, cache time.Duration, params ...any) error

func (*Database) SelectRows

func (db *Database) SelectRows(q string, cache time.Duration, params ...any) (Rows, error)

func (*Database) SelectWrites

func (db *Database) SelectWrites(dest any, q string, cache time.Duration, params ...any) error

func (*Database) SelectWritesContext

func (db *Database) SelectWritesContext(ctx context.Context, dest any, q string, cache time.Duration, params ...any) error

func (*Database) Test

func (db *Database) Test() error

Test pings both the writes and reads connections and, if either fails, reconnects both connections

func (*Database) Upsert

func (db *Database) Upsert(insert string, uniqueColumns, updateColumns []string, where string, source any) error

func (*Database) UpsertContext

func (db *Database) UpsertContext(ctx context.Context, insert string, uniqueColumns, updateColumns []string, where string, source any) error

func (*Database) UseCache added in v0.0.5

func (db *Database) UseCache(c Cache) *Database

UseCache sets a custom cache implementation.

func (*Database) WriterWithSubdir

func (db *Database) WriterWithSubdir(subdir string) *Database

type Error

type Error struct {
	Err error

	OriginalQuery string
	ReplacedQuery string
	Params        any
}

Error contains the error and query details

func Wrap

func Wrap(err error, originalQuery, replaceQuery string, params any) Error

func (Error) Error

func (v Error) Error() string

func (Error) Unwrap

func (v Error) Unwrap() error

type FinishedFunc

type FinishedFunc func(cached bool, replacedQuery string, params Params, execDuration time.Duration, fetchDuration time.Duration)

FinishedFunc executes after all rows have been processed, including being read from the channel if used
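
Example (illustrative; assumes db is a *Database) of a timing hook attached via the Finished field:

db.Finished = func(cached bool, replacedQuery string, params Params, execDuration, fetchDuration time.Duration) {
    log.Printf("query done: cached=%t exec=%s fetch=%s", cached, execDuration, fetchDuration)
}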

type HandleCacheError added in v0.0.5

type HandleCacheError func(err error) error

HandleCacheError is executed on a cache error so it can be handled by the user. Returning a non-nil error will abort execution.
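
Example (illustrative; assumes db is a *Database) of treating cache failures as non-fatal so queries still fall through to MySQL:

db.HandleCacheError = func(err error) error {
    log.Printf("cache error ignored: %v", err)
    return nil // a non-nil return would abort the query instead
}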

type HandleRedisError

type HandleRedisError = HandleCacheError

HandleRedisError is kept for backwards compatibility.

type Handler

type Handler interface {
	Insert(insert string, source any) error
	InsertContext(ctx context.Context, insert string, source any) error

	ExecContextResult(ctx context.Context, query string, params ...any) (sql.Result, error)
	ExecContext(ctx context.Context, query string, params ...any) error
	ExecResult(query string, params ...any) (sql.Result, error)
	Exec(query string, params ...any) error

	Select(dest any, q string, cache time.Duration, params ...any) error
	SelectRows(q string, cache time.Duration, params ...any) (Rows, error)
	SelectContext(ctx context.Context, dest any, q string, cache time.Duration, params ...any) error
	SelectJSON(dest any, query string, cache time.Duration, params ...any) error
	SelectJSONContext(ctx context.Context, dest any, query string, cache time.Duration, params ...any) error

	Exists(query string, cache time.Duration, params ...any) (bool, error)
	ExistsContext(ctx context.Context, query string, cache time.Duration, params ...any) (bool, error)
	Upsert(insert string, uniqueColumns, updateColumns []string, where string, source any) error
	UpsertContext(ctx context.Context, insert string, uniqueColumns, updateColumns []string, where string, source any) error
}

func TxOrDatabaseFromContext

func TxOrDatabaseFromContext(ctx context.Context) Handler
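
Accepting a Handler lets the same code run inside a caller's transaction or directly against the database, depending on what the context carries. A minimal sketch (the deactivateUser function and users table are illustrative):

func deactivateUser(ctx context.Context, id int) error {
    h := TxOrDatabaseFromContext(ctx)
    return h.ExecContext(ctx, "UPDATE users SET active = 0 WHERE id = @@id", Params{"id": id})
}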

type Inserter

type Inserter struct {
	AfterChunkExec func(start time.Time)
	AfterRowExec   func(start time.Time)
	HandleResult   func(sql.Result)
	// contains filtered or unexported fields
}
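
The hooks can be set (or chained via the Set* methods below) to observe chunked inserts. An illustrative sketch, assuming db is a *Database and users is a slice:

err := db.I().
    SetAfterChunkExec(func(start time.Time) {
        log.Printf("chunk flushed in %s", time.Since(start))
    }).
    Insert("users", users)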

func (*Inserter) Insert

func (in *Inserter) Insert(insert string, source any) error

func (*Inserter) InsertContext

func (in *Inserter) InsertContext(ctx context.Context, insert string, source any) error

func (*Inserter) SetAfterChunkExec

func (in *Inserter) SetAfterChunkExec(fn func(start time.Time)) *Inserter

func (*Inserter) SetAfterRowExec

func (in *Inserter) SetAfterRowExec(fn func(start time.Time)) *Inserter

func (*Inserter) SetExecutor

func (in *Inserter) SetExecutor(conn handlerWithContext) *Inserter

func (*Inserter) SetResultHandler

func (in *Inserter) SetResultHandler(fn func(sql.Result)) *Inserter

func (*Inserter) Upsert

func (in *Inserter) Upsert(query string, uniqueColumns, updateColumns []string, where string, source any) error

func (*Inserter) UpsertContext

func (in *Inserter) UpsertContext(ctx context.Context, query string, uniqueColumns, updateColumns []string, where string, source any) error

type Locker added in v0.0.5

type Locker interface {
	Lock(ctx context.Context, key string) (func() error, error)
}

Locker provides optional distributed locking for cache population.
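
RedisCache implements Locker using redsync. As an illustration of the contract only, a single-process locker (a hypothetical mutexLocker, not a distributed lock) could look like:

type mutexLocker struct{ mu sync.Mutex }

// Lock blocks until the lock is held and returns a func that releases it.
// The key is ignored here; a real implementation would lock per key.
func (l *mutexLocker) Lock(ctx context.Context, key string) (func() error, error) {
    l.mu.Lock()
    return func() error {
        l.mu.Unlock()
        return nil
    }, nil
}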

type LogDetail

type LogDetail struct {
	Query        string
	Params       Params
	Duration     time.Duration
	CacheHit     bool
	Tx           *sql.Tx
	RowsAffected int64
	Attempt      int
	Error        error
}

type LogFunc

type LogFunc func(detail LogDetail)

LogFunc is called after the query executes
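
Example (illustrative; assumes db is a *Database) of surfacing slow queries and errors through the Log field:

db.Log = func(d LogDetail) {
    if d.Error != nil {
        log.Printf("query failed (attempt %d): %v", d.Attempt, d.Error)
        return
    }
    if d.Duration > 500*time.Millisecond {
        log.Printf("slow query (%s): %s", d.Duration, d.Query)
    }
}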

type Logger added in v0.0.5

type Logger interface {
	Debug(msg string, args ...any)
	Info(msg string, args ...any)
	Warn(msg string, args ...any)
	Error(msg string, args ...any)
}

Logger defines the minimal logging interface used by this package.

func DefaultLogger added in v0.0.5

func DefaultLogger() Logger

DefaultLogger returns a slog-based logger used when none is provided.
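
Either adapter below (or any type with these four methods) can be assigned to the Database's Logger field. Example (illustrative; assumes db is a *Database):

// log/slog with JSON output
db.Logger = SlogLogger{slog.New(slog.NewJSONHandler(os.Stderr, nil))}

// or zap
zl, _ := zap.NewProduction()
db.Logger = ZapLogger{zl}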

type MapRow

type MapRow map[string]any

type MapRows

type MapRows []MapRow

type MemcacheCache added in v0.0.5

type MemcacheCache struct {
	Client *memcache.Client
}

MemcacheCache implements Cache using a memcached client.

func NewMemcacheCache added in v0.0.5

func NewMemcacheCache(client *memcache.Client) *MemcacheCache

func (*MemcacheCache) Get added in v0.0.5

func (m *MemcacheCache) Get(ctx context.Context, key string) ([]byte, error)

func (*MemcacheCache) Set added in v0.0.5

func (m *MemcacheCache) Set(ctx context.Context, key string, val []byte, ttl time.Duration) error

type MultiCache added in v0.0.5

type MultiCache struct {
	// contains filtered or unexported fields
}

MultiCache composes multiple caches. Reads check each cache in order and populate earlier caches on a hit. Writes fan out to all caches.

func NewMultiCache added in v0.0.5

func NewMultiCache(caches ...Cache) *MultiCache

NewMultiCache creates a MultiCache from the provided caches.

func (*MultiCache) Get added in v0.0.5

func (m *MultiCache) Get(ctx context.Context, key string) ([]byte, error)

func (*MultiCache) Set added in v0.0.5

func (m *MultiCache) Set(ctx context.Context, key string, val []byte, ttl time.Duration) error

type Params

type Params map[string]any

Params are a map of parameter names to values used in the query, like `select @@Name`

func InterpolateParams

func InterpolateParams(query string, tmplFuncs template.FuncMap, valuerFuncs map[reflect.Type]reflect.Value, params ...any) (replacedQuery string, normalizedParams Params, err error)

InterpolateParams replaces the `@@` parameters in a query with their values from the map(s). It takes multiple "sets" of params for convenience, so we don't have to specify params if there aren't any, but each param set will override the values of the previous. If there are 2 maps given, both with the key "ID", the last one will be used
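
Example (illustrative; passing nil for the template and valuer funcs):

replaced, normalized, err := InterpolateParams(
    "SELECT * FROM users WHERE id = @@ID AND name = @@Name",
    nil, nil,
    Params{"ID": 1, "Name": "Alice"},
    Params{"ID": 2}, // later sets win, so ID ends up as 2
)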

type Raw

type Raw string

Raw is a literal MySQL string, not to be encoded or escaped in any way

type RedisCache added in v0.0.5

type RedisCache struct {
	Client redis.UniversalClient
	// contains filtered or unexported fields
}

RedisCache implements Cache and Locker using go-redis and redsync.

func NewRedisCache added in v0.0.5

func NewRedisCache(client redis.UniversalClient) *RedisCache

NewRedisCache creates a RedisCache from a universal client.

func (*RedisCache) Get added in v0.0.5

func (r *RedisCache) Get(ctx context.Context, key string) ([]byte, error)

func (*RedisCache) Lock added in v0.0.5

func (r *RedisCache) Lock(ctx context.Context, key string) (func() error, error)

func (*RedisCache) Set added in v0.0.5

func (r *RedisCache) Set(ctx context.Context, key string, val []byte, ttl time.Duration) error

type Row

type Row = MapRow

type Rows

type Rows = MapRows

type SliceRow

type SliceRow []any

type SliceRows

type SliceRows []SliceRow

type SlogLogger added in v0.0.5

type SlogLogger struct{ *slog.Logger }

SlogLogger adapts slog.Logger to the Logger interface.

func (SlogLogger) Debug added in v0.0.5

func (l SlogLogger) Debug(msg string, args ...any)

func (SlogLogger) Error added in v0.0.5

func (l SlogLogger) Error(msg string, args ...any)

func (SlogLogger) Info added in v0.0.5

func (l SlogLogger) Info(msg string, args ...any)

func (SlogLogger) Warn added in v0.0.5

func (l SlogLogger) Warn(msg string, args ...any)

type Tx

type Tx struct {
	Tx   *sql.Tx
	Time time.Time

	PostCommitHooks []func() error
	// contains filtered or unexported fields
}

Tx is a cool MySQL transaction

func GetOrCreateTxFromContext

func GetOrCreateTxFromContext(ctx context.Context) (tx *Tx, commit, cancel func() error, err error)

GetOrCreateTxFromContext returns a *Tx from a context.Context or creates a new one if none is present. It also returns a `commit` func and `cancel` func. Both funcs will be noop if the tx is not created in this function. `cancel` should be deferred directly after calling this function to ensure the tx is rolled back if an error occurs.

Example:

tx, commit, cancel, err := GetOrCreateTxFromContext(ctx)
defer cancel()
if err != nil {
    return fmt.Errorf("failed to get or create tx: %w", err)
}
ctx = NewContextWithTx(ctx, tx) // if you want to pass tx to other functions

// do something with tx

if err := commit(); err != nil {
    return fmt.Errorf("failed to commit tx: %w", err)
}

func TxFromContext

func TxFromContext(ctx context.Context) *Tx

TxFromContext returns a *Tx from a context.Context or nil if none is present.

func (*Tx) Cancel

func (tx *Tx) Cancel() error

Cancel cancels the transaction; this should be deferred after creating a new tx every time

func (*Tx) Commit

func (tx *Tx) Commit() error

Commit commits the transaction

func (*Tx) DefaultInsertOptions

func (tx *Tx) DefaultInsertOptions() *Inserter

func (*Tx) Exec

func (tx *Tx) Exec(query string, params ...any) error

Exec executes a query and nothing more

func (*Tx) ExecContext

func (tx *Tx) ExecContext(ctx context.Context, query string, params ...any) error

ExecContext executes a query and nothing more

func (*Tx) ExecContextResult

func (tx *Tx) ExecContextResult(ctx context.Context, query string, params ...any) (sql.Result, error)

ExecContextResult executes a query and nothing more

func (*Tx) ExecResult

func (tx *Tx) ExecResult(query string, params ...any) (sql.Result, error)

ExecResult executes a query and nothing more

func (*Tx) Exists

func (tx *Tx) Exists(query string, cache time.Duration, params ...any) (bool, error)

Exists efficiently checks if there are any rows in the given query using the `Reads` connection

func (*Tx) ExistsContext

func (tx *Tx) ExistsContext(ctx context.Context, query string, cache time.Duration, params ...any) (bool, error)

ExistsContext efficiently checks if there are any rows in the given query using the `Reads` connection

func (*Tx) I

func (tx *Tx) I() *Inserter

func (*Tx) Insert

func (tx *Tx) Insert(insert string, source any) error

func (*Tx) InsertContext

func (tx *Tx) InsertContext(ctx context.Context, insert string, source any) error

func (*Tx) Select

func (tx *Tx) Select(dest any, q string, cache time.Duration, params ...any) error

func (*Tx) SelectContext

func (tx *Tx) SelectContext(ctx context.Context, dest any, q string, cache time.Duration, params ...any) error

func (*Tx) SelectJSON

func (tx *Tx) SelectJSON(dest any, query string, cache time.Duration, params ...any) error

func (*Tx) SelectJSONContext

func (tx *Tx) SelectJSONContext(ctx context.Context, dest any, query string, cache time.Duration, params ...any) error

func (*Tx) SelectRows

func (tx *Tx) SelectRows(q string, cache time.Duration, params ...any) (Rows, error)

func (*Tx) Upsert

func (tx *Tx) Upsert(insert string, uniqueColumns, updateColumns []string, where string, source any) error

func (*Tx) UpsertContext

func (tx *Tx) UpsertContext(ctx context.Context, insert string, uniqueColumns, updateColumns []string, where string, source any) error

type Valueser

type Valueser interface {
	MySQLValues() ([]driver.Value, error)
}

type WeakCache added in v0.0.5

type WeakCache struct {
	// contains filtered or unexported fields
}

WeakCache stores values in memory using weak pointers so the garbage collector may reclaim them under pressure.

func NewWeakCache added in v0.0.5

func NewWeakCache() *WeakCache

func (*WeakCache) Get added in v0.0.5

func (w *WeakCache) Get(ctx context.Context, key string) ([]byte, error)

func (*WeakCache) Set added in v0.0.5

func (w *WeakCache) Set(ctx context.Context, key string, val []byte, ttl time.Duration) error

type ZapLogger added in v0.0.5

type ZapLogger struct{ *zap.Logger }

ZapLogger adapts zap.Logger to the Logger interface.

func (ZapLogger) Debug added in v0.0.5

func (l ZapLogger) Debug(msg string, args ...any)

func (ZapLogger) Error added in v0.0.5

func (l ZapLogger) Error(msg string, args ...any)

func (ZapLogger) Info added in v0.0.5

func (l ZapLogger) Info(msg string, args ...any)

func (ZapLogger) Warn added in v0.0.5

func (l ZapLogger) Warn(msg string, args ...any)

type Zeroer

type Zeroer interface {
	IsZero() bool
}
