Documentation
¶
Index ¶
- type BatchJob
- type Driver
- type JobOptions
- type Notification
- type PgxDriver
- func (d *PgxDriver) AddJobWithTx(ctx context.Context, tx interface{}) (Transaction, error)
- func (d *PgxDriver) AddJobsWithTx(ctx context.Context, tx interface{}, jobs []BatchJob) error
- func (d *PgxDriver) Exec(ctx context.Context, sql string, args ...interface{}) error
- func (d *PgxDriver) Listen(ctx context.Context, channel string) error
- func (d *PgxDriver) Notify(ctx context.Context, channel string, payload string) error
- func (d *PgxDriver) Query(ctx context.Context, sql string, args ...interface{}) (Rows, error)
- func (d *PgxDriver) QueryRow(ctx context.Context, sql string, args ...interface{}) Row
- func (d *PgxDriver) WaitForNotification(ctx context.Context) (*Notification, error)
- func (d *PgxDriver) WithTx(ctx context.Context, fn func(tx Transaction) error) error
- type PgxTx
- type Row
- type Rows
- type SQLDriver
- func (d *SQLDriver) AddJobWithTx(ctx context.Context, tx interface{}) (Transaction, error)
- func (d *SQLDriver) AddJobsWithTx(ctx context.Context, tx interface{}, jobs []BatchJob) error
- func (d *SQLDriver) Exec(ctx context.Context, sql string, args ...interface{}) error
- func (d *SQLDriver) Listen(ctx context.Context, channel string) error
- func (d *SQLDriver) Notify(ctx context.Context, channel string, payload string) error
- func (d *SQLDriver) Query(ctx context.Context, sql string, args ...interface{}) (Rows, error)
- func (d *SQLDriver) QueryRow(ctx context.Context, sql string, args ...interface{}) Row
- func (d *SQLDriver) WaitForNotification(ctx context.Context) (*Notification, error)
- func (d *SQLDriver) WithTx(ctx context.Context, fn func(tx Transaction) error) error
- type SQLTx
- type Transaction
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BatchJob ¶
// BatchJob describes one job to be inserted as part of a batch operation.
type BatchJob struct {
// Worker is the job's worker value. It is typed as interface{}, so no
// concrete type is enforced here.
// NOTE(review): which worker types are accepted is not visible here; confirm against the inserting code.
Worker interface{}
// Opts holds the JobOptions for this job.
Opts JobOptions
}
BatchJob represents a job to be inserted in a batch operation
type Driver ¶
// Driver is the set of database operations the job queue requires from a
// backend. Both PgxDriver and SQLDriver expose this method set.
type Driver interface {
// WithTx invokes fn with a Transaction.
// NOTE(review): commit/rollback semantics are not visible here; confirm against the implementations.
WithTx(ctx context.Context, fn func(tx Transaction) error) error
// Basic operations
Exec(ctx context.Context, sql string, args ...interface{}) error
Query(ctx context.Context, sql string, args ...interface{}) (Rows, error)
QueryRow(ctx context.Context, sql string, args ...interface{}) Row
// Postgres-specific operations
Listen(ctx context.Context, channel string) error
Notify(ctx context.Context, channel string, payload string) error
// AddJobWithTx accepts an external transaction (passed as interface{}) and
// wraps it in the Transaction interface.
AddJobWithTx(ctx context.Context, tx interface{}) (Transaction, error)
// WaitForNotification waits for a notification on any channel this
// connection is listening on.
WaitForNotification(ctx context.Context) (*Notification, error)
// AddJobsWithTx adds multiple jobs as part of an existing transaction.
AddJobsWithTx(ctx context.Context, tx interface{}, jobs []BatchJob) error
}
Driver defines the core database operations needed for the job queue.
func NewPgxDriver ¶
NewPgxDriver creates a new pgx-based driver implementation for PostgreSQL. It uses pgx's native connection pool for better performance and features like automatic connection recovery, statement caching, and native LISTEN/NOTIFY support.
Parameters:
- pool: A *pgxpool.Pool instance. Must be initialized and connected. The pool handles connection lifecycle and maintains a connection pool for optimal performance.
Returns:
- Driver: The database driver implementation
- error: Non-nil if the pool is nil or of the wrong type
Example:
config, _ := pgxpool.ParseConfig("postgres://localhost:5432/myapp")
pool, _ := pgxpool.NewWithConfig(context.Background(), config)
driver, err := NewPgxDriver(pool)
func NewSQLDriver ¶
NewSQLDriver creates a new database/sql driver implementation for PostgreSQL. It requires both a database connection and the original connection string because the lib/pq notification listener needs the connection string for establishing its own connection.
Parameters:
- db: An initialized *sql.DB connection pool
- connStr: The PostgreSQL connection string (e.g., "postgres://user:pass@localhost:5432/dbname")
Returns:
- Driver: The database driver implementation
- error: Non-nil if the database connection is nil
Example:
db, _ := sql.Open("postgres", "postgres://localhost:5432/myapp")
driver, err := NewSQLDriver(db, "postgres://localhost:5432/myapp")
type JobOptions ¶
JobOptions represents options for a job
type Notification ¶
Notification represents a PostgreSQL notification
type PgxDriver ¶
// PgxDriver is the pgx-backed driver type. Its exported methods match the
// Driver interface.
type PgxDriver struct {
// contains filtered or unexported fields
}
func (*PgxDriver) AddJobWithTx ¶
func (d *PgxDriver) AddJobWithTx(ctx context.Context, tx interface{}) (Transaction, error)
AddJobWithTx accepts an external pgx transaction and wraps it in our Transaction interface
func (*PgxDriver) AddJobsWithTx ¶
AddJobsWithTx adds multiple jobs as part of an existing transaction
func (*PgxDriver) WaitForNotification ¶
func (d *PgxDriver) WaitForNotification(ctx context.Context) (*Notification, error)
WaitForNotification waits for a notification on any channel this connection is listening on
type PgxTx ¶
// PgxTx is the subset of a pgx transaction's methods needed from a
// transaction that is passed in. The signatures mirror the Exec, Query and
// QueryRow methods of pgx.Tx.
// NOTE(review): presumably the value handed to PgxDriver.AddJobWithTx must satisfy this interface; confirm.
type PgxTx interface {
// Exec runs a statement and returns pgx's command tag.
Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)
// Query runs a statement and returns pgx's native row set.
Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
// QueryRow runs a statement and returns pgx's native single-row handle.
QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
}
PgxTx represents a pgx transaction that can be passed in
type Row ¶
// Row is the minimal single-row result abstraction returned by QueryRow.
type Row interface {
// Scan has the same signature as the Scan method on database/sql's *sql.Row
// and on pgx.Row, so either can back this interface.
Scan(dest ...interface{}) error
}
Row and Rows are the result interfaces used by the job queue; each exposes only the minimal functionality required.
type SQLDriver ¶
// SQLDriver is the database/sql-backed driver type. Its exported methods
// match the Driver interface.
type SQLDriver struct {
// contains filtered or unexported fields
}
func (*SQLDriver) AddJobWithTx ¶
func (d *SQLDriver) AddJobWithTx(ctx context.Context, tx interface{}) (Transaction, error)
AddJobWithTx accepts an external database/sql transaction and wraps it in our Transaction interface
func (*SQLDriver) AddJobsWithTx ¶
AddJobsWithTx adds multiple jobs as part of an existing transaction
func (*SQLDriver) WaitForNotification ¶
func (d *SQLDriver) WaitForNotification(ctx context.Context) (*Notification, error)
WaitForNotification waits for a notification on any channel this connection is listening on
type SQLTx ¶
// SQLTx is the subset of a database/sql transaction's methods needed from a
// transaction that is passed in. The signatures mirror the context-aware
// query methods of *sql.Tx.
//
// The parameter named sql does not hide the sql package in the result types:
// a parameter's scope is the function body, not the signature.
type SQLTx interface {
// ExecContext runs a statement and returns a sql.Result.
ExecContext(ctx context.Context, sql string, args ...interface{}) (sql.Result, error)
// QueryContext runs a statement and returns *sql.Rows.
QueryContext(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error)
// QueryRowContext runs a statement and returns *sql.Row.
QueryRowContext(ctx context.Context, sql string, args ...interface{}) *sql.Row
}
SQLTx represents a database/sql transaction that can be passed in
type Transaction ¶
// Transaction is the package's internal, driver-neutral transaction
// interface. Its three methods have the same signatures as the basic
// operations on Driver, and it returns the package's own Rows and Row types
// rather than driver-specific ones.
type Transaction interface {
// Exec runs a statement and reports only an error.
Exec(ctx context.Context, sql string, args ...interface{}) error
// Query runs a statement and returns Rows.
Query(ctx context.Context, sql string, args ...interface{}) (Rows, error)
// QueryRow runs a statement and returns a Row.
QueryRow(ctx context.Context, sql string, args ...interface{}) Row
}
Transaction represents our internal transaction interface