Documentation ¶
Index ¶
- Constants
- type Config
- type Factory
- type JobError
- type Payload
- type PipeFactory
- type Pool
- type Server
- func (s *Server) Exec(rqs *Payload) (rsp *Payload, err error)
- func (s *Server) Listen(l func(event int, ctx interface{}))
- func (s *Server) Pool() Pool
- func (s *Server) Reconfigure(cfg *ServerConfig) error
- func (s *Server) Reset() error
- func (s *Server) Start() (err error)
- func (s *Server) Stop()
- func (s *Server) Workers() (workers []*Worker)
- type ServerConfig
- type SocketFactory
- type State
- type StaticPool
- type Worker
- type WorkerError
Constants ¶
const ( // EventWorkerConstruct thrown when new worker is spawned. EventWorkerConstruct = iota + 100 // EventWorkerDestruct thrown after worker destruction. EventWorkerDestruct // EventWorkerKill thrown after worker is being forcefully killed. EventWorkerKill // EventWorkerError thrown any worker related even happen (passed with WorkerError) EventWorkerError // EventWorkerDead thrown when worker stops worker for any reason. EventWorkerDead // EventPoolError caused on pool wide errors EventPoolError )
const ( // EventPoolConstruct triggered when server creates new pool. EventServerStart = iota + 200 // EventPoolConstruct triggered when server creates new pool. EventServerStop // EventServerFailure triggered when server is unable to replace dead pool. EventServerFailure // EventPoolConstruct triggered when server creates new pool. EventPoolConstruct // EventPoolDestruct triggered when server destroys existed pool. EventPoolDestruct )
const ( // StateInactive - no associated process StateInactive int64 = iota // StateReady - ready for job. StateReady // StateWorking - working on given payload. StateWorking // StateStreaming - indicates that worker is streaming the data at the moment. StateStreaming // StateStopping - process is being softly stopped. StateStopping // StateStopped - process has been terminated. StateStopped // StateErrored - error state (can't be used). StateErrored )
const (
// StopRequest can be sent by worker to indicate that restart is required.
StopRequest = "{\"stop\":true}"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { // NumWorkers defines how many sub-processes can be run at once. This value // might be doubled by Swapper while hot-swap. NumWorkers int64 // MaxJobs defines how many executions is allowed for the worker until // it's destruction. set 1 to create new process for each new task, 0 to let // worker handle as many tasks as it can. MaxJobs int64 // AllocateTimeout defines for how long pool will be waiting for a worker to // be freed to handle the task. AllocateTimeout time.Duration //todo: to milleseconds? // DestroyTimeout defines for how long pool should be waiting for worker to // properly stop, if timeout reached worker will be killed. DestroyTimeout time.Duration //todo: to milleseconds? }
Config defines basic behaviour of worker creation and handling process.
type Factory ¶
type Factory interface { // SpawnWorker creates new worker process based on given command. // Process must not be started. SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) // Close the factory and underlying connections. Close() error }
Factory is responsible of wrapping given command into tasks worker.
type JobError ¶
type JobError []byte
JobError is job level error (no worker halt), wraps at top of error context
type Payload ¶
type Payload struct { // Context represent payload context, might be omitted. Context []byte // body contains binary payload to be processed by worker. Body []byte }
Payload carries binary header and body to workers and back to the server.
type PipeFactory ¶
type PipeFactory struct { }
PipeFactory connects to workers using standard streams (STDIN, STDOUT pipes).
func NewPipeFactory ¶
func NewPipeFactory() *PipeFactory
NewPipeFactory returns new factory instance and starts listening
func (*PipeFactory) SpawnWorker ¶
func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)
SpawnWorker creates new worker and connects it to goridge relay, method Wait() must be handled on level above.
type Pool ¶
type Pool interface { // AddListener all caused events to attached watcher. Listen(l func(event int, ctx interface{})) // Exec one task with given payload and context, returns result or error. Exec(rqs *Payload) (rsp *Payload, err error) // Workers returns worker list associated with the pool. Workers() (workers []*Worker) // Destroy all underlying workers (but let them to complete the task). Destroy() }
Pool managed set of inner worker processes.
type Server ¶ added in v1.0.0
type Server struct {
// contains filtered or unexported fields
}
Service manages pool creation and swapping.
func NewServer ¶ added in v1.0.0
func NewServer(cfg *ServerConfig) *Server
NewServer creates new router. Make sure to call configure before the usage.
func (*Server) Exec ¶ added in v1.0.0
Exec one task with given payload and context, returns result or error.
func (*Server) Reconfigure ¶ added in v1.0.0
func (s *Server) Reconfigure(cfg *ServerConfig) error
Reconfigure re-configures underlying pool and destroys it's previous version if any. Reconfigure will ignore factory and relay settings.
func (*Server) Reset ¶ added in v1.0.0
Reset resets the state of underlying pool and rebuilds all of it's workers.
func (*Server) Start ¶ added in v1.0.0
Start underlying worker pool, configure factory and command provider.
type ServerConfig ¶ added in v1.0.0
type ServerConfig struct { // Command includes command strings with all the parameters, example: "php worker.php pipes". Command string // Relay defines connection method and factory to be used to connect to workers: // "pipes", "tcp://:6001", "unix://rr.sock" // This config section must not change on re-configuration. Relay string // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section // must not change on re-configuration. RelayTimeout time.Duration // Pool defines worker pool configuration, number of workers, timeouts and etc. This config section might change // while server is running. Pool *Config }
Server config combines factory, pool and cmd configurations.
func (*ServerConfig) Differs ¶ added in v1.0.0
func (cfg *ServerConfig) Differs(new *ServerConfig) bool
Differs returns true if configuration has changed but ignores pool or cmd changes.
type SocketFactory ¶
type SocketFactory struct {
// contains filtered or unexported fields
}
SocketFactory connects to external workers using socket server.
func NewSocketFactory ¶
func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory
NewSocketFactory returns SocketFactory attached to a given socket lsn. tout specifies for how long factory should serve for incoming relay connection
func (*SocketFactory) Close ¶ added in v1.0.0
func (f *SocketFactory) Close() error
Close socket factory and underlying socket connection.
func (*SocketFactory) SpawnWorker ¶
func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)
SpawnWorker creates worker and connects it to appropriate relay or returns error
type State ¶
type State interface { fmt.Stringer // Value returns state value Value() int64 // NumJobs shows how many times worker was invoked NumExecs() int64 }
State represents worker status and updated time.
type StaticPool ¶ added in v1.0.0
type StaticPool struct {
// contains filtered or unexported fields
}
StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of workers.
func NewPool ¶
NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
func (*StaticPool) Config ¶ added in v1.0.0
func (p *StaticPool) Config() Config
Config returns associated pool configuration. Immutable.
func (*StaticPool) Destroy ¶ added in v1.0.0
func (p *StaticPool) Destroy()
Destroy all underlying workers (but let them to complete the task).
func (*StaticPool) Exec ¶ added in v1.0.0
func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error)
Exec one task with given payload and context, returns result or error.
func (*StaticPool) Listen ¶ added in v1.0.0
func (p *StaticPool) Listen(l func(event int, ctx interface{}))
AddListener attaches pool event watcher.
func (*StaticPool) Workers ¶ added in v1.0.0
func (p *StaticPool) Workers() (workers []*Worker)
Workers returns worker list associated with the pool.
type Worker ¶
type Worker struct { // Pid of the process, points to Pid of underlying process and // can be nil while process is not started. Pid *int // Created indicates at what time worker has been created. Created time.Time // contains filtered or unexported fields }
Worker - supervised process with api over goridge.Relay.
func (*Worker) Exec ¶
Exec sends payload to worker, executes it and returns result or error. Make sure to handle worker.Wait() to gather worker level errors. Method might return JobError indicating issue with payload.
func (*Worker) Kill ¶
Kill kills underlying process, make sure to call Wait() func to gather error log from the stderr. Does not waits for process completion!
func (*Worker) State ¶
State return receive-only worker state object, state can be used to safely access worker status, time when status changed and number of worker executions.
func (*Worker) Stop ¶
Stop sends soft termination command to the worker and waits for process completion.
type WorkerError ¶ added in v1.0.0
WorkerError is worker related error
func (WorkerError) Error ¶ added in v1.0.0
func (e WorkerError) Error() string
Error converts error context to string