rivertest

package
v0.20.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2025 License: MPL-2.0 Imports: 24 Imported by: 0

Documentation

Overview

Package rivertest contains test assertions that can be used in a project's tests to verify that certain actions occurred from the main river package.

Example (RequireInserted)

Example_requireInserted demonstrates the use of the RequireInserted test assertion, which verifies that a single job was inserted.

package main

import (
	"context"
	"fmt"
	"log/slog"
	"testing"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/internal/riverinternaltest"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	"github.com/riverqueue/river/rivershared/util/slogutil"
	"github.com/riverqueue/river/rivertest"
)

// RequiredArgs are the job arguments used by this example. Message is
// serialized under the JSON key "message".
type RequiredArgs struct {
	Message string `json:"message"`
}

// Kind returns the kind string for these job args, "required".
func (RequiredArgs) Kind() string {
	const kind = "required"
	return kind
}

// RequiredWorker is the example worker for RequiredArgs jobs. It embeds
// river.WorkerDefaults[RequiredArgs].
type RequiredWorker struct {
	river.WorkerDefaults[RequiredArgs]
}

// Work performs no work for the example and always returns nil.
func (w *RequiredWorker) Work(ctx context.Context, job *river.Job[RequiredArgs]) error {
	return nil
}

// Example_requireInserted demonstrates the use of the RequireInserted test
// assertion, which verifies that a single job was inserted.
//
// The example first inserts a job inside a transaction and checks it with
// RequireInsertedTx (twice: without and with RequireInsertedOpts), then
// inserts a job through the pool-backed client and checks it with
// RequireInserted. Any setup failure panics.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_test_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	// Register RequiredWorker in the worker bundle handed to the client
	// config below.
	workers := river.NewWorkers()
	river.AddWorker(workers, &RequiredWorker{})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger:  slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	tx, err := dbPool.Begin(ctx)
	if err != nil {
		panic(err)
	}
	// The transaction is never committed; it is rolled back when main
	// returns. The error returned by Rollback is discarded.
	defer tx.Rollback(ctx)

	_, err = riverClient.InsertTx(ctx, tx, &RequiredArgs{
		Message: "Hello.",
	}, nil)
	if err != nil {
		panic(err)
	}

	// Required for purposes of our example here, but in reality t will be the
	// *testing.T that comes from a test's argument.
	t := &testing.T{}

	// Assert on the job inserted in the transaction above; the returned job
	// exposes the decoded args, whose Message is printed as the example's
	// output.
	job := rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, nil)
	fmt.Printf("Test passed with message: %s\n", job.Args.Message)

	// Verify the same job again, and this time that it was inserted at the
	// default priority and default queue.
	_ = rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, &rivertest.RequireInsertedOpts{
		Priority: 1,
		Queue:    river.QueueDefault,
	})

	// Insert and verify one on a pool instead of transaction.
	_, err = riverClient.Insert(ctx, &RequiredArgs{Message: "Hello from pool."}, nil)
	if err != nil {
		panic(err)
	}
	_ = rivertest.RequireInserted(ctx, t, riverpgxv5.New(dbPool), &RequiredArgs{}, nil)

}
Output:

Test passed with message: Hello.
Example (RequireManyInserted)

Example_requireManyInserted demonstrates the use of the RequireManyInserted test assertion, which requires that multiple jobs of the specified kinds were inserted.

package main

import (
	"context"
	"fmt"
	"log/slog"
	"testing"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/internal/riverinternaltest"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	"github.com/riverqueue/river/rivershared/util/slogutil"
	"github.com/riverqueue/river/rivertest"
)

// FirstRequiredArgs are the job arguments for the first job kind used by this
// example. Message is serialized under the JSON key "message".
type FirstRequiredArgs struct {
	Message string `json:"message"`
}

// Kind returns the kind string for these job args, "first_required".
func (FirstRequiredArgs) Kind() string {
	const kind = "first_required"
	return kind
}

// FirstRequiredWorker is the example worker for FirstRequiredArgs jobs. It
// embeds river.WorkerDefaults[FirstRequiredArgs].
type FirstRequiredWorker struct {
	river.WorkerDefaults[FirstRequiredArgs]
}

// Work performs no work for the example and always returns nil.
func (w *FirstRequiredWorker) Work(ctx context.Context, job *river.Job[FirstRequiredArgs]) error { return nil }

// SecondRequiredArgs are the job arguments for the second job kind used by
// this example. Message is serialized under the JSON key "message".
type SecondRequiredArgs struct {
	Message string `json:"message"`
}

// Kind returns the kind string for these job args, "second_required".
func (SecondRequiredArgs) Kind() string {
	const kind = "second_required"
	return kind
}

// SecondRequiredWorker is the example worker for SecondRequiredArgs jobs. It
// embeds river.WorkerDefaults[SecondRequiredArgs].
type SecondRequiredWorker struct {
	river.WorkerDefaults[SecondRequiredArgs]
}

// Work performs no work for the example and always returns nil.
func (w *SecondRequiredWorker) Work(ctx context.Context, job *river.Job[SecondRequiredArgs]) error { return nil }

// Example_requireManyInserted demonstrates the use of the RequireManyInserted test
// assertion, which requires that multiple jobs of the specified kinds were
// inserted.
//
// The example inserts three jobs of two kinds inside a transaction and checks
// them with RequireManyInsertedTx, then inserts one more job through the
// pool-backed client and checks it with RequireManyInserted. Any setup
// failure panics.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_test_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	// Register both example workers in the worker bundle handed to the client
	// config below.
	workers := river.NewWorkers()
	river.AddWorker(workers, &FirstRequiredWorker{})
	river.AddWorker(workers, &SecondRequiredWorker{})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger:  slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	tx, err := dbPool.Begin(ctx)
	if err != nil {
		panic(err)
	}
	// The transaction is never committed; it is rolled back when main
	// returns. The error returned by Rollback is discarded.
	defer tx.Rollback(ctx)

	// Insert three jobs in the transaction: first kind, second kind, then the
	// first kind again.
	_, err = riverClient.InsertTx(ctx, tx, &FirstRequiredArgs{Message: "Hello from first."}, nil)
	if err != nil {
		panic(err)
	}

	_, err = riverClient.InsertTx(ctx, tx, &SecondRequiredArgs{Message: "Hello from second."}, nil)
	if err != nil {
		panic(err)
	}

	_, err = riverClient.InsertTx(ctx, tx, &FirstRequiredArgs{Message: "Hello from first (again)."}, nil)
	if err != nil {
		panic(err)
	}

	// Required for purposes of our example here, but in reality t will be the
	// *testing.T that comes from a test's argument.
	t := &testing.T{}

	// The expected jobs are listed in the same order as the inserts above.
	// Each returned row's raw encoded args are printed as the example's
	// output.
	jobs := rivertest.RequireManyInsertedTx[*riverpgxv5.Driver](ctx, t, tx, []rivertest.ExpectedJob{
		{Args: &FirstRequiredArgs{}},
		{Args: &SecondRequiredArgs{}},
		{Args: &FirstRequiredArgs{}},
	})
	for i, job := range jobs {
		fmt.Printf("Job %d args: %s\n", i, string(job.EncodedArgs))
	}

	// Verify again, and this time that the second job was inserted at the
	// default priority and default queue.
	_ = rivertest.RequireManyInsertedTx[*riverpgxv5.Driver](ctx, t, tx, []rivertest.ExpectedJob{
		{Args: &SecondRequiredArgs{}, Opts: &rivertest.RequireInsertedOpts{
			Priority: 1,
			Queue:    river.QueueDefault,
		}},
	})

	// Insert and verify one on a pool instead of transaction.
	_, err = riverClient.Insert(ctx, &FirstRequiredArgs{Message: "Hello from pool."}, nil)
	if err != nil {
		panic(err)
	}
	_ = rivertest.RequireManyInserted(ctx, t, riverpgxv5.New(dbPool), []rivertest.ExpectedJob{
		{Args: &FirstRequiredArgs{}},
	})

}
Output:

