Documentation
¶
Index ¶
- Variables
- func Bool(v any) bool
- func Bytes(v any) []byte
- func Float32(v any) float32
- func Float64(v any) float64
- func Int(v any) int
- func Int16(v any) int16
- func Int32(v any) int32
- func Int64(v any) int64
- func Int8(v any) int8
- func Marshal(x any, valuerFuncs map[reflect.Type]reflect.Value) ([]byte, error)
- func NewContext(ctx context.Context, db *Database) context.Context
- func NewContextWithFunc(ctx context.Context, f func() *Database) context.Context
- func NewContextWithTx(ctx context.Context, tx *Tx) context.Context
- func String(v any) string
- func StructFieldIndexes(t reflect.Type) [][]int
- func Time(v any) time.Time
- func Uint(v any) uint
- func Uint16(v any) uint16
- func Uint32(v any) uint32
- func Uint64(v any) uint64
- func Uint8(v any) uint8
- func Value[T any](v any) T
- type Cache
- type Database
- func FromContext(ctx context.Context) *Database
- func New(wUser, wPass, wSchema, wHost string, wPort int, ...) (db *Database, err error)
- func NewFromConn(writesConn, readsConn *sql.DB) (*Database, error)
- func NewFromDSN(writes, reads string) (db *Database, err error)
- func NewLocalWriter(path string) (*Database, error)
- func NewWriter(w io.Writer) (*Database, error)
- func (db *Database) AddTemplateFuncs(funcs template.FuncMap)
- func (db *Database) AddValuerFuncs(funcs ...any)
- func (db *Database) BeginReadsTx() (tx *Tx, cancel func() error, err error)
- func (db *Database) BeginReadsTxContext(ctx context.Context) (tx *Tx, cancel func() error, err error)
- func (db *Database) BeginTx() (tx *Tx, cancel func() error, err error)
- func (db *Database) BeginTxContext(ctx context.Context) (tx *Tx, cancel func() error, err error)
- func (db *Database) Clone() *Database
- func (db *Database) Count(query string, cache time.Duration, params ...any) (int, error)
- func (db *Database) DefaultInsertOptions() *Inserter
- func (db *Database) Die()
- func (db *Database) EnableMemcache(mc *memcache.Client) *Database
- func (db *Database) EnableRedis(redisClient redis.UniversalClient) *Database
- func (db *Database) Exec(query string, params ...any) error
- func (db *Database) ExecContext(ctx context.Context, query string, params ...any) error
- func (db *Database) ExecContextResult(ctx context.Context, query string, params ...any) (sql.Result, error)
- func (db *Database) ExecResult(query string, params ...any) (sql.Result, error)
- func (db *Database) Exists(query string, cache time.Duration, params ...any) (bool, error)
- func (db *Database) ExistsContext(ctx context.Context, query string, cache time.Duration, params ...any) (bool, error)
- func (db *Database) ExistsWrites(query string, cache time.Duration, params ...any) (bool, error)
- func (db *Database) ExistsWritesContext(ctx context.Context, query string, cache time.Duration, params ...any) (bool, error)
- func (db *Database) I() *Inserter
- func (db *Database) Insert(insert string, source any) error
- func (db *Database) InsertContext(ctx context.Context, insert string, source any) error
- func (db *Database) InsertReads(insert string, source any) error
- func (db *Database) InsertReadsContext(ctx context.Context, insert string, source any) error
- func (db *Database) InterpolateParams(query string, params ...any) (replacedQuery string, normalizedParams Params, err error)
- func (db *Database) Reconnect() error
- func (db *Database) Select(dest any, q string, cache time.Duration, params ...any) error
- func (db *Database) SelectContext(ctx context.Context, dest any, q string, cache time.Duration, params ...any) error
- func (db *Database) SelectJSON(dest any, query string, cache time.Duration, params ...any) error
- func (db *Database) SelectJSONContext(ctx context.Context, dest any, query string, cache time.Duration, ...) error
- func (db *Database) SelectRows(q string, cache time.Duration, params ...any) (Rows, error)
- func (db *Database) SelectWrites(dest any, q string, cache time.Duration, params ...any) error
- func (db *Database) SelectWritesContext(ctx context.Context, dest any, q string, cache time.Duration, params ...any) error
- func (db *Database) Test() error
- func (db *Database) Upsert(insert string, uniqueColumns, updateColumns []string, where string, source any) error
- func (db *Database) UpsertContext(ctx context.Context, insert string, uniqueColumns, updateColumns []string, ...) error
- func (db *Database) UseCache(c Cache) *Database
- func (db *Database) WriterWithSubdir(subdir string) *Database
- type Error
- type FinishedFunc
- type HandleCacheError
- type HandleRedisError
- type Handler
- type Inserter
- func (in *Inserter) Insert(insert string, source any) error
- func (in *Inserter) InsertContext(ctx context.Context, insert string, source any) error
- func (in *Inserter) SetAfterChunkExec(fn func(start time.Time)) *Inserter
- func (in *Inserter) SetAfterRowExec(fn func(start time.Time)) *Inserter
- func (in *Inserter) SetExecutor(conn handlerWithContext) *Inserter
- func (in *Inserter) SetResultHandler(fn func(sql.Result)) *Inserter
- func (in *Inserter) Upsert(query string, uniqueColumns, updateColumns []string, where string, source any) error
- func (in *Inserter) UpsertContext(ctx context.Context, query string, uniqueColumns, updateColumns []string, ...) error
- type Locker
- type LogDetail
- type LogFunc
- type Logger
- type MapRow
- type MapRows
- type MemcacheCache
- type MultiCache
- type Params
- type Raw
- type RedisCache
- type Row
- type Rows
- type SliceRow
- type SliceRows
- type SlogLogger
- type Tx
- func (tx *Tx) Cancel() error
- func (tx *Tx) Commit() error
- func (tx *Tx) DefaultInsertOptions() *Inserter
- func (tx *Tx) Exec(query string, params ...any) error
- func (tx *Tx) ExecContext(ctx context.Context, query string, params ...any) error
- func (tx *Tx) ExecContextResult(ctx context.Context, query string, params ...any) (sql.Result, error)
- func (tx *Tx) ExecResult(query string, params ...any) (sql.Result, error)
- func (tx *Tx) Exists(query string, cache time.Duration, params ...any) (bool, error)
- func (tx *Tx) ExistsContext(ctx context.Context, query string, cache time.Duration, params ...any) (bool, error)
- func (tx *Tx) I() *Inserter
- func (tx *Tx) Insert(insert string, source any) error
- func (tx *Tx) InsertContext(ctx context.Context, insert string, source any) error
- func (tx *Tx) Select(dest any, q string, cache time.Duration, params ...any) error
- func (tx *Tx) SelectContext(ctx context.Context, dest any, q string, cache time.Duration, params ...any) error
- func (tx *Tx) SelectJSON(dest any, query string, cache time.Duration, params ...any) error
- func (tx *Tx) SelectJSONContext(ctx context.Context, dest any, query string, cache time.Duration, ...) error
- func (tx *Tx) SelectRows(q string, cache time.Duration, params ...any) (Rows, error)
- func (tx *Tx) Upsert(insert string, uniqueColumns, updateColumns []string, where string, source any) error
- func (tx *Tx) UpsertContext(ctx context.Context, insert string, uniqueColumns, updateColumns []string, ...) error
- type Valueser
- type WeakCache
- type ZapLogger
- type Zeroer
Constants ¶
This section is empty.
Variables ¶
var BuiltInParams = Params{ "MaxTime": MaxTime, }
var ErrCacheMiss = errors.New("cache miss")
ErrCacheMiss is returned by Cache implementations when a key is not found.
var ErrDestType = fmt.Errorf("cool-mysql: select destination must be a channel or a pointer to something")
var ErrNoColumnNames = fmt.Errorf("no column names given")
var ErrNoTableName = errors.New("no table name found")
var ErrUnexportedField = fmt.Errorf("cool-mysql: struct has unexported fields and cannot be used with channels")
var MaxConnectionTime = MaxExecutionTime
var MaxExecutionTime = time.Duration(getenvInt64("COOL_MAX_EXECUTION_TIME_TIME", int64(float64(30)*.9))) * time.Second
MaxExecutionTime is the total time we would like our queries to be able to execute. Since we are using 30 second limited AWS Lambda functions, we'll default this time to 90% of 30 seconds (27 seconds), with the goal of letting our process clean up and correctly log any failed queries
var MaxTime = time.Unix((1<<31)-1, 999999999)
var QueryErrorLoggingLength = getenvInt("COOL_MYSQL_MAX_QUERY_LOG_LENGTH", 1<<12) // 4kB
QueryErrorLoggingLength is the size of the query characters that are logged when an error occurs
var RedisLockRetryDelay = time.Duration(getenvFloat("COOL_REDIS_LOCK_RETRY_DELAY", .020)) * time.Second
Functions ¶
func NewContext ¶
NewContext returns a new context.Context with the given *Database
func NewContextWithFunc ¶
NewContextWithFunc returns a new context.Context with the given func () *Database This can be useful if you only want to initialize the database when it is actually needed. Combine with sync.OnceValue to ensure the database is only initialized once.
Example:
ctx := NewContextWithFunc(context.Background(), sync.OnceValue(func() *Database { db, err := NewDatabase() if err != nil { panic(err) } return db }))
func NewContextWithTx ¶
NewContextWithTx returns a new context.Context with the given *Tx
func StructFieldIndexes ¶
StructFieldIndexes recursively gets all the struct field index, including the indexes from embedded structs
Types ¶
type Cache ¶ added in v0.0.5
type Cache interface { Get(ctx context.Context, key string) ([]byte, error) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error }
Cache defines basic get/set operations for query caching.
type Database ¶
type Database struct { Writes handlerWithContext Reads *sql.DB WritesDSN string ReadsDSN string Log LogFunc Finished FinishedFunc HandleCacheError HandleCacheError MaxInsertSize *synct[int] // DisableForeignKeyChecks only affects foreign keys for transactions DisableForeignKeyChecks bool Logger Logger DisableUnusedColumnWarnings bool // contains filtered or unexported fields }
Database is a cool MySQL connection
func FromContext ¶
FromContext returns a *Database from a context.Context or nil if none is present.
func New ¶
func New(wUser, wPass, wSchema, wHost string, wPort int, rUser, rPass, rSchema, rHost string, rPort int, collation string, timeZone *time.Location) (db *Database, err error)
New creates a new Database
func NewFromConn ¶
NewFromConn creates a new Database given existing *sql.DB connections. It will query the writesConn for @@max_allowed_packet to set MaxInsertSize. If readsConn == writesConn, both Reads and Writes share the same pool.
func NewFromDSN ¶
NewFromDSN creates a new Database from config DSN strings for both connections
func NewLocalWriter ¶
func (*Database) AddTemplateFuncs ¶
AddTemplateFuncs adds template functions to the database
func (*Database) AddValuerFuncs ¶
func (*Database) BeginReadsTx ¶
BeginReadsTx begins and returns a new transaction on the writes connection
func (*Database) BeginReadsTxContext ¶
func (db *Database) BeginReadsTxContext(ctx context.Context) (tx *Tx, cancel func() error, err error)
BeginReadsTxContext begins and returns a new transaction on the reads connection
func (*Database) BeginTxContext ¶
BeginTxContext begins and returns a new transaction on the writes connection
func (*Database) Clone ¶
Clone returns a copy of the db with the same connections but with an empty query log
func (*Database) DefaultInsertOptions ¶
func (*Database) EnableMemcache ¶ added in v0.0.5
EnableMemcache configures memcached as the cache backend.
func (*Database) EnableRedis ¶
EnableRedis enables redis cache for select queries with cache times with the given connection information
func (*Database) ExecContext ¶
ExecContext executes a query and nothing more
func (*Database) ExecContextResult ¶
func (db *Database) ExecContextResult(ctx context.Context, query string, params ...any) (sql.Result, error)
ExecContext executes a query and nothing more
func (*Database) ExecResult ¶
ExecResult executes a query and nothing more
func (*Database) Exists ¶
Exists efficiently checks if there are any rows in the given query using the `Reads` connection
func (*Database) ExistsContext ¶
func (db *Database) ExistsContext(ctx context.Context, query string, cache time.Duration, params ...any) (bool, error)
ExistsContext efficiently checks if there are any rows in the given query using the `Reads` connection
func (*Database) ExistsWrites ¶
ExistsWrites efficiently checks if there are any rows in the given query using the `Writes` connection
func (*Database) ExistsWritesContext ¶
func (db *Database) ExistsWritesContext(ctx context.Context, query string, cache time.Duration, params ...any) (bool, error)
ExistsWritesContext efficiently checks if there are any rows in the given query using the `Writes` connection
func (*Database) InsertContext ¶
func (*Database) InsertReadsContext ¶
func (*Database) InterpolateParams ¶
func (*Database) Reconnect ¶
Reconnect creates new connection(s) for writes and reads and replaces the existing connections with the new ones
func (*Database) SelectContext ¶
func (*Database) SelectJSON ¶
func (*Database) SelectJSONContext ¶
func (*Database) SelectRows ¶
func (*Database) SelectWrites ¶
func (*Database) SelectWritesContext ¶
func (*Database) Test ¶
Test pings both writes and reads connection and if either fail reconnects both connections
func (*Database) UpsertContext ¶
func (*Database) WriterWithSubdir ¶
type FinishedFunc ¶
type FinishedFunc func(cached bool, replacedQuery string, params Params, execDuration time.Duration, fetchDuration time.Duration)
FinishedFunc executes after all rows have been processed, including being read from the channel if used
type HandleCacheError ¶ added in v0.0.5
HandleCacheError is executed on a cache error so it can be handled by the user. Returning a non-nil error will abort execution.
type HandleRedisError ¶
type HandleRedisError = HandleCacheError
HandleRedisError is kept for backwards compatibility.
type Handler ¶
type Handler interface { Insert(insert string, source any) error InsertContext(ctx context.Context, insert string, source any) error ExecContextResult(ctx context.Context, query string, params ...any) (sql.Result, error) ExecContext(ctx context.Context, query string, params ...any) error ExecResult(query string, params ...any) (sql.Result, error) Exec(query string, params ...any) error Select(dest any, q string, cache time.Duration, params ...any) error SelectRows(q string, cache time.Duration, params ...any) (Rows, error) SelectContext(ctx context.Context, dest any, q string, cache time.Duration, params ...any) error SelectJSON(dest any, query string, cache time.Duration, params ...any) error SelectJSONContext(ctx context.Context, dest any, query string, cache time.Duration, params ...any) error Exists(query string, cache time.Duration, params ...any) (bool, error) ExistsContext(ctx context.Context, query string, cache time.Duration, params ...any) (bool, error) Upsert(insert string, uniqueColumns, updateColumns []string, where string, source any) error UpsertContext(ctx context.Context, insert string, uniqueColumns, updateColumns []string, where string, source any) error }
func TxOrDatabaseFromContext ¶
type Inserter ¶
type Inserter struct { AfterChunkExec func(start time.Time) AfterRowExec func(start time.Time) HandleResult func(sql.Result) // contains filtered or unexported fields }
func (*Inserter) InsertContext ¶
func (*Inserter) SetAfterChunkExec ¶
func (*Inserter) SetAfterRowExec ¶
func (*Inserter) SetExecutor ¶
func (*Inserter) SetResultHandler ¶
type Logger ¶ added in v0.0.5
type Logger interface { Debug(msg string, args ...any) Info(msg string, args ...any) Warn(msg string, args ...any) Error(msg string, args ...any) }
Logger defines the minimal logging interface used by this package.
func DefaultLogger ¶ added in v0.0.5
func DefaultLogger() Logger
DefaultLogger returns a slog-based logger used when none is provided.
type MemcacheCache ¶ added in v0.0.5
MemcacheCache implements Cache using a memcached client.
func NewMemcacheCache ¶ added in v0.0.5
func NewMemcacheCache(client *memcache.Client) *MemcacheCache
type MultiCache ¶ added in v0.0.5
type MultiCache struct {
// contains filtered or unexported fields
}
MultiCache composes multiple caches. Reads check each cache in order and populate earlier caches on a hit. Writes fan out to all caches.
func NewMultiCache ¶ added in v0.0.5
func NewMultiCache(caches ...Cache) *MultiCache
NewMultiCache creates a MultiCache from the provided caches.
type Params ¶
Params are a map of parameter names to values use in the query like `select @@Name`
func InterpolateParams ¶
func InterpolateParams(query string, tmplFuncs template.FuncMap, valuerFuncs map[reflect.Type]reflect.Value, params ...any) (replacedQuery string, normalizedParams Params, err error)
InterpolateParams replaces the `@@` parameters in a query with their values from the map(s) Takes multiple "sets" of params for convenience, so we don't have to specify params if there aren't any, but each param will override the values of the previous. If there are 2 maps given, both with the key "ID", the last one will be used
type RedisCache ¶ added in v0.0.5
type RedisCache struct { Client redis.UniversalClient // contains filtered or unexported fields }
RedisCache implements Cache and Locker using go-redis and redsync.
func NewRedisCache ¶ added in v0.0.5
func NewRedisCache(client redis.UniversalClient) *RedisCache
NewRedisCache creates a RedisCache from a universal client.
type SlogLogger ¶ added in v0.0.5
SlogLogger adapts slog.Logger to the Logger interface.
func (SlogLogger) Debug ¶ added in v0.0.5
func (l SlogLogger) Debug(msg string, args ...any)
func (SlogLogger) Error ¶ added in v0.0.5
func (l SlogLogger) Error(msg string, args ...any)
func (SlogLogger) Info ¶ added in v0.0.5
func (l SlogLogger) Info(msg string, args ...any)
func (SlogLogger) Warn ¶ added in v0.0.5
func (l SlogLogger) Warn(msg string, args ...any)
type Tx ¶
type Tx struct { Tx *sql.Tx Time time.Time PostCommitHooks []func() error // contains filtered or unexported fields }
Tx is a cool MySQL transaction
func GetOrCreateTxFromContext ¶
GetOrCreateTxFromContext returns a *Tx from a context.Context or creates a new one if none is present. It also returns a `commit` func and `cancel` func. Both funcs will be noop if the tx is not created in this function. `cancel` should be deferred directly after calling this function to ensure the tx is rolled back if an error occurs.
Example:
tx, commit, cancel, err := GetOrCreateTxFromContext(ctx) defer cancel() if err != nil { return fmt.Errorf("failed to get or create tx: %w", err) } ctx = NewContextWithTx(ctx, tx) // if you want to pass tx to other functions // do something with tx if err := commit(); err != nil { return fmt.Errorf("failed to commit tx: %w", err) }
func TxFromContext ¶
TxFromContext returns a *Tx from a context.Context or nil if none is present.
func (*Tx) DefaultInsertOptions ¶
func (*Tx) ExecContext ¶
ExecContext executes a query and nothing more
func (*Tx) ExecContextResult ¶
func (tx *Tx) ExecContextResult(ctx context.Context, query string, params ...any) (sql.Result, error)
ExecContextResult executes a query and nothing more
func (*Tx) ExecResult ¶
ExecResult executes a query and nothing more
func (*Tx) Exists ¶
Exists efficiently checks if there are any rows in the given query using the `Reads` connection
func (*Tx) ExistsContext ¶
func (tx *Tx) ExistsContext(ctx context.Context, query string, cache time.Duration, params ...any) (bool, error)
ExistsContext efficiently checks if there are any rows in the given query using the `Reads` connection
func (*Tx) InsertContext ¶
func (*Tx) SelectContext ¶
func (*Tx) SelectJSON ¶
func (*Tx) SelectJSONContext ¶
func (*Tx) SelectRows ¶
type WeakCache ¶ added in v0.0.5
type WeakCache struct {
// contains filtered or unexported fields
}
WeakCache stores values in memory using weak pointers so the garbage collector may reclaim them under pressure.
func NewWeakCache ¶ added in v0.0.5
func NewWeakCache() *WeakCache