jobs

package module
v6.0.0-beta.7 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2026 License: MIT Imports: 40 Imported by: 0

README

Documentation

Overview

Package jobs implements the RoadRunner jobs plugin, providing asynchronous job processing with support for multiple queue backends (memory, AMQP, Kafka, SQS, NATS, Beanstalk).

The plugin manages job pipelines, worker pools, and priority-based scheduling. Jobs are pushed via RPC from PHP applications and executed by PHP workers through the RoadRunner server. OpenTelemetry tracing is integrated for distributed trace context propagation across the PHP-Go boundary.

Key components:

  • Plugin: the main entry point, implementing the endure lifecycle
  • Pipeline: named queue configuration bound to a specific driver
  • Job/Options: domain model for jobs and their execution options
  • RPC methods: Push, PushBatch, Pause, Resume, Declare, Destroy, Stat, List

Index

Constants

View Source
const (
	// RrMode env variable
	RrMode     string = "RR_MODE"
	RrModeJobs string = "jobs"

	PluginName string = "jobs"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CfgOptions

type CfgOptions struct {
	// Parallelism configures the number of pipelines to be started at the same time
	Parallelism int `mapstructure:"parallelism"`
}

type Config

type Config struct {
	// NumPollers configures number of priority queue pollers
	// Default - num logical cores
	NumPollers int `mapstructure:"num_pollers"`
	// Options contain additional configuration options for the job plugin
	CfgOptions *CfgOptions `mapstructure:"options"`
	// PipelineSize is the limit of a main jobs queue which consumes Items from the driver's pipeline
	// a Driver pipeline might be much larger than a main jobs queue
	PipelineSize uint64 `mapstructure:"pipeline_size"`
	// Timeout in seconds is the per-push limit to put the job into the queue
	Timeout int `mapstructure:"timeout"`
	// Pools is a map of worker pools, where the key is the name of the pool and the value is the pool configuration
	Pools map[string]*poolImpl.Config `mapstructure:"pools"`
	// Pool configures roadrunner workers pool.
	Pool *poolImpl.Config `mapstructure:"pool"`
	// Pipelines define mapping between PHP job pipeline and associated job broker.
	Pipelines map[string]Pipeline `mapstructure:"pipelines"`
	// Consuming specifies names of pipelines to be consumed on service start.
	Consume []string `mapstructure:"consume"`
}

Config defines settings for job broker, workers and job-pipeline mapping.

func (*Config) InitDefaults

func (c *Config) InitDefaults() error

func (*Config) TimeoutDuration

func (c *Config) TimeoutDuration() time.Duration

type Configurer

type Configurer interface {
	// Experimental checks if there are any experimental features enabled.
	Experimental() bool
	// UnmarshalKey takes a single key and unmarshals it into a struct.
	UnmarshalKey(name string, out any) error
	// Has checks if config section exists.
	Has(name string) bool
}

type Informer

type Informer interface {
	Workers() []*process.State
}

type Job

type Job struct {
	// Job contains name of job broker (usually PHP class).
	Job string `json:"job"`
	// Ident is unique identifier of the job, should be provided from outside
	Ident string `json:"id"`
	// Payload is string data (usually JSON) passed to Job broker.
	Pld []byte `json:"payload"`
	// Headers with key-value pairs
	Hdr map[string][]string `json:"headers"`
	// Options contains set of PipelineOptions specific to job execution. Can be empty.
	Options *Options `json:"options,omitempty"`
}

func (*Job) AutoAck

func (j *Job) AutoAck() bool

func (*Job) Delay

func (j *Job) Delay() int64

func (*Job) GroupID

func (j *Job) GroupID() string

func (*Job) Headers

func (j *Job) Headers() map[string][]string

func (*Job) ID

func (j *Job) ID() string

func (*Job) Metadata

func (j *Job) Metadata() string

func (*Job) Name

func (j *Job) Name() string

func (*Job) Offset

func (j *Job) Offset() int64

Offset returns the Kafka offset of the job. If Options is nil, it returns 0.

func (*Job) Partition

func (j *Job) Partition() int32

func (*Job) Payload

func (j *Job) Payload() []byte

func (*Job) Priority

func (j *Job) Priority() int64

func (*Job) Topic

func (j *Job) Topic() string

func (*Job) UpdatePriority

func (j *Job) UpdatePriority(p int64)

type Logger

type Logger interface {
	NamedLogger(name string) *slog.Logger
}

type Options

type Options struct {
	// Priority is the job priority, default - 10.
	// NOTE(review): the field is a plain int64, not a pointer — presumably 0 means "priority not set"; confirm against the push handler.
	Priority int64 `json:"priority"`
	// Pipeline manually specified pipeline.
	Pipeline string `json:"pipeline,omitzero"`
	// Delay defines time duration to delay execution for. Defaults to none.
	Delay int64 `json:"delay,omitzero"`
	// AutoAck use to ack a job right after it arrived from the driver
	AutoAck bool `json:"auto_ack"`

	// kafka specific fields
	// Topic is kafka topic
	Topic string `json:"topic"`
	// Optional metadata
	Metadata string `json:"metadata"`
	// Kafka offset
	Offset int64 `json:"offset"`
	// Kafka partition
	Partition int32 `json:"partition"`
}

Options carry information about how to handle given job.

func (*Options) DelayDuration

func (o *Options) DelayDuration() time.Duration

DelayDuration returns delay duration in a form of time.Duration.

type Pipeline

type Pipeline map[string]any

Pipeline defines pipeline options.

func (Pipeline) Bool

func (p Pipeline) Bool(key string, d bool) bool

Bool must return option value as bool or return default value.

func (Pipeline) Driver

func (p Pipeline) Driver() string

Driver associated with the pipeline.

func (Pipeline) Get

func (p Pipeline) Get(key string) any

Get returns the data associated with the key.

func (Pipeline) Has

func (p Pipeline) Has(key string) bool

Has reports whether a value is present in the pipeline.

func (Pipeline) Int

func (p Pipeline) Int(key string, d int) int

Int must return option value as int or return default value.

func (Pipeline) Map

func (p Pipeline) Map(key string, out map[string]string) error

Map must return a nested map value or an empty config — for example, SQS attributes or tags.

func (Pipeline) Name

func (p Pipeline) Name() string

Name returns pipeline name.

func (Pipeline) Pool

func (p Pipeline) Pool() string

func (Pipeline) Priority

func (p Pipeline) Priority() int64

Priority returns the default pipeline priority.

func (Pipeline) String

func (p Pipeline) String(key string, d string) string

String must return option value as string or return default value.

func (Pipeline) With

func (p Pipeline) With(name string, value any)

With sets the named value on the pipeline.

type Plugin

type Plugin struct {
	// contains filtered or unexported fields
}

func (*Plugin) AddWorker

func (p *Plugin) AddWorker() error

AddWorker dynamically adds a worker to all configured pool(s).

func (*Plugin) Collects

func (p *Plugin) Collects() []*dep.In

Collects declares the plugin's dependencies: job driver constructors and an optional tracer provider.

func (*Plugin) Declare

func (p *Plugin) Declare(ctx context.Context, pipeline jobsApi.Pipeline) error

Declare dynamically registers a new pipeline at runtime, initializing its driver and starting consumption.

func (*Plugin) Destroy

func (p *Plugin) Destroy(ctx context.Context, pp string) error

Destroy stops the pipeline driver and removes it from the plugin's registry.

func (*Plugin) Execute

func (p *Plugin) Execute(pldCtx []byte, pool Pool, jb jobs.Job, span trace.Span, start time.Time)

Execute dispatches a job to the given worker pool, waits for the response, and handles the result according to the RoadRunner jobs protocol (ack, nack, requeue, or delay).

func (*Plugin) Init

func (p *Plugin) Init(cfg Configurer, log Logger, server Server) error

Init configures the jobs plugin from the application config, sets up the priority queue, pipeline registry, and metrics collector. Returns errors.Disabled if the plugin section is not present.

func (*Plugin) JobsState

func (p *Plugin) JobsState(ctx context.Context) ([]*jobsApi.State, error)

JobsState queries each registered pipeline driver for its current state (active, delayed, reserved job counts).

func (*Plugin) List

func (p *Plugin) List() []string

List returns the names of all registered pipelines.

func (*Plugin) MetricsCollector

func (p *Plugin) MetricsCollector() []prometheus.Collector

MetricsCollector returns Prometheus collectors for jobs processing and push metrics.

func (*Plugin) Name

func (p *Plugin) Name() string

Name returns the plugin identifier used for config lookup and dependency resolution.

func (*Plugin) Pause

func (p *Plugin) Pause(ctx context.Context, pp string) error

Pause suspends job consumption for the specified pipeline.

func (*Plugin) Push

func (p *Plugin) Push(ctx context.Context, j jobsApi.Message) error

Push sends a single job to the pipeline specified by the job's GroupID.

func (*Plugin) PushBatch

func (p *Plugin) PushBatch(ctx context.Context, j []jobsApi.Message) error

PushBatch sends multiple jobs to their respective pipelines. Each job is pushed individually and the operation stops on the first error.

func (*Plugin) RPC

func (p *Plugin) RPC() (string, http.Handler)

func (*Plugin) Ready

func (p *Plugin) Ready() (*status.Status, error)

Ready returns the readiness status of the plugin.

func (*Plugin) RemoveWorker

func (p *Plugin) RemoveWorker(ctx context.Context) error

RemoveWorker removes one worker from all configured pool(s).

func (*Plugin) Reset

func (p *Plugin) Reset() error

Reset restarts all worker pool(s) by destroying existing workers and spawning fresh ones.

func (*Plugin) Resume

func (p *Plugin) Resume(ctx context.Context, pp string) error

Resume restarts job consumption for a previously paused pipeline.

func (*Plugin) Serve

func (p *Plugin) Serve() chan error

Serve initializes all configured pipelines via their driver constructors, creates worker pool(s), starts the queue pollers, and begins processing jobs. Returns an error channel for async errors.

func (*Plugin) Status

func (p *Plugin) Status() (*status.Status, error)

Status returns the status of the plugin.

func (*Plugin) Stop

func (p *Plugin) Stop(ctx context.Context) error

Stop gracefully shuts down the plugin by signaling pollers to stop, waiting for in-flight jobs to complete, stopping all pipeline drivers, and destroying the worker pool(s).

func (*Plugin) Workers

func (p *Plugin) Workers() []*process.State

Workers returns the state of all worker processes across all configured pools.

type Pool

type Pool interface {
	// Workers returns worker list associated with the pool.
	Workers() (workers []*worker.Process)
	// Exec payload
	Exec(ctx context.Context, p *payload.Payload, stopCh chan struct{}) (chan *staticPool.PExec, error)
	// RemoveWorker removes worker from the pool.
	RemoveWorker(ctx context.Context) error
	// AddWorker adds worker to the pool.
	AddWorker() error
	// Reset kill all workers inside the watcher and replaces with new
	Reset(ctx context.Context) error
	// Destroy all underlying stack (but let them complete the task).
	Destroy(ctx context.Context)
}

type Server

type Server interface {
	NewPool(ctx context.Context, cfg *poolConfig.Config, env map[string]string, _ *slog.Logger) (*staticPool.Pool, error)
}

Server creates workers for the application.

type StatsExporter

type StatsExporter struct {
	TotalMemoryDesc  *prometheus.Desc
	StateDesc        *prometheus.Desc
	WorkerMemoryDesc *prometheus.Desc
	TotalWorkersDesc *prometheus.Desc

	WorkersReady   *prometheus.Desc
	WorkersWorking *prometheus.Desc
	WorkersInvalid *prometheus.Desc

	Workers Informer
}

func (*StatsExporter) Collect

func (s *StatsExporter) Collect(ch chan<- prometheus.Metric)

func (*StatsExporter) Describe

func (s *StatsExporter) Describe(d chan<- *prometheus.Desc)

type Tracer

type Tracer interface {
	Tracer() *sdktrace.TracerProvider
}

Directories

Path Synopsis
protocol	Package protocol handles the internal worker response protocol for the RoadRunner jobs plugin.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL