Documentation ¶
Overview ¶
Package jobs implements the RoadRunner jobs plugin, providing asynchronous job processing with support for multiple queue backends (memory, AMQP, Kafka, SQS, NATS, Beanstalk).
The plugin manages job pipelines, worker pools, and priority-based scheduling. Jobs are pushed via RPC from PHP applications and executed by PHP workers through the RoadRunner server. OpenTelemetry tracing is integrated for distributed trace context propagation across the PHP-Go boundary.
Key components:
- Plugin: the main entry point, implementing the endure lifecycle
- Pipeline: named queue configuration bound to a specific driver
- Job/Options: domain model for jobs and their execution options
- RPC methods: Push, PushBatch, Pause, Resume, Declare, Destroy, Stat, List
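Example: embedding the plugin in a RoadRunner-style binary. This is a minimal sketch, assuming the endure v2 container API (New, RegisterAll, Init, Serve, Stop) and the module path shown in the imports; a real binary also registers the config, logger, server, RPC, and at least one queue-driver plugin.

package main

import (
	"log"
	"log/slog"

	"github.com/roadrunner-server/endure/v2"
	jobs "github.com/roadrunner-server/jobs/v5" // module path/major version is an assumption
)

func main() {
	cont := endure.New(slog.LevelInfo)

	// The jobs plugin resolves its Configurer, Logger and Server dependencies
	// from the other registered plugins (config, logger, server, drivers),
	// omitted here for brevity.
	if err := cont.RegisterAll(&jobs.Plugin{}); err != nil {
		log.Fatal(err)
	}

	if err := cont.Init(); err != nil {
		log.Fatal(err)
	}

	errCh, err := cont.Serve()
	if err != nil {
		log.Fatal(err)
	}

	// Serve returns a channel for asynchronous errors; stop the container
	// when something fails.
	for e := range errCh {
		log.Println(e)
		_ = cont.Stop()
	}
}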
Index ¶
- Constants
- type CfgOptions
- type Config
- type Configurer
- type Informer
- type Job
- func (j *Job) AutoAck() bool
- func (j *Job) Delay() int64
- func (j *Job) GroupID() string
- func (j *Job) Headers() map[string][]string
- func (j *Job) ID() string
- func (j *Job) Metadata() string
- func (j *Job) Name() string
- func (j *Job) Offset() int64
- func (j *Job) Partition() int32
- func (j *Job) Payload() []byte
- func (j *Job) Priority() int64
- func (j *Job) Topic() string
- func (j *Job) UpdatePriority(p int64)
- type Logger
- type Options
- type Pipeline
- func (p Pipeline) Bool(key string, d bool) bool
- func (p Pipeline) Driver() string
- func (p Pipeline) Get(key string) any
- func (p Pipeline) Has(key string) bool
- func (p Pipeline) Int(key string, d int) int
- func (p Pipeline) Map(key string, out map[string]string) error
- func (p Pipeline) Name() string
- func (p Pipeline) Pool() string
- func (p Pipeline) Priority() int64
- func (p Pipeline) String(key string, d string) string
- func (p Pipeline) With(name string, value any)
- type Plugin
- func (p *Plugin) AddWorker() error
- func (p *Plugin) Collects() []*dep.In
- func (p *Plugin) Declare(ctx context.Context, pipeline jobsApi.Pipeline) error
- func (p *Plugin) Destroy(ctx context.Context, pp string) error
- func (p *Plugin) Execute(pldCtx []byte, pool Pool, jb jobs.Job, span trace.Span, start time.Time)
- func (p *Plugin) Init(cfg Configurer, log Logger, server Server) error
- func (p *Plugin) JobsState(ctx context.Context) ([]*jobsApi.State, error)
- func (p *Plugin) List() []string
- func (p *Plugin) MetricsCollector() []prometheus.Collector
- func (p *Plugin) Name() string
- func (p *Plugin) Pause(ctx context.Context, pp string) error
- func (p *Plugin) Push(ctx context.Context, j jobsApi.Message) error
- func (p *Plugin) PushBatch(ctx context.Context, j []jobsApi.Message) error
- func (p *Plugin) RPC() (string, http.Handler)
- func (p *Plugin) Ready() (*status.Status, error)
- func (p *Plugin) RemoveWorker(ctx context.Context) error
- func (p *Plugin) Reset() error
- func (p *Plugin) Resume(ctx context.Context, pp string) error
- func (p *Plugin) Serve() chan error
- func (p *Plugin) Status() (*status.Status, error)
- func (p *Plugin) Stop(ctx context.Context) error
- func (p *Plugin) Workers() []*process.State
- type Pool
- type Server
- type StatsExporter
- type Tracer
Constants ¶
const (
	// RrMode env variable
	RrMode     string = "RR_MODE"
	RrModeJobs string = "jobs"

	PluginName string = "jobs"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CfgOptions ¶
type CfgOptions struct {
// Parallelism configures the number of pipelines to be started at the same time
Parallelism int `mapstructure:"parallelism"`
}
type Config ¶
type Config struct {
// NumPollers configures number of priority queue pollers
// Default - num logical cores
NumPollers int `mapstructure:"num_pollers"`
// Options contain additional configuration options for the job plugin
CfgOptions *CfgOptions `mapstructure:"options"`
// PipelineSize is the limit of the main jobs queue, which consumes items from the driver's pipeline.
// A driver pipeline might be much larger than the main jobs queue.
PipelineSize uint64 `mapstructure:"pipeline_size"`
// Timeout in seconds is the per-push limit to put the job into the queue
Timeout int `mapstructure:"timeout"`
// Pools is a map of worker pools, where the key is the name of the pool and the value is the pool configuration
Pools map[string]*poolImpl.Config `mapstructure:"pools"`
// Pool configures roadrunner workers pool.
Pool *poolImpl.Config `mapstructure:"pool"`
// Pipelines define mapping between PHP job pipeline and associated job broker.
Pipelines map[string]Pipeline `mapstructure:"pipelines"`
// Consuming specifies names of pipelines to be consumed on service start.
Consume []string `mapstructure:"consume"`
}
Config defines settings for job broker, workers and job-pipeline mapping.
func (*Config) InitDefaults ¶
func (*Config) TimeoutDuration ¶
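As a rough illustration of how the mapstructure tags above bind a decoded jobs configuration section to the struct, the sketch below decodes a plain map into a stand-in type that mirrors a subset of Config's fields; the keys follow the tags shown above, and the values are illustrative only.

package main

import (
	"fmt"
	"log"

	"github.com/mitchellh/mapstructure"
)

// jobsConfig mirrors a subset of the plugin's Config fields for this sketch.
type jobsConfig struct {
	NumPollers   int      `mapstructure:"num_pollers"`
	PipelineSize uint64   `mapstructure:"pipeline_size"`
	Timeout      int      `mapstructure:"timeout"`
	Consume      []string `mapstructure:"consume"`
}

func main() {
	// The map stands in for the decoded "jobs" section of the application config.
	raw := map[string]any{
		"num_pollers":   32,
		"pipeline_size": 100000,
		"timeout":       60,
		"consume":       []string{"emails"},
	}

	var cfg jobsConfig
	if err := mapstructure.Decode(raw, &cfg); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v\n", cfg)
}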
type Configurer ¶
type Job ¶
type Job struct {
// Job contains the name of the job (usually the PHP class that handles it).
Job string `json:"job"`
// Ident is the unique identifier of the job; it should be provided by the producer.
Ident string `json:"id"`
// Payload is string data (usually JSON) passed to Job broker.
Pld []byte `json:"payload"`
// Headers with key-value pairs
Hdr map[string][]string `json:"headers"`
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`
}
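A minimal sketch of constructing a Job as the plugin would receive it from a producer; the job name, payload, pipeline, and module path are illustrative assumptions, and the accessors used at the end are the ones listed in the Index above.

package main

import (
	"fmt"

	jobs "github.com/roadrunner-server/jobs/v5" // module path/major version is an assumption
)

func main() {
	j := &jobs.Job{
		Job:   "App\\Jobs\\SendEmail", // handler name, usually a PHP class
		Ident: "job-1",                // unique ID supplied by the producer
		Pld:   []byte(`{"to":"user@example.com"}`),
		Hdr:   map[string][]string{"trace-id": {"abc123"}},
		Options: &jobs.Options{
			Priority: 10,
			Pipeline: "emails",
			Delay:    30, // seconds
		},
	}

	fmt.Println(j.Name(), j.Priority(), string(j.Payload()))
}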
func (*Job) Offset ¶
Offset returns the Kafka offset of the job. If Options is nil, it returns 0.
func (*Job) UpdatePriority ¶
type Options ¶
type Options struct {
// Priority is the job priority, default - 10
Priority int64 `json:"priority"`
// Pipeline manually specified pipeline.
Pipeline string `json:"pipeline,omitzero"`
// Delay defines time duration to delay execution for. Defaults to none.
Delay int64 `json:"delay,omitzero"`
// AutoAck is used to ack a job right after it arrives from the driver
AutoAck bool `json:"auto_ack"`
// kafka specific fields
// Topic is kafka topic
Topic string `json:"topic"`
// Optional metadata
Metadata string `json:"metadata"`
// Kafka offset
Offset int64 `json:"offset"`
// Kafka partition
Partition int32 `json:"partition"`
}
Options carry information about how to handle a given job.
func (*Options) DelayDuration ¶
DelayDuration returns the delay in the form of a time.Duration.
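A short sketch of the Delay/DelayDuration pair: Delay is carried as whole seconds and DelayDuration converts it for the scheduler. The module path is an assumption.

package main

import (
	"fmt"

	jobs "github.com/roadrunner-server/jobs/v5" // module path/major version is an assumption
)

func main() {
	opts := &jobs.Options{Priority: 10, Pipeline: "emails", Delay: 90}
	fmt.Println(opts.DelayDuration()) // 1m30s
}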
type Pipeline ¶
Pipeline defines pipeline options.
func (Pipeline) Map ¶
Map must return the nested map value for the given key, or an empty config if the key is not set. Nested values are used, for example, for SQS attributes or tags.
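A sketch of reading driver options off a configured Pipeline (for example one taken from Config.Pipelines); the prefetch and tags keys are hypothetical driver options, and the module path is an assumption.

package main

import (
	"fmt"

	jobs "github.com/roadrunner-server/jobs/v5" // module path/major version is an assumption
)

// describePipeline demonstrates the typed accessors on a configured Pipeline.
func describePipeline(pipe jobs.Pipeline) {
	fmt.Println("driver:", pipe.Driver())
	fmt.Println("priority:", pipe.Priority())
	fmt.Println("prefetch:", pipe.Int("prefetch", 10)) // hypothetical driver option with a default

	tags := make(map[string]string)
	if err := pipe.Map("tags", tags); err == nil { // nested values such as SQS tags or attributes
		fmt.Println("tags:", tags)
	}
}

func main() {}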
type Plugin ¶
type Plugin struct {
// contains filtered or unexported fields
}
func (*Plugin) Collects ¶
Collects declares the plugin's dependencies: job driver constructors and an optional tracer provider.
func (*Plugin) Declare ¶
Declare dynamically registers a new pipeline at runtime, initializing its driver and starting consumption.
func (*Plugin) Destroy ¶
Destroy stops the pipeline driver and removes it from the plugin's registry.
func (*Plugin) Execute ¶
Execute dispatches a job to the given worker pool, waits for the response, and handles the result according to the RoadRunner jobs protocol (ack, nack, requeue, or delay).
func (*Plugin) Init ¶
func (p *Plugin) Init(cfg Configurer, log Logger, server Server) error
Init configures the jobs plugin from the application config, sets up the priority queue, pipeline registry, and metrics collector. Returns errors.Disabled if the plugin section is not present.
func (*Plugin) JobsState ¶
JobsState queries each registered pipeline driver for its current state (active, delayed, reserved job counts).
func (*Plugin) MetricsCollector ¶
func (p *Plugin) MetricsCollector() []prometheus.Collector
MetricsCollector returns Prometheus collectors for jobs processing and push metrics.
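For a standalone setup (normally the RoadRunner metrics plugin performs this wiring), the collectors can be registered on a Prometheus registry; the helper below is hypothetical and assumes an already initialized plugin instance.

package main

import (
	"github.com/prometheus/client_golang/prometheus"

	jobs "github.com/roadrunner-server/jobs/v5" // module path/major version is an assumption
)

// registerJobsMetrics registers the plugin's collectors on the given registry.
func registerJobsMetrics(reg *prometheus.Registry, p *jobs.Plugin) {
	for _, c := range p.MetricsCollector() {
		reg.MustRegister(c)
	}
}

func main() {}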
func (*Plugin) Name ¶
Name returns the plugin identifier used for config lookup and dependency resolution.
func (*Plugin) PushBatch ¶
PushBatch sends multiple jobs to their respective pipelines. Each job is pushed individually and the operation stops on the first error.
func (*Plugin) RemoveWorker ¶
RemoveWorker removes one worker from all configured pool(s).
func (*Plugin) Reset ¶
Reset restarts all worker pool(s) by destroying existing workers and spawning fresh ones.
func (*Plugin) Serve ¶
Serve initializes all configured pipelines via their driver constructors, creates worker pool(s), starts the queue pollers, and begins processing jobs. Returns an error channel for async errors.
type Pool ¶
type Pool interface {
// Workers returns the worker list associated with the pool.
Workers() (workers []*worker.Process)
// Exec executes the given payload on a worker from the pool.
Exec(ctx context.Context, p *payload.Payload, stopCh chan struct{}) (chan *staticPool.PExec, error)
// RemoveWorker removes a worker from the pool.
RemoveWorker(ctx context.Context) error
// AddWorker adds a worker to the pool.
AddWorker() error
// Reset kills all workers inside the watcher and replaces them with new ones.
Reset(ctx context.Context) error
// Destroy destroys the underlying worker stack (letting workers complete their current task).
Destroy(ctx context.Context)
}
type Server ¶
type Server interface {
NewPool(ctx context.Context, cfg *poolConfig.Config, env map[string]string, _ *slog.Logger) (*staticPool.Pool, error)
}
Server creates workers for the application.
type StatsExporter ¶
type StatsExporter struct {
TotalMemoryDesc *prometheus.Desc
StateDesc *prometheus.Desc
WorkerMemoryDesc *prometheus.Desc
TotalWorkersDesc *prometheus.Desc
WorkersReady *prometheus.Desc
WorkersWorking *prometheus.Desc
WorkersInvalid *prometheus.Desc
Workers Informer
}
func (*StatsExporter) Collect ¶
func (s *StatsExporter) Collect(ch chan<- prometheus.Metric)
func (*StatsExporter) Describe ¶
func (s *StatsExporter) Describe(d chan<- *prometheus.Desc)
type Tracer ¶
type Tracer interface {
Tracer() *sdktrace.TracerProvider
}
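A minimal sketch of a value satisfying the Tracer interface by exposing an OpenTelemetry SDK TracerProvider; in RoadRunner this is normally provided by the otel plugin, so the type below is purely illustrative.

package main

import (
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// otelProvider satisfies the jobs plugin's Tracer interface.
type otelProvider struct {
	tp *sdktrace.TracerProvider
}

func (o *otelProvider) Tracer() *sdktrace.TracerProvider { return o.tp }

func main() {
	_ = &otelProvider{tp: sdktrace.NewTracerProvider()}
}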