roadrunner

Version: v0.9.0
Published: Jan 28, 2018 License: MIT Imports: 15 Imported by: 0

README

RoadRunner

High-performance PHP job balancer and process manager library for Golang.

Features:

  • no external dependencies or services, drop-in
  • load balancer, process manager and task pipeline
  • built for multiple frontends (queue, rest, psr-7, async php, etc)
  • works over TCP, unix sockets and standard pipes
  • automatic worker replacement and safe destruction
  • worker lifecycle management (create/allocate/destroy timeouts)
  • payload context and body
  • control over max jobs per worker
  • protocol, worker and job level error management (including PHP errors)
  • very fast (~250k calls per second on Ryzen 1700X over 16 threads)
  • works on Windows

Examples:

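p, err := NewPool(
    func() *exec.Cmd { return exec.Command("php", "worker.php", "pipes") },
    NewPipeFactory(),
    Config{
        NumWorkers:      uint64(runtime.NumCPU()),
        AllocateTimeout: time.Second,
        DestroyTimeout:  time.Second,
    },
)
if err != nil {
    // p is nil on failure, so Destroy must not be deferred before this check.
    panic(err)
}
defer p.Destroy()

rsp, err := p.Exec(&Payload{Body: []byte("hello")})
if err != nil {
    panic(err)
}
fmt.Println(rsp.String())

<?php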
/**
 * @var Goridge\RelayInterface $relay
 */

use Spiral\Goridge;
use Spiral\RoadRunner;

$rr = new RoadRunner\Worker($relay);

while ($body = $rr->receive($context)) {
    try {
        $rr->send((string)$body, (string)$context);
    } catch (\Throwable $e) {
        $rr->error((string)$e);
    }
}

Check how to init relay here.

License:

The MIT License (MIT). Please see LICENSE for more information.

Documentation

Constants

const (
	// EventCreated is thrown when a new worker is spawned.
	EventCreated = iota

	// EventDestruct is thrown before worker destruction.
	EventDestruct

	// EventError is thrown when any worker-related error happens (the error
	// is passed as context).
	EventError
)
const (
	// StateInactive - no associated process
	StateInactive int64 = iota
	// StateReady - ready for job.
	StateReady
	// StateWorking - working on given payload.
	StateWorking
	// StateStopped - process has been terminated
	StateStopped
	// StateErrored - error state (can't be used)
	StateErrored
)
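
Because the state constants are plain int64 values, log output is easier to read with a name lookup. The following is a minimal sketch: it redeclares the documented constants locally so it compiles on its own, and stateName is a hypothetical helper that is not part of the package.

```go
package main

import "fmt"

// Local copies of the documented state constants, redeclared here so the
// sketch compiles without the roadrunner package.
const (
	StateInactive int64 = iota
	StateReady
	StateWorking
	StateStopped
	StateErrored
)

// stateName is a hypothetical helper (not part of the package) that maps a
// state value to a readable label.
func stateName(v int64) string {
	switch v {
	case StateInactive:
		return "inactive"
	case StateReady:
		return "ready"
	case StateWorking:
		return "working"
	case StateStopped:
		return "stopped"
	case StateErrored:
		return "errored"
	default:
		return "unknown"
	}
}

func main() {
	for v := StateInactive; v <= StateErrored; v++ {
		fmt.Printf("%d = %s\n", v, stateName(v))
	}
}
```

With the real package, the value passed to such a helper would come from w.State().Value().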
const (
	// StopRequest can be sent by worker to indicate that restart is required.
	StopRequest = "{\"stop\":true}"
)
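
StopRequest is a small JSON document, so a message can be checked against it by decoding rather than by comparing raw bytes. The sketch below shows one way to do that; isStopRequest is a hypothetical helper, and how the package itself detects the request is not documented here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StopRequest is copied from the documented constant.
const StopRequest = "{\"stop\":true}"

// isStopRequest is a hypothetical helper (not part of the package). It
// reports whether ctx is a JSON object whose "stop" field is true.
func isStopRequest(ctx []byte) bool {
	var msg struct {
		Stop bool `json:"stop"`
	}
	if err := json.Unmarshal(ctx, &msg); err != nil {
		return false // not JSON, so not a stop request
	}
	return msg.Stop
}

func main() {
	fmt.Println(isStopRequest([]byte(StopRequest)))
	fmt.Println(isStopRequest([]byte(`{"stop":false}`)))
	fmt.Println(isStopRequest([]byte("hello")))
}
```

Decoding tolerates whitespace differences in the worker's output, which a byte-for-byte comparison would not.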

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// NumWorkers defines how many sub-processes can be run at once. This value
	// might be doubled by Swapper during a hot-swap.
	NumWorkers uint64

	// MaxExecutions defines how many executions are allowed for the worker
	// before its destruction. Set 1 to create a new process for each new task,
	// 0 to let the worker handle as many tasks as it can.
	MaxExecutions uint64

	// AllocateTimeout defines how long the pool will wait for a worker to
	// be freed to handle the task.
	AllocateTimeout time.Duration

	// DestroyTimeout defines how long the pool should wait for a worker to
	// stop properly; if the timeout is reached the worker will be killed.
	DestroyTimeout time.Duration
}

Config defines basic behaviour of worker creation and handling process.
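
The MaxExecutions rule (0 means unlimited, 1 means one task per process) can be expressed as a small predicate. This is only a sketch of the documented semantics: needsReplacement is a hypothetical name, and the pool's actual replacement logic is not shown in this documentation.

```go
package main

import "fmt"

// needsReplacement is a hypothetical helper (not part of the package). It
// reports whether a worker that has run numExecs tasks has used up the
// budget given by maxExecutions, where 0 means "no limit".
func needsReplacement(numExecs, maxExecutions uint64) bool {
	if maxExecutions == 0 {
		return false // unlimited: the worker is never retired by count
	}
	return numExecs >= maxExecutions
}

func main() {
	fmt.Println(needsReplacement(1, 1))   // one task per process
	fmt.Println(needsReplacement(500, 0)) // no limit configured
	fmt.Println(needsReplacement(3, 10))  // budget not yet used up
}
```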

func (*Config) Valid

func (cfg *Config) Valid() error

Valid returns an error if the config is not valid.

type Factory

type Factory interface {
	// SpawnWorker creates new worker process based on given command.
	// Process must not be started.
	SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)
}

Factory is responsible for wrapping the given command into a task worker.

type JobError

type JobError []byte

JobError is a job-level error (the worker is not halted); it wraps the error context returned for the job.

func (JobError) Error

func (je JobError) Error() string

Error converts error context to string
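
Since JobError is a distinct type, a caller can separate job-level failures from worker-level ones with a type assertion. The sketch below redeclares JobError locally so it runs on its own; the Error body (a plain string conversion) is an assumption based on the description above, and isJobError is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// JobError mirrors the documented declaration. The method body is an
// assumption: it converts the error context bytes to a string.
type JobError []byte

func (je JobError) Error() string { return string(je) }

// isJobError is a hypothetical helper (not part of the package). It reports
// whether err is a job-level error, meaning the worker itself is still usable.
func isJobError(err error) bool {
	_, ok := err.(JobError)
	return ok
}

func main() {
	var err error = JobError("invalid payload")
	fmt.Println(isJobError(err), err)

	err = errors.New("worker crashed")
	fmt.Println(isJobError(err), err)
}
```

With the real package, the error checked this way would be the one returned by Exec.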

type Payload

type Payload struct {
	// Context represents the payload context; it may be omitted.
	Context []byte

	// Body contains binary payload to be processed by worker
	Body []byte
}

Payload carries binary header and body to workers and back to the server.

func (*Payload) String

func (p *Payload) String() string

String returns payload body as string

type PipeFactory

type PipeFactory struct {
}

PipeFactory connects to workers using standard streams (STDIN, STDOUT pipes).

func NewPipeFactory

func NewPipeFactory() *PipeFactory

NewPipeFactory returns a new factory instance and starts listening.

func (*PipeFactory) SpawnWorker

func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)

SpawnWorker creates a new worker and connects it to a goridge relay. The caller is responsible for handling the worker's Wait() method.

type Pool

type Pool struct {
	// Observer is optional callback to handle worker create/destruct/error events.
	Observer func(event int, w *Worker, ctx interface{})
	// contains filtered or unexported fields
}

Pool controls worker creation, destruction and task routing.
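
An Observer callback typically switches on the event constant and treats ctx as the error for EventError. The following sketch uses local stand-ins for the event constants and for Worker so that it runs without the package; describe is a hypothetical helper.

```go
package main

import "fmt"

// Local copies of the documented event constants.
const (
	EventCreated = iota
	EventDestruct
	EventError
)

// Worker is a stand-in for the package's Worker type; only the documented
// Pid field is reproduced.
type Worker struct {
	Pid *int
}

// describe is a hypothetical helper (not part of the package) that turns an
// event into a log line. For EventError, ctx carries the error.
func describe(event int, w *Worker, ctx interface{}) string {
	switch event {
	case EventCreated:
		return "worker created"
	case EventDestruct:
		return "worker about to be destroyed"
	case EventError:
		return fmt.Sprintf("worker error: %v", ctx)
	default:
		return "unknown event"
	}
}

func main() {
	// Same signature as the documented Pool.Observer field.
	observer := func(event int, w *Worker, ctx interface{}) {
		fmt.Println(describe(event, w, ctx))
	}

	observer(EventCreated, &Worker{}, nil)
	observer(EventError, &Worker{}, "relay closed")
	observer(EventDestruct, &Worker{}, nil)
}
```

With the real package, a function of this shape would be assigned to the pool's Observer field.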

func NewPool

func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*Pool, error)

NewPool creates a new worker pool and task multiplexer. The pool is initialized with one worker.

func (*Pool) Config

func (p *Pool) Config() Config

Config returns associated pool configuration. Immutable.

func (*Pool) Destroy

func (p *Pool) Destroy()

Destroy stops all underlying workers (but lets them complete their current task).

func (*Pool) Exec

func (p *Pool) Exec(rqs *Payload) (rsp *Payload, err error)

Exec one task with given payload and context, returns result or error.

func (*Pool) Workers

func (p *Pool) Workers() (workers []*Worker)

Workers returns worker list associated with the pool.

type SocketFactory

type SocketFactory struct {
	// contains filtered or unexported fields
}

SocketFactory connects to external workers using socket server.

func NewSocketFactory

func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory

NewSocketFactory returns a SocketFactory attached to the given socket listener. tout specifies how long the factory should wait for an incoming relay connection.
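
The listener handed to NewSocketFactory comes from the standard net package. The sketch below only prepares such a listener; the loopback address, the choice of port 0 and the listen wrapper are illustrative assumptions, and the factory call itself appears only in a comment because the example does not import the package.

```go
package main

import (
	"fmt"
	"net"
)

// listen is a thin, hypothetical wrapper around net.Listen, kept separate so
// the network ("tcp" or "unix") and address can be supplied by configuration.
func listen(network, address string) (net.Listener, error) {
	return net.Listen(network, address)
}

func main() {
	// Port 0 asks the operating system for any free port.
	ls, err := listen("tcp", "127.0.0.1:0")
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer ls.Close()

	fmt.Println("workers should connect to", ls.Addr())

	// With the real package the listener would then be passed on, for example:
	//   factory := NewSocketFactory(ls, time.Minute) // timeout value is illustrative
}
```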

func (*SocketFactory) SpawnWorker

func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)

SpawnWorker creates a worker and connects it to the appropriate relay, or returns an error.

type State

type State interface {
	// Value returns state value
	Value() int64

	// NumExecs shows how many times worker was invoked
	NumExecs() uint64

	// Updated indicates the moment of the last state change
	Updated() time.Time
}

State represents worker status and updated time.
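
Because State is an interface, monitoring code can be written and tested against it without a running process. Here is a minimal sketch that counts ready workers; the interface and constants are local copies, while fakeState and countReady are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented state constants.
const (
	StateInactive int64 = iota
	StateReady
	StateWorking
	StateStopped
	StateErrored
)

// State mirrors the documented interface.
type State interface {
	Value() int64
	NumExecs() uint64
	Updated() time.Time
}

// fakeState is a stand-in implementation used only by this sketch.
type fakeState struct {
	value   int64
	execs   uint64
	updated time.Time
}

func (s fakeState) Value() int64       { return s.value }
func (s fakeState) NumExecs() uint64   { return s.execs }
func (s fakeState) Updated() time.Time { return s.updated }

// countReady is a hypothetical helper that counts states equal to StateReady.
func countReady(states []State) int {
	n := 0
	for _, s := range states {
		if s.Value() == StateReady {
			n++
		}
	}
	return n
}

func main() {
	now := time.Now()
	states := []State{
		fakeState{value: StateReady, updated: now},
		fakeState{value: StateWorking, execs: 3, updated: now},
		fakeState{value: StateReady, execs: 7, updated: now},
	}
	fmt.Println("ready workers:", countReady(states))
}
```

With the real package, the slice would be built by calling State() on each worker returned by the pool's Workers method.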

type Worker

type Worker struct {
	// Pid of the process, points to Pid of underlying process and
	// can be nil while process is not started.
	Pid *int
	// contains filtered or unexported fields
}

Worker - supervised process with api over goridge.Relay.

func (*Worker) Exec

func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error)

Exec sends payload to worker, executes it and returns result or error. Make sure to handle worker.Wait() to gather worker level errors. Method might return JobError indicating issue with payload.

func (*Worker) Kill

func (w *Worker) Kill() error

Kill kills the underlying process and waits for it to complete. Make sure to call Wait() to gather the error log from stderr.

func (*Worker) State

func (w *Worker) State() State

State returns a read-only worker state object. The state can be used to safely access the worker status, the time when the status changed and the number of worker executions.

func (*Worker) Stop

func (w *Worker) Stop() error

Stop sends soft termination command to the worker and waits for process completion.

func (*Worker) String

func (w *Worker) String() string

String returns worker description.

func (*Worker) Wait

func (w *Worker) Wait() error

Wait must be called once for each worker. The call is released once the worker is complete and returns the process error (if any); if stderr output is present, its value will be wrapped as WorkerError. The method will return an error code if the php process fails to find or start the script.
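
When several goroutines need the exit result, the "call Wait once" rule can be kept by caching the first result. The following is a sketch of that pattern using sync.Once; onceWaiter and fakeWorker are hypothetical types and are not part of the package.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// waiter is the part of the worker API this sketch relies on.
type waiter interface {
	Wait() error
}

// onceWaiter is a hypothetical wrapper (not part of the package). It calls
// the underlying Wait exactly once and returns the cached result afterwards.
type onceWaiter struct {
	w    waiter
	once sync.Once
	err  error
}

func (o *onceWaiter) Wait() error {
	o.once.Do(func() { o.err = o.w.Wait() })
	return o.err
}

// fakeWorker stands in for a real worker and counts Wait calls.
type fakeWorker struct {
	calls int
}

func (f *fakeWorker) Wait() error {
	f.calls++
	return errors.New("exit status 1")
}

func main() {
	fw := &fakeWorker{}
	ow := &onceWaiter{w: fw}

	first := ow.Wait()
	second := ow.Wait()

	fmt.Println("first:", first)
	fmt.Println("second:", second)
	fmt.Println("underlying Wait calls:", fw.calls)
}
```

sync.Once also blocks concurrent callers until the first call finishes, so every caller observes the same result.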
