roadrunner

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 13, 2018 License: MIT Imports: 16 Imported by: 0

README

RoadRunner

Latest Stable Version GoDoc Build Status Go Report Card Scrutinizer Code Quality Codecov

High-performance PSR-7 HTTP server, PHP load balancer and process manager.

Features:

  • PSR-7 HTTP server (file uploads, error handling, static files, hot reload, middlewares, event listeners)
  • extendable service model
  • no external services, drop-in (based on Goridge)
  • load balancer, process manager and task pipeline
  • frontend agnostic (queue, REST, PSR-7, async php, etc)
  • works over TCP, unix sockets and standard pipes
  • automatic worker replacement and safe PHP process destruction
  • worker lifecycle management (create/allocate/destroy timeouts)
  • payload context and body
  • control over max jobs per worker
  • protocol, worker and job level error management (including PHP errors)
  • memory leak failswitch
  • very fast (~250k calls per second on Ryzen 1700X over 16 threads)
  • works on Windows

Installation:

$ go get github.com/spiral/roadrunner
$ composer require spiral/roadrunner

Usage:

$ cd cmd
$ cd rr
$ go build && go install
$ cp .rr.yaml path/to/the/project

TODO: To be updated with build scripts!

$ rr serve -v

Example worker.

Examples:

p, err := rr.NewPool(
    func() *exec.Cmd { return exec.Command("php", "worker.php", "pipes") },
    rr.NewPipeFactory(),
    rr.Config{
        NumWorkers:      uint64(runtime.NumCPU()),
        AllocateTimeout: time.Second,              
        DestroyTimeout:  time.Second,               
    },
)
defer p.Destroy()

rsp, err := p.Exec(&rr.Payload{Body: []byte("hello")})
<?php
/**
 * @var Goridge\RelayInterface $relay
 */

use Spiral\Goridge;
use Spiral\RoadRunner;

$rr = new RoadRunner\Worker($relay);

while ($body = $rr->receive($context)) {
    try {
        $rr->send((string)$body, (string)$context);
    } catch (\Throwable $e) {
        $rr->error((string)$e);
    }
}

Check how to init relay here. More examples can be found in tests.

Testing:

$ make test

License:

The MIT License (MIT). Please see LICENSE for more information.

Documentation

Index

Constants

View Source
const (
	// EventWorkerConstruct thrown when new worker is spawned.
	EventWorkerConstruct = iota + 100

	// EventWorkerDestruct thrown after worker destruction.
	EventWorkerDestruct

	// EventWorkerKill thrown after worker is being forcefully killed.
	EventWorkerKill

	// EventWorkerError thrown any worker related even happen (passed with WorkerError)
	EventWorkerError

	// EventWorkerDead thrown when worker stops worker for any reason.
	EventWorkerDead

	// EventPoolError caused on pool wide errors
	EventPoolError
)
View Source
const (
	// EventPoolConstruct triggered when server creates new pool.
	EventServerStart = iota + 200

	// EventPoolConstruct triggered when server creates new pool.
	EventServerStop

	// EventServerFailure triggered when server is unable to replace dead pool.
	EventServerFailure

	// EventPoolConstruct triggered when server creates new pool.
	EventPoolConstruct

	// EventPoolDestruct triggered when server destroys existed pool.
	EventPoolDestruct
)
View Source
const (
	// StateInactive - no associated process
	StateInactive int64 = iota

	// StateReady - ready for job.
	StateReady

	// StateWorking - working on given payload.
	StateWorking

	// StateStreaming - indicates that worker is streaming the data at the moment.
	StateStreaming

	// StateStopping - process is being softly stopped.
	StateStopping

	// StateStopped - process has been terminated.
	StateStopped

	// StateErrored - error state (can't be used).
	StateErrored
)
View Source
const (
	// StopRequest can be sent by worker to indicate that restart is required.
	StopRequest = "{\"stop\":true}"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// NumWorkers defines how many sub-processes can be run at once. This value
	// might be doubled by Swapper while hot-swap.
	NumWorkers int64

	// MaxJobs defines how many executions is allowed for the worker until
	// it's destruction. set 1 to create new process for each new task, 0 to let
	// worker handle as many tasks as it can.
	MaxJobs int64

	// AllocateTimeout defines for how long pool will be waiting for a worker to
	// be freed to handle the task.
	AllocateTimeout time.Duration //todo: to milleseconds?

	// DestroyTimeout defines for how long pool should be waiting for worker to
	// properly stop, if timeout reached worker will be killed.
	DestroyTimeout time.Duration //todo: to milleseconds?
}

Config defines basic behaviour of worker creation and handling process.

func (*Config) Valid

func (cfg *Config) Valid() error

Reconfigure returns error if cfg not valid

type Factory

type Factory interface {
	// SpawnWorker creates new worker process based on given command.
	// Process must not be started.
	SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)

	// Close the factory and underlying connections.
	Close() error
}

Factory is responsible of wrapping given command into tasks worker.

type JobError

type JobError []byte

JobError is job level error (no worker halt), wraps at top of error context

func (JobError) Error

func (je JobError) Error() string

Error converts error context to string

type Payload

type Payload struct {
	// Context represent payload context, might be omitted.
	Context []byte

	// body contains binary payload to be processed by worker.
	Body []byte
}

Payload carries binary header and body to workers and back to the server.

func (*Payload) String

func (p *Payload) String() string

String returns payload body as string

type PipeFactory

type PipeFactory struct {
}

PipeFactory connects to workers using standard streams (STDIN, STDOUT pipes).

func NewPipeFactory

func NewPipeFactory() *PipeFactory

NewPipeFactory returns new factory instance and starts listening

func (*PipeFactory) Close added in v1.0.0

func (f *PipeFactory) Close() error

Close the factory.

func (*PipeFactory) SpawnWorker

func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)

SpawnWorker creates new worker and connects it to goridge relay, method Wait() must be handled on level above.

type Pool

type Pool interface {
	// AddListener all caused events to attached watcher.
	Listen(l func(event int, ctx interface{}))

	// Exec one task with given payload and context, returns result or error.
	Exec(rqs *Payload) (rsp *Payload, err error)

	// Workers returns worker list associated with the pool.
	Workers() (workers []*Worker)

	// Destroy all underlying workers (but let them to complete the task).
	Destroy()
}

Pool managed set of inner worker processes.

type Server added in v1.0.0

type Server struct {
	// contains filtered or unexported fields
}

Service manages pool creation and swapping.

func NewServer added in v1.0.0

func NewServer(cfg *ServerConfig) *Server

NewServer creates new router. Make sure to call configure before the usage.

func (*Server) Exec added in v1.0.0

func (s *Server) Exec(rqs *Payload) (rsp *Payload, err error)

Exec one task with given payload and context, returns result or error.

func (*Server) Listen added in v1.0.0

func (s *Server) Listen(l func(event int, ctx interface{}))

AddListener attaches server event watcher.

func (*Server) Pool added in v1.0.0

func (s *Server) Pool() Pool

Pool returns active pool or error.

func (*Server) Reconfigure added in v1.0.0

func (s *Server) Reconfigure(cfg *ServerConfig) error

Reconfigure re-configures underlying pool and destroys it's previous version if any. Reconfigure will ignore factory and relay settings.

func (*Server) Reset added in v1.0.0

func (s *Server) Reset() error

Reset resets the state of underlying pool and rebuilds all of it's workers.

func (*Server) Start added in v1.0.0

func (s *Server) Start() (err error)

Start underlying worker pool, configure factory and command provider.

func (*Server) Stop added in v1.0.0

func (s *Server) Stop()

Stop underlying worker pool and close the factory.

func (*Server) Workers added in v1.0.0

func (s *Server) Workers() (workers []*Worker)

Workers returns worker list associated with the server pool.

type ServerConfig added in v1.0.0

type ServerConfig struct {
	// Command includes command strings with all the parameters, example: "php worker.php pipes".
	Command string

	// Relay defines connection method and factory to be used to connect to workers:
	// "pipes", "tcp://:6001", "unix://rr.sock"
	// This config section must not change on re-configuration.
	Relay string

	// RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
	// must not change on re-configuration.
	RelayTimeout time.Duration

	// Pool defines worker pool configuration, number of workers, timeouts and etc. This config section might change
	// while server is running.
	Pool *Config
}

Server config combines factory, pool and cmd configurations.

func (*ServerConfig) Differs added in v1.0.0

func (cfg *ServerConfig) Differs(new *ServerConfig) bool

Differs returns true if configuration has changed but ignores pool or cmd changes.

type SocketFactory

type SocketFactory struct {
	// contains filtered or unexported fields
}

SocketFactory connects to external workers using socket server.

func NewSocketFactory

func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory

NewSocketFactory returns SocketFactory attached to a given socket lsn. tout specifies for how long factory should serve for incoming relay connection

func (*SocketFactory) Close added in v1.0.0

func (f *SocketFactory) Close() error

Close socket factory and underlying socket connection.

func (*SocketFactory) SpawnWorker

func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)

SpawnWorker creates worker and connects it to appropriate relay or returns error

type State

type State interface {
	fmt.Stringer

	// Value returns state value
	Value() int64

	// NumJobs shows how many times worker was invoked
	NumExecs() int64
}

State represents worker status and updated time.

type StaticPool added in v1.0.0

type StaticPool struct {
	// contains filtered or unexported fields
}

StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of workers.

func NewPool

func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, error)

NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.

func (*StaticPool) Config added in v1.0.0

func (p *StaticPool) Config() Config

Config returns associated pool configuration. Immutable.

func (*StaticPool) Destroy added in v1.0.0

func (p *StaticPool) Destroy()

Destroy all underlying workers (but let them to complete the task).

func (*StaticPool) Exec added in v1.0.0

func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error)

Exec one task with given payload and context, returns result or error.

func (*StaticPool) Listen added in v1.0.0

func (p *StaticPool) Listen(l func(event int, ctx interface{}))

AddListener attaches pool event watcher.

func (*StaticPool) Workers added in v1.0.0

func (p *StaticPool) Workers() (workers []*Worker)

Workers returns worker list associated with the pool.

type Worker

type Worker struct {
	// Pid of the process, points to Pid of underlying process and
	// can be nil while process is not started.
	Pid *int

	// Created indicates at what time worker has been created.
	Created time.Time
	// contains filtered or unexported fields
}

Worker - supervised process with api over goridge.Relay.

func (*Worker) Exec

func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error)

Exec sends payload to worker, executes it and returns result or error. Make sure to handle worker.Wait() to gather worker level errors. Method might return JobError indicating issue with payload.

func (*Worker) Kill

func (w *Worker) Kill() error

Kill kills underlying process, make sure to call Wait() func to gather error log from the stderr. Does not waits for process completion!

func (*Worker) State

func (w *Worker) State() State

State return receive-only worker state object, state can be used to safely access worker status, time when status changed and number of worker executions.

func (*Worker) Stop

func (w *Worker) Stop() error

Stop sends soft termination command to the worker and waits for process completion.

func (*Worker) String

func (w *Worker) String() string

String returns worker description.

func (*Worker) Wait

func (w *Worker) Wait() error

Wait must be called once for each worker, call will be released once worker is complete and will return process error (if any), if stderr is presented it's value will be wrapped as WorkerError. Method will return error code if php process fails to find or start the script.

type WorkerError added in v1.0.0

type WorkerError struct {
	// Worker
	Worker *Worker

	// Caused error
	Caused error
}

WorkerError is worker related error

func (WorkerError) Error added in v1.0.0

func (e WorkerError) Error() string

Error converts error context to string

Directories

Path Synopsis
cmd
rr
rpc

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL