riverdriver

package module
v0.22.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2025 License: MPL-2.0 Imports: 5 Imported by: 8

Documentation

Overview

Package riverdriver exposes generic constructs to be implemented by specific drivers that wrap third party database packages, with the aim being to keep the main River interface decoupled from a specific database package so that other packages or other major versions of packages can be supported in future River versions.

River currently only supports Pgx v5, and the interface here wrap it with only the thinnest possible layer. Adding support for alternate packages will require the interface to change substantially, and therefore it should not be implemented or invoked by user code. Changes to interfaces in this package WILL NOT be considered breaking changes for purposes of River's semantic versioning.

Index

Constants

View Source
const AllQueuesString = "*"
View Source
const MigrationLineMain = "main"

Variables

View Source
var (
	ErrClosedPool     = errors.New("underlying driver pool is closed")
	ErrNotImplemented = errors.New("driver does not implement this functionality")
)

Functions

This section is empty.

Types

type ColumnExistsParams added in v0.21.0

type ColumnExistsParams struct {
	Column string
	Schema string
	Table  string
}

type Driver

type Driver[TTx any] interface {
	// GetExecutor gets an executor for the driver.
	//
	// API is not stable. DO NOT USE.
	GetExecutor() Executor

	// GetListener gets a listener for purposes of receiving notifications.
	//
	// API is not stable. DO NOT USE.
	GetListener(params *GetListenenerParams) Listener

	// GetMigrationDefaultLines gets default migration lines that should be
	// applied when using this driver. This is mainly used by riverdbtest to
	// figure out what migration lines should be available by default for new
	// test schemas.
	//
	// API is not stable. DO NOT USE.
	GetMigrationDefaultLines() []string

	// GetMigrationFS gets a filesystem containing migrations for the driver.
	//
	// Each set of migration files is expected to exist within the filesystem as
	// `migration/<line>/`. For example:
	//
	//     migration/main/001_create_river_migration.up.sql
	//
	// API is not stable. DO NOT USE.
	GetMigrationFS(line string) fs.FS

	// GetMigrationLines gets supported migration lines from the driver. Most
	// drivers will only support a single line: MigrationLineMain.
	//
	// API is not stable. DO NOT USE.
	GetMigrationLines() []string

	// GetMigrationTruncateTables gets the tables that should be truncated
	// before or after tests for a specific migration line returned by this
	// driver. Tables to truncate doesn't need to consider intermediary states,
	// and should return tables for the latest migration version.
	//
	// API is not stable. DO NOT USE.
	GetMigrationTruncateTables(line string) []string

	// HasPool returns true if the driver is configured with a database pool.
	//
	// API is not stable. DO NOT USE.
	HasPool() bool

	// SupportsListener gets whether this driver supports a listener. Drivers
	// that don't support a listener support poll only mode only.
	//
	// API is not stable. DO NOT USE.
	SupportsListener() bool

	// UnwrapExecutor gets an executor from a driver transaction.
	//
	// API is not stable. DO NOT USE.
	UnwrapExecutor(tx TTx) ExecutorTx

	// UnwrapTx gets a driver transaction from an executor. This is currently
	// only needed for test transaction helpers.
	//
	// API is not stable. DO NOT USE.
	UnwrapTx(execTx ExecutorTx) TTx
}

Driver provides a database driver for use with river.Client.

Its purpose is to wrap the interface of a third party database package, with the aim being to keep the main River interface decoupled from a specific database package so that other packages or major versions of packages can be supported in future River versions.

River currently only supports Pgx v5, and this interface wraps it with only the thinnest possible layer. Adding support for alternate packages will require it to change substantially, and therefore it should not be implemented or invoked by user code. Changes to this interface WILL NOT be considered breaking changes for purposes of River's semantic versioning.

API is not stable. DO NOT IMPLEMENT.

type Executor

type Executor interface {
	// Begin begins a new subtransaction. ErrSubTxNotSupported may be returned
	// if the executor is a transaction and the driver doesn't support
	// subtransactions (like riverdriver/riverdatabasesql for database/sql).
	Begin(ctx context.Context) (ExecutorTx, error)

	// ColumnExists checks whether a column for a particular table exists for
	// the schema in the current search schema.
	ColumnExists(ctx context.Context, params *ColumnExistsParams) (bool, error)

	// Exec executes raw SQL. Used for migrations.
	Exec(ctx context.Context, sql string, args ...any) (struct{}, error)

	JobCancel(ctx context.Context, params *JobCancelParams) (*rivertype.JobRow, error)
	JobCountByState(ctx context.Context, params *JobCountByStateParams) (int, error)
	JobDelete(ctx context.Context, params *JobDeleteParams) (*rivertype.JobRow, error)
	JobDeleteBefore(ctx context.Context, params *JobDeleteBeforeParams) (int, error)
	JobGetAvailable(ctx context.Context, params *JobGetAvailableParams) ([]*rivertype.JobRow, error)
	JobGetByID(ctx context.Context, params *JobGetByIDParams) (*rivertype.JobRow, error)
	JobGetByIDMany(ctx context.Context, params *JobGetByIDManyParams) ([]*rivertype.JobRow, error)
	JobGetByKindMany(ctx context.Context, params *JobGetByKindManyParams) ([]*rivertype.JobRow, error)
	JobGetStuck(ctx context.Context, params *JobGetStuckParams) ([]*rivertype.JobRow, error)
	JobInsertFastMany(ctx context.Context, params *JobInsertFastManyParams) ([]*JobInsertFastResult, error)
	JobInsertFastManyNoReturning(ctx context.Context, params *JobInsertFastManyParams) (int, error)
	JobInsertFull(ctx context.Context, params *JobInsertFullParams) (*rivertype.JobRow, error)
	JobList(ctx context.Context, params *JobListParams) ([]*rivertype.JobRow, error)
	JobRescueMany(ctx context.Context, params *JobRescueManyParams) (*struct{}, error)
	JobRetry(ctx context.Context, params *JobRetryParams) (*rivertype.JobRow, error)
	JobSchedule(ctx context.Context, params *JobScheduleParams) ([]*JobScheduleResult, error)
	JobSetStateIfRunningMany(ctx context.Context, params *JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error)
	JobUpdate(ctx context.Context, params *JobUpdateParams) (*rivertype.JobRow, error)
	LeaderAttemptElect(ctx context.Context, params *LeaderElectParams) (bool, error)
	LeaderAttemptReelect(ctx context.Context, params *LeaderElectParams) (bool, error)
	LeaderDeleteExpired(ctx context.Context, params *LeaderDeleteExpiredParams) (int, error)
	LeaderGetElectedLeader(ctx context.Context, params *LeaderGetElectedLeaderParams) (*Leader, error)
	LeaderInsert(ctx context.Context, params *LeaderInsertParams) (*Leader, error)
	LeaderResign(ctx context.Context, params *LeaderResignParams) (bool, error)

	// MigrationDeleteAssumingMainMany deletes many migrations assuming
	// everything is on the main line. This is suitable for use in databases on
	// a version before the `line` column exists.
	MigrationDeleteAssumingMainMany(ctx context.Context, params *MigrationDeleteAssumingMainManyParams) ([]*Migration, error)

	// MigrationDeleteByLineAndVersionMany deletes many migration versions on a
	// particular line.
	MigrationDeleteByLineAndVersionMany(ctx context.Context, params *MigrationDeleteByLineAndVersionManyParams) ([]*Migration, error)

	// MigrationGetAllAssumingMain gets all migrations assuming everything is on
	// the main line. This is suitable for use in databases on a version before
	// the `line` column exists.
	MigrationGetAllAssumingMain(ctx context.Context, params *MigrationGetAllAssumingMainParams) ([]*Migration, error)

	// MigrationGetByLine gets all currently applied migrations.
	MigrationGetByLine(ctx context.Context, params *MigrationGetByLineParams) ([]*Migration, error)

	// MigrationInsertMany inserts many migration versions.
	MigrationInsertMany(ctx context.Context, params *MigrationInsertManyParams) ([]*Migration, error)

	// MigrationInsertManyAssumingMain inserts many migrations, assuming they're
	// on the main line. This operation is necessary for compatibility before
	// the `line` column was added to the migrations table.
	MigrationInsertManyAssumingMain(ctx context.Context, params *MigrationInsertManyAssumingMainParams) ([]*Migration, error)

	NotifyMany(ctx context.Context, params *NotifyManyParams) error
	PGAdvisoryXactLock(ctx context.Context, key int64) (*struct{}, error)

	QueueCreateOrSetUpdatedAt(ctx context.Context, params *QueueCreateOrSetUpdatedAtParams) (*rivertype.Queue, error)
	QueueDeleteExpired(ctx context.Context, params *QueueDeleteExpiredParams) ([]string, error)
	QueueGet(ctx context.Context, params *QueueGetParams) (*rivertype.Queue, error)
	QueueList(ctx context.Context, params *QueueListParams) ([]*rivertype.Queue, error)
	QueuePause(ctx context.Context, params *QueuePauseParams) error
	QueueResume(ctx context.Context, params *QueueResumeParams) error
	QueueUpdate(ctx context.Context, params *QueueUpdateParams) (*rivertype.Queue, error)
	QueryRow(ctx context.Context, sql string, args ...any) Row
	SchemaGetExpired(ctx context.Context, params *SchemaGetExpiredParams) ([]string, error)

	// TableExists checks whether a table exists for the schema in the current
	// search schema.
	TableExists(ctx context.Context, params *TableExistsParams) (bool, error)
}

Executor provides River operations against a database. It may be a database pool or transaction.

API is not stable. DO NOT IMPLEMENT.

type ExecutorTx

type ExecutorTx interface {
	Executor

	// Commit commits the transaction.
	//
	// API is not stable. DO NOT USE.
	Commit(ctx context.Context) error

	// Rollback rolls back the transaction.
	//
	// API is not stable. DO NOT USE.
	Rollback(ctx context.Context) error
}

ExecutorTx is an executor which is a transaction. In addition to standard Executor operations, it may be committed or rolled back.

API is not stable. DO NOT IMPLEMENT.

type GetListenenerParams added in v0.21.0

type GetListenenerParams struct {
	Schema string
}

type JobCancelParams added in v0.0.23

type JobCancelParams struct {
	ID                int64
	CancelAttemptedAt time.Time
	ControlTopic      string
	Schema            string
}

type JobCountByStateParams added in v0.21.0

type JobCountByStateParams struct {
	Schema string
	State  rivertype.JobState
}

type JobDeleteBeforeParams added in v0.0.23

type JobDeleteBeforeParams struct {
	CancelledFinalizedAtHorizon time.Time
	CompletedFinalizedAtHorizon time.Time
	DiscardedFinalizedAtHorizon time.Time
	Max                         int
	Schema                      string
}

type JobDeleteParams added in v0.21.0

type JobDeleteParams struct {
	ID     int64
	Schema string
}

type JobGetAvailableParams added in v0.0.23

type JobGetAvailableParams struct {
	ClientID   string
	Max        int
	Now        *time.Time
	ProducerID int64
	Queue      string
	Schema     string
}

type JobGetByIDManyParams added in v0.21.0

type JobGetByIDManyParams struct {
	ID     []int64
	Schema string
}

type JobGetByIDParams added in v0.21.0

type JobGetByIDParams struct {
	ID     int64
	Schema string
}

type JobGetByKindManyParams added in v0.21.0

type JobGetByKindManyParams struct {
	Kind   []string
	Schema string
}

type JobGetStuckParams added in v0.0.23

type JobGetStuckParams struct {
	Max          int
	Schema       string
	StuckHorizon time.Time
}

type JobInsertFastManyParams added in v0.21.0

type JobInsertFastManyParams struct {
	Jobs   []*JobInsertFastParams
	Schema string
}

type JobInsertFastParams added in v0.0.23

type JobInsertFastParams struct {
	// Args contains the raw underlying job arguments struct. It has already been
	// encoded into EncodedArgs, but the original is kept here for to leverage its
	// struct tags and interfaces, such as for use in unique key generation.
	Args         rivertype.JobArgs
	CreatedAt    *time.Time
	EncodedArgs  []byte
	Kind         string
	MaxAttempts  int
	Metadata     []byte
	Priority     int
	Queue        string
	ScheduledAt  *time.Time
	State        rivertype.JobState
	Tags         []string
	UniqueKey    []byte
	UniqueStates byte
}

type JobInsertFastResult added in v0.12.0

type JobInsertFastResult struct {
	Job                      *rivertype.JobRow
	UniqueSkippedAsDuplicate bool
}

type JobInsertFullParams added in v0.0.23

type JobInsertFullParams struct {
	Attempt      int
	AttemptedAt  *time.Time
	AttemptedBy  []string
	CreatedAt    *time.Time
	EncodedArgs  []byte
	Errors       [][]byte
	FinalizedAt  *time.Time
	Kind         string
	MaxAttempts  int
	Metadata     []byte
	Priority     int
	Queue        string
	ScheduledAt  *time.Time
	Schema       string
	State        rivertype.JobState
	Tags         []string
	UniqueKey    []byte
	UniqueStates byte
}

type JobListParams added in v0.19.0

type JobListParams struct {
	Max           int32
	NamedArgs     map[string]any
	OrderByClause string
	Schema        string
	WhereClause   string
}

type JobRescueManyParams added in v0.0.23

type JobRescueManyParams struct {
	ID          []int64
	Error       [][]byte
	FinalizedAt []time.Time
	ScheduledAt []time.Time
	Schema      string
	State       []string
}

type JobRetryParams added in v0.21.0

type JobRetryParams struct {
	ID     int64
	Schema string
}

type JobScheduleParams added in v0.0.23

type JobScheduleParams struct {
	Max    int
	Now    time.Time
	Schema string
}

type JobScheduleResult added in v0.5.0

type JobScheduleResult struct {
	Job               rivertype.JobRow
	ConflictDiscarded bool
}

type JobSetStateIfRunningManyParams added in v0.12.1

type JobSetStateIfRunningManyParams struct {
	ID              []int64
	Attempt         []*int
	ErrData         [][]byte
	FinalizedAt     []*time.Time
	MetadataDoMerge []bool
	MetadataUpdates [][]byte
	ScheduledAt     []*time.Time
	Schema          string
	State           []rivertype.JobState
}

JobSetStateIfRunningManyParams are parameters to update the state of currently running jobs. Use one of the constructors below to ensure a correct combination of parameters.

type JobSetStateIfRunningParams added in v0.0.23

type JobSetStateIfRunningParams struct {
	ID              int64
	Attempt         *int
	ErrData         []byte
	FinalizedAt     *time.Time
	MetadataDoMerge bool
	MetadataUpdates []byte
	ScheduledAt     *time.Time
	Schema          string // added by completer
	State           rivertype.JobState
}

JobSetStateIfRunningParams are parameters to update the state of a currently running job. Use one of the constructors below to ensure a correct combination of parameters.

func JobSetStateCancelled added in v0.0.23

func JobSetStateCancelled(id int64, finalizedAt time.Time, errData []byte, metadataUpdates []byte) *JobSetStateIfRunningParams

func JobSetStateCompleted added in v0.0.23

func JobSetStateCompleted(id int64, finalizedAt time.Time, metadataUpdates []byte) *JobSetStateIfRunningParams

func JobSetStateDiscarded added in v0.0.23

func JobSetStateDiscarded(id int64, finalizedAt time.Time, errData []byte, metadataUpdates []byte) *JobSetStateIfRunningParams

func JobSetStateErrorAvailable added in v0.0.23

func JobSetStateErrorAvailable(id int64, scheduledAt time.Time, errData []byte, metadataUpdates []byte) *JobSetStateIfRunningParams

func JobSetStateErrorRetryable added in v0.0.23

func JobSetStateErrorRetryable(id int64, scheduledAt time.Time, errData []byte, metadataUpdates []byte) *JobSetStateIfRunningParams

func JobSetStateSnoozed added in v0.0.23

func JobSetStateSnoozed(id int64, scheduledAt time.Time, attempt int, metadataUpdates []byte) *JobSetStateIfRunningParams

func JobSetStateSnoozedAvailable added in v0.0.23

func JobSetStateSnoozedAvailable(id int64, scheduledAt time.Time, attempt int, metadataUpdates []byte) *JobSetStateIfRunningParams

type JobUpdateParams added in v0.0.23

type JobUpdateParams struct {
	ID                  int64
	AttemptDoUpdate     bool
	Attempt             int
	AttemptedAtDoUpdate bool
	AttemptedAt         *time.Time
	AttemptedByDoUpdate bool
	AttemptedBy         []string
	ErrorsDoUpdate      bool
	Errors              [][]byte
	FinalizedAtDoUpdate bool
	FinalizedAt         *time.Time
	Schema              string
	StateDoUpdate       bool
	State               rivertype.JobState
	// Deprecated and will be removed when advisory lock unique path is removed.
	UniqueKeyDoUpdate bool
	// Deprecated and will be removed when advisory lock unique path is removed.
	UniqueKey []byte
}

type Leader added in v0.0.23

type Leader struct {
	ElectedAt time.Time
	ExpiresAt time.Time
	LeaderID  string
}

Leader represents a River leader.

API is not stable. DO NOT USE.

type LeaderDeleteExpiredParams added in v0.21.0

type LeaderDeleteExpiredParams struct {
	Now    *time.Time
	Schema string
}

type LeaderElectParams added in v0.0.23

type LeaderElectParams struct {
	LeaderID string
	Now      *time.Time
	Schema   string
	TTL      time.Duration
}

type LeaderGetElectedLeaderParams added in v0.21.0

type LeaderGetElectedLeaderParams struct {
	Schema string
}

type LeaderInsertParams added in v0.0.23

type LeaderInsertParams struct {
	ElectedAt *time.Time
	ExpiresAt *time.Time
	LeaderID  string
	Now       *time.Time
	Schema    string
	TTL       time.Duration
}

type LeaderResignParams added in v0.0.23

type LeaderResignParams struct {
	LeaderID        string
	LeadershipTopic string
	Schema          string
}

type Listener added in v0.0.23

type Listener interface {
	Close(ctx context.Context) error
	Connect(ctx context.Context) error
	Listen(ctx context.Context, topic string) error
	Ping(ctx context.Context) error
	Schema() string
	SetAfterConnectExec(sql string) // should only ever be used in testing
	Unlisten(ctx context.Context, topic string) error
	WaitForNotification(ctx context.Context) (*Notification, error)
}

Listener listens for notifications. In Postgres, this is a database connection where `LISTEN` has been run.

API is not stable. DO NOT IMPLEMENT.

type Migration

type Migration struct {
	// CreatedAt is when the migration was initially created.
	//
	// API is not stable. DO NOT USE.
	CreatedAt time.Time

	// Line is the migration line that the migration belongs to.
	//
	// API is not stable. DO NOT USE.
	Line string

	// Version is the version of the migration.
	//
	// API is not stable. DO NOT USE.
	Version int
}

Migration represents a River migration.

API is not stable. DO NOT USE.

type MigrationDeleteAssumingMainManyParams added in v0.21.0

type MigrationDeleteAssumingMainManyParams struct {
	Schema   string
	Versions []int
}

type MigrationDeleteByLineAndVersionManyParams added in v0.21.0

type MigrationDeleteByLineAndVersionManyParams struct {
	Line     string
	Schema   string
	Versions []int
}

type MigrationGetAllAssumingMainParams added in v0.21.0

type MigrationGetAllAssumingMainParams struct {
	Schema string
}

type MigrationGetByLineParams added in v0.21.0

type MigrationGetByLineParams struct {
	Line   string
	Schema string
}

type MigrationInsertManyAssumingMainParams added in v0.21.0

type MigrationInsertManyAssumingMainParams struct {
	Schema   string
	Versions []int
}

type MigrationInsertManyParams added in v0.21.0

type MigrationInsertManyParams struct {
	Line     string
	Schema   string
	Versions []int
}

type Notification added in v0.0.23

type Notification struct {
	Payload string
	Topic   string
}

type NotifyManyParams added in v0.5.0

type NotifyManyParams struct {
	Payload []string
	Topic   string
	Schema  string
}

NotifyManyParams are parameters to issue many pubsub notifications all at once for a single topic.

type ProducerKeepAliveParams added in v0.20.0

type ProducerKeepAliveParams struct {
	ID                    int64
	QueueName             string
	Schema                string
	StaleUpdatedAtHorizon time.Time
}

type QueueCreateOrSetUpdatedAtParams added in v0.5.0

type QueueCreateOrSetUpdatedAtParams struct {
	Metadata  []byte
	Name      string
	Now       *time.Time
	PausedAt  *time.Time
	Schema    string
	UpdatedAt *time.Time
}

type QueueDeleteExpiredParams added in v0.5.0

type QueueDeleteExpiredParams struct {
	Max              int
	Schema           string
	UpdatedAtHorizon time.Time
}

type QueueGetParams added in v0.21.0

type QueueGetParams struct {
	Name   string
	Schema string
}

type QueueListParams added in v0.21.0

type QueueListParams struct {
	Limit  int
	Schema string
}

type QueuePauseParams added in v0.21.0

type QueuePauseParams struct {
	Name   string
	Schema string
}

type QueueResumeParams added in v0.21.0

type QueueResumeParams struct {
	Name   string
	Schema string
}

type QueueUpdateParams added in v0.20.0

type QueueUpdateParams struct {
	Metadata         []byte
	MetadataDoUpdate bool
	Name             string
	Schema           string
}

type Row added in v0.21.0

type Row interface {
	Scan(dest ...any) error
}

type Schema added in v0.21.0

type Schema struct {
	Name string
}

type SchemaGetExpiredParams added in v0.21.0

type SchemaGetExpiredParams struct {
	BeforeName string
	Prefix     string
}

type TableExistsParams added in v0.21.0

type TableExistsParams struct {
	Schema string
	Table  string
}

Directories

Path Synopsis
riverpgxv5 module
riversqlite module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL