steve

package
v0.0.0-...-326089c
Warning

This package is not in the latest version of its module.

Published: Jul 20, 2025 License: AGPL-3.0 Imports: 14 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var ErrJobNotFound = errors.New("job not found")
var ErrShutdown = errors.New("shutdown")
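Because these are sentinel errors created with errors.New, callers would normally compare against them with errors.Is rather than by matching message text. The sketch below uses a local stand-in for ErrJobNotFound and a hypothetical lookup helper; which Client methods actually return the sentinel is not stated in this listing and is assumed here.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrJobNotFound is a local stand-in for steve.ErrJobNotFound so that the
// sketch compiles on its own.
var ErrJobNotFound = errors.New("job not found")

// lookup is a hypothetical helper that wraps the sentinel with extra context,
// the way a caller of the package might see it.
func lookup(id int32) error {
	return fmt.Errorf("job %d: %w", id, ErrJobNotFound)
}

// describe classifies an error without inspecting its message.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrJobNotFound):
		return "missing"
	default:
		return "other"
	}
}

func main() {
	fmt.Println(describe(lookup(42)))
}
```

errors.Is walks the wrap chain, so the check keeps working if the error is wrapped with %w along the way.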

Functions

func RegisterWorker

func RegisterWorker[T JobArgs](client *Client, worker Worker[T])

RegisterWorker registers a Worker on the provided client. Each Worker must be registered so that the Client knows it should handle a specific kind of job (as returned by its `Kind()` method).

Note that RegisterWorker will panic if a worker of this kind is already registered for this Client.
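The panic-on-duplicate behaviour can be illustrated with a small registry keyed by kind. This is a sketch under stated assumptions, not the package's implementation: registry, newRegistry, register and tryRegister are hypothetical names, and the real Client keeps its own unexported bookkeeping.

```go
package main

import "fmt"

// registry is a hypothetical stand-in for the Client's worker bookkeeping.
type registry struct {
	kinds map[string]bool
}

func newRegistry() *registry {
	return &registry{kinds: make(map[string]bool)}
}

// register panics when the kind is already present, mirroring the documented
// behaviour of RegisterWorker.
func (r *registry) register(kind string) {
	if r.kinds[kind] {
		panic(fmt.Sprintf("worker for kind %q is already registered", kind))
	}
	r.kinds[kind] = true
}

// tryRegister converts the panic into an error, which can be handy in tests.
func tryRegister(r *registry, kind string) (err error) {
	defer func() {
		if rec := recover(); rec != nil {
			err = fmt.Errorf("register failed: %v", rec)
		}
	}()
	r.register(kind)
	return nil
}

func main() {
	r := newRegistry()
	fmt.Println(tryRegister(r, "send_email")) // first registration succeeds
	fmt.Println(tryRegister(r, "send_email")) // second one reports the panic
}
```

In practice this means each kind should be registered exactly once, typically during start-up before Run is called.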

Types

type AttemptError

type AttemptError struct {
	At    time.Time `json:"at"`
	Error string    `json:"error"`
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

func Get

func Get() *Client

func Initialize

func Initialize(db db.Handler, cfg Config, logger zerolog.Logger) *Client

func (*Client) GetJobDetails

func (c *Client) GetJobDetails(id int32) (JobDetails, error)

func (*Client) InsertJob

func (c *Client) InsertJob(db db.Handler, args JobArgs) error

func (*Client) InsertJobs

func (c *Client) InsertJobs(db db.Handler, args []JobArgs) error

func (*Client) ListJobs

func (c *Client) ListJobs(only, exclude []JobStatus) ([]JobInfo, error)

func (*Client) Run

func (c *Client) Run(ctx context.Context) error

func (*Client) RunJob

func (c *Client) RunJob(ctx context.Context, id int32) error

func (*Client) Shutdown

func (c *Client) Shutdown(ctx context.Context) error

func (*Client) Triage

func (c *Client) Triage() (int, error)

type Config

type Config struct {
	Workers int `koanf:"workers"`
	Timeout int `koanf:"timeout"`
}

type ErrJobBlocked

type ErrJobBlocked struct {
	// contains filtered or unexported fields
}

func (*ErrJobBlocked) Error

func (e *ErrJobBlocked) Error() string

type Job

type Job[T JobArgs] struct {
	*JobInfo
	Args T
}

type JobArgs

type JobArgs interface {
	Kind() string
	Sequences() []string
}
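A job args type only needs the two methods above. The sketch below mirrors the interface locally and implements it on a hypothetical EmailArgs struct. How the sequence names are interpreted is not described in this listing, so the per-user key returned here is purely an illustrative assumption.

```go
package main

import "fmt"

// JobArgs mirrors the interface from the listing so the sketch is
// self-contained.
type JobArgs interface {
	Kind() string
	Sequences() []string
}

// EmailArgs is a hypothetical args type.
type EmailArgs struct {
	UserID  int    `json:"user_id"`
	Subject string `json:"subject"`
}

// Kind names the job type; a worker registered for EmailArgs handles it.
func (EmailArgs) Kind() string { return "send_email" }

// Sequences returns one key per user. The meaning of these keys is an
// assumption, not something the listing documents.
func (a EmailArgs) Sequences() []string {
	return []string{fmt.Sprintf("user:%d", a.UserID)}
}

// Compile-time check that EmailArgs satisfies JobArgs.
var _ JobArgs = EmailArgs{}

func main() {
	var args JobArgs = EmailArgs{UserID: 7, Subject: "Welcome"}
	fmt.Println(args.Kind(), args.Sequences())
}
```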

type JobDetails

type JobDetails struct {
	JobInfo
	CreatedAt    time.Time          `json:"created_at"`
	ScheduledAt  pgtype.Timestamptz `json:"scheduled_at"`
	Errors       []AttemptError     `json:"errors"`
	Dependents   []JobInfo          `json:"dependents"`
	Dependencies []JobInfo          `json:"dependencies"`
}

type JobInfo

type JobInfo struct {
	ID          int32     `json:"id"`
	Status      JobStatus `json:"status"`
	Attempts    int       `json:"attempts"`
	Kind        string    `json:"kind"`
	EncodedArgs []byte    `json:"args"`
}

type JobStatus

type JobStatus uint8
const (
	JobStatusQueued     JobStatus = 0
	JobStatusBlocked    JobStatus = 1
	JobStatusReady      JobStatus = 2
	JobStatusRunning    JobStatus = 3
	JobStatusCompleted  JobStatus = 4
	JobStatusErrorRetry JobStatus = 5
	JobStatusFailed     JobStatus = 6
)

func (JobStatus) String

func (j JobStatus) String() string
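Since JobStatus has a String method, it satisfies fmt.Stringer and prints as text with %v or %s while still printing as a number with %d. The sketch below redeclares the type locally; the status names in statusNames are assumptions, because the strings the real method returns are not shown in this listing.

```go
package main

import "fmt"

type JobStatus uint8

const (
	JobStatusQueued     JobStatus = 0
	JobStatusBlocked    JobStatus = 1
	JobStatusReady      JobStatus = 2
	JobStatusRunning    JobStatus = 3
	JobStatusCompleted  JobStatus = 4
	JobStatusErrorRetry JobStatus = 5
	JobStatusFailed     JobStatus = 6
)

// statusNames holds assumed names, indexed by status value.
var statusNames = [...]string{
	"queued", "blocked", "ready", "running", "completed", "error_retry", "failed",
}

// String returns the assumed name, or a numeric fallback for unknown values.
func (j JobStatus) String() string {
	if int(j) < len(statusNames) {
		return statusNames[j]
	}
	return fmt.Sprintf("JobStatus(%d)", uint8(j))
}

func main() {
	fmt.Printf("%v %d\n", JobStatusRunning, JobStatusRunning)
}
```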

type Worker

type Worker[T JobArgs] interface {
	// Timeout is the maximum amount of time the job is allowed to run before
	// its context is cancelled. A timeout of zero (the default) means the job
	// will inherit the Client-level timeout. A timeout of -1 means the job's
	// context will never time out.
	Timeout(job *Job[T]) time.Duration

	// Work performs the job and returns an error if the job failed. The context
	// will be configured with a timeout according to the worker settings and may
	// be cancelled for other reasons.
	//
	// If no error is returned, the job is assumed to have succeeded and will be
	// marked completed.
	//
	// It is important for any worker to respect context cancellation to enable
	// the client to respond to shutdown requests; there is no way to cancel a
	// running job that does not respect context cancellation, other than
	// terminating the process.
	Work(ctx context.Context, job *Job[T]) error
}

Worker is an interface that can perform a job with args of type T. Workers must be registered with the client using the RegisterWorker function.

func WorkFunc

func WorkFunc[T JobArgs](f func(context.Context, *Job[T]) error) Worker[T]

WorkFunc wraps a function to implement the Worker interface. A job args struct implementing JobArgs will still be required to specify a Kind.

For example:

steve.RegisterWorker(client, steve.WorkFunc(func(ctx context.Context, job *steve.Job[WorkFuncArgs]) error {
	fmt.Printf("Message: %s", job.Args.Message)
	return nil
}))

type WorkerDefaults

type WorkerDefaults[T JobArgs] struct{}

WorkerDefaults is an empty struct that can be embedded in your worker struct to make it fulfill the Worker interface with default values.

func (WorkerDefaults[T]) Timeout

func (w WorkerDefaults[T]) Timeout(*Job[T]) time.Duration

Timeout returns the job-specific timeout. Override this method to set a job-specific timeout; otherwise the Client-level timeout is applied.