Job 0 args: {"message": "Hello from first."}
Job 1 args: {"message": "Hello from second."}
Job 2 args: {"message": "Hello from first (again)."}

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func RequireInserted

func RequireInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, tb testing.TB, driver TDriver, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs]

RequireInserted is a test helper that verifies that a job of the given kind was inserted for work, failing the test if it wasn't. If found, the inserted job is returned so that further assertions can be made against it.

job := RequireInserted(ctx, t, riverpgxv5.New(dbPool), &Job1Args{}, nil)

This variant takes a driver that wraps a database pool. See also RequireInsertedTx which takes a transaction.

A RequireInsertedOpts struct can be provided as the last argument, and if it is, its properties (e.g. max attempts, priority, queue name) will act as required assertions in the inserted job row.

The assertion will fail if more than one job of the given kind was found because at that point the job to return is ambiguous. Use RequireManyInserted to cover that case instead.

func RequireInsertedTx added in v0.0.4

func RequireInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, tb testing.TB, tx TTx, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs]

RequireInsertedTx is a test helper that verifies that a job of the given kind was inserted for work, failing the test if it wasn't. If found, the inserted job is returned so that further assertions can be made against it.

job := RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &Job1Args{}, nil)

This variant takes a transaction. See also RequireInserted which takes a driver that wraps a database pool.

A RequireInsertedOpts struct can be provided as the last argument, and if it is, its properties (e.g. max attempts, priority, queue name) will act as required assertions in the inserted job row.

The assertion will fail if more than one job of the given kind was found because at that point the job to return is ambiguous. Use RequireManyInserted to cover that case instead.

func RequireManyInserted

func RequireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, tb testing.TB, driver TDriver, expectedJobs []ExpectedJob) []*rivertype.JobRow

RequireManyInserted is a test helper that verifies that jobs of the given kinds were inserted for work, failing the test if they weren't, or were inserted in the wrong order. If found, the inserted jobs are returned so that further assertions can be made against them.

jobs := RequireManyInserted(ctx, t, riverpgxv5.New(dbPool), []rivertest.ExpectedJob{
	{Args: &Job1Args{}},
})

This variant takes a driver that wraps a database pool. See also RequireManyInsertedTx which takes a transaction.

A RequireInsertedOpts struct can be provided for each expected job, and if it is, its properties (e.g. max attempts, priority, queue name) will act as required assertions for the corresponding inserted job row.

The assertion expects emitted jobs to have occurred exactly in the order and the number specified, and will fail in case this expectation isn't met. So if a job of a certain kind is emitted multiple times, it must be expected multiple times.

func RequireManyInsertedTx added in v0.0.4

func RequireManyInsertedTx[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, tb testing.TB, tx TTx, expectedJobs []ExpectedJob) []*rivertype.JobRow

RequireManyInsertedTx is a test helper that verifies that jobs of the given kinds were inserted for work, failing the test if they weren't, or were inserted in the wrong order. If found, the inserted jobs are returned so that further assertions can be made against them.

jobs := RequireManyInsertedTx[*riverpgxv5.Driver](ctx, t, tx, []rivertest.ExpectedJob{
	{Args: &Job1Args{}},
})

This variant takes a transaction. See also RequireManyInserted which takes a driver that wraps a database pool.

A RequireInsertedOpts struct can be provided for each expected job, and if it is, its properties (e.g. max attempts, priority, queue name) will act as required assertions for the corresponding inserted job row.

The assertion expects emitted jobs to have occurred exactly in the order and the number specified, and will fail in case this expectation isn't met. So if a job of a certain kind is emitted multiple times, it must be expected multiple times.

func RequireNotInserted added in v0.6.0

func RequireNotInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, tb testing.TB, driver TDriver, expectedJob TArgs, opts *RequireInsertedOpts)

RequireNotInserted is a test helper that verifies that a job of the given kind was not inserted for work, failing the test if one was.

RequireNotInserted(ctx, t, riverpgxv5.New(dbPool), &Job1Args{}, nil)

This variant takes a driver that wraps a database pool. See also RequireNotInsertedTx which takes a transaction.

A RequireInsertedOpts struct can be provided as the last argument, and if it is, its properties (e.g. max attempts, priority, queue name) will act as requirements on a found row. If any fields are set, then the test will fail if a job is found that matches all of them. If any property doesn't match a found row, the row isn't considered a match, and the assertion doesn't fail.

If more rows than one were found, the assertion fails if any of them match the given opts.

func RequireNotInsertedTx added in v0.6.0

func RequireNotInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, tb testing.TB, tx TTx, expectedJob TArgs, opts *RequireInsertedOpts)

RequireNotInsertedTx is a test helper that verifies that a job of the given kind was not inserted for work, failing the test if one was.

RequireNotInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &Job1Args{}, nil)

This variant takes a transaction. See also RequireNotInserted which takes a driver that wraps a database pool.

A RequireInsertedOpts struct can be provided as the last argument, and if it is, its properties (e.g. max attempts, priority, queue name) will act as requirements on a found row. If any fields are set, then the test will fail if a job is found that matches all of them. If any property doesn't match a found row, the row isn't considered a match, and the assertion doesn't fail.

If more rows than one were found, the assertion fails if any of them match the given opts.

func WorkContext added in v0.12.0

func WorkContext[TTx any](ctx context.Context, client *river.Client[TTx]) context.Context

WorkContext returns a realistic context that can be used to test river.Worker Work implementations.

In particular, adds a client to the context so that river.ClientFromContext is usable in the test suite.

Types

type ExpectedJob

type ExpectedJob struct {
	// Args are job arguments to expect.
	Args river.JobArgs

	// Opts are options for the specific required job including insertion
	// options to assert against.
	Opts *RequireInsertedOpts
}

ExpectedJob is a single job to expect encapsulating job args and possible insertion options.

type PanicError added in v0.18.0

// PanicError is an error type (it has Error and Is methods) that carries a
// Cause value and a Trace string.
//
// NOTE(review): presumably produced when a worker panics while being worked by
// the test Worker — confirm against the implementation.
type PanicError struct {
	// Cause is presumably the value recovered from the panic — TODO confirm.
	Cause any
	// Trace is presumably the stack trace captured at the panic — TODO confirm.
	Trace string
}

func (*PanicError) Error added in v0.18.0

func (e *PanicError) Error() string

func (*PanicError) Is added in v0.18.0

func (e *PanicError) Is(target error) bool

type RequireInsertedOpts

type RequireInsertedOpts struct {
	// MaxAttempts is the expected maximum number of total attempts for the
	// inserted job.
	//
	// No assertion is made if left the zero value.
	MaxAttempts int

	// Priority is the expected priority for the inserted job.
	//
	// No assertion is made if left the zero value.
	Priority int

	// Queue is the expected queue name of the inserted job.
	//
	// No assertion is made if left the zero value.
	Queue string

	// ScheduledAt is the expected scheduled at time of the inserted job. Times
	// are truncated to the microsecond level for comparison to account for the
	// difference between Go storing times to nanoseconds and Postgres storing
	// only to microsecond precision.
	//
	// No assertion is made if left the zero value.
	ScheduledAt time.Time

	// State is the expected state of the inserted job.
	//
	// No assertion is made if left the zero value.
	State rivertype.JobState

	// Tags are the expected tags of the inserted job.
	//
	// No assertion is made if left the zero value.
	Tags []string
}

Options for RequireInserted functions including expectations for various queuing properties that stem from InsertOpts.

Multiple properties set on this struct increase the specificity on a job to match, acting like an AND condition on each.

In the case of RequireInserted or RequireManyInserted, if multiple properties are set, a job must match all of them to be considered a successful match.

In the case of RequireNotInserted, if multiple properties are set, a test failure is triggered only if all match. If any one of them was different, an inserted job isn't considered a match, and RequireNotInserted succeeds.

type TimeStub added in v0.17.0

type TimeStub struct {
	// contains filtered or unexported fields
}

TimeStub implements rivertype.TimeGenerator to allow time to be stubbed in tests. It is implemented in a thread-safe manner with a mutex, allowing the current time to be stubbed at any time with StubNowUTC.

func (*TimeStub) NowUTC added in v0.17.0

func (t *TimeStub) NowUTC() time.Time

NowUTC returns the current time. This may be a stubbed time if the time has been actively stubbed in a test.

func (*TimeStub) NowUTCOrNil added in v0.17.0

func (t *TimeStub) NowUTCOrNil() *time.Time

NowUTCOrNil returns the currently stubbed time _if_ the current time is stubbed, and returns nil otherwise. This is generally useful in cases where a component may want to use a stubbed time if the time is stubbed, but to fall back to a database time default otherwise.

func (*TimeStub) StubNowUTC added in v0.17.0

func (t *TimeStub) StubNowUTC(nowUTC time.Time) time.Time

StubNowUTC stubs the current time. It will panic if invoked outside of tests. Returns the same time passed as parameter for convenience.

type WorkResult added in v0.18.0

type WorkResult struct {
	// EventKind is the kind of event that occurred following execution.
	EventKind river.EventKind

	// Job is the updated job row from the database _after_ it has been worked.
	Job *rivertype.JobRow
}

WorkResult is the result of working a job in the test Worker.

type Worker added in v0.17.0

type Worker[T river.JobArgs, TTx any] struct {
	// contains filtered or unexported fields
}

Worker makes it easier to test river workers. Once built, the worker can be used to insert and work any number of jobs:

testWorker := rivertest.NewWorker(t, driver, config, worker)
result, err := testWorker.Work(ctx, t, tx, args, nil)
if err != nil {
	t.Fatalf("failed to work job: %s", err)
}
if result.EventKind != river.EventKindJobCompleted {
	t.Fatalf("expected job to be completed, got %s", result.EventKind)
}

An existing job (inserted using external logic) can also be worked:

insertRes, err := client.InsertTx(ctx, tx, args, nil)
if err != nil {
	t.Fatalf("failed to insert job: %s", err)
}
// ...
result, err := testWorker.WorkJob(ctx, t, tx, insertRes.Job)
if err != nil {
	t.Fatalf("failed to work job: %s", err)
}

In all cases the underlying river.Worker will be called with the job transitioned into a running state. The execution environment has a realistic River context with access to all River features, including river.ClientFromContext and worker middleware.

func NewWorker added in v0.17.0

func NewWorker[T river.JobArgs, TTx any](tb testing.TB, driver riverdriver.Driver[TTx], config *river.Config, worker river.Worker[T]) *Worker[T, TTx]

NewWorker creates a new test Worker for testing the provided river.Worker. The worker uses the provided driver and River config to populate default values on test jobs and to configure the execution environment.

A pool-less driver is recommended for most usage, as individual job inserts and executions will happen within a provided transaction. This enables many parallel test cases to run with full isolation, each in their own transaction.

The worker is configured to disable unique enforcement by default, as this would otherwise prevent conflicting jobs from being tested in parallel.

func (*Worker[T, TTx]) Work added in v0.17.0

func (w *Worker[T, TTx]) Work(ctx context.Context, tb testing.TB, tx TTx, args T, opts *river.InsertOpts) (*WorkResult, error)

Work inserts a job with the provided arguments and optional insert options, then works it. The transaction is used for all work-related database operations so that the caller can easily roll back at the end of the test to maintain a clean database state.

The returned WorkResult contains the updated job row from the database _after_ it has been worked, as well as the kind of event that occurred.

The returned error only reflects _real_ errors and does not include explicitly returned snooze or cancel errors from the worker.

func (*Worker[T, TTx]) WorkJob added in v0.17.0

func (w *Worker[T, TTx]) WorkJob(ctx context.Context, tb testing.TB, tx TTx, job *rivertype.JobRow) (*WorkResult, error)

WorkJob works an existing job already in the database. The job must be inserted using external logic prior to calling this method. The transaction is used for all work-related database operations so that the caller can easily roll back at the end of the test to maintain a clean database state.

The returned WorkResult contains the updated job row from the database _after_ it has been worked, as well as the kind of event that occurred.

The returned error only reflects _real_ errors and does not include explicitly returned snooze or cancel errors from the worker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL