Documentation ¶
Constants ¶
const (
	// EventCreated is thrown when a new worker is spawned.
	EventCreated = iota

	// EventDestruct is thrown before worker destruction.
	EventDestruct

	// EventError is thrown when any worker-related error occurs (the error is
	// passed as context).
	EventError
)
const (
	// StateInactive - no associated process.
	StateInactive int64 = iota

	// StateReady - ready for job.
	StateReady

	// StateWorking - working on given payload.
	StateWorking

	// StateStopped - process has been terminated.
	StateStopped

	// StateErrored - error state (can't be used).
	StateErrored
)
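Because the state values are plain `int64` constants, log output is easier to read when they are mapped to names. The following is a minimal sketch: the constants are redeclared locally so the example compiles on its own, and `stateName` is a hypothetical helper, not part of the documented API.

```go
package main

import "fmt"

// Local mirror of the state constants listed above, redeclared so this
// sketch compiles without importing the package.
const (
	StateInactive int64 = iota
	StateReady
	StateWorking
	StateStopped
	StateErrored
)

// stateName is a hypothetical helper (not part of the package) that maps a
// state value to a human-readable label.
func stateName(v int64) string {
	switch v {
	case StateInactive:
		return "inactive"
	case StateReady:
		return "ready"
	case StateWorking:
		return "working"
	case StateStopped:
		return "stopped"
	case StateErrored:
		return "errored"
	default:
		return "unknown"
	}
}

func main() {
	states := []int64{StateInactive, StateReady, StateWorking, StateStopped, StateErrored}
	for _, s := range states {
		fmt.Printf("%d => %s\n", s, stateName(s))
	}
}
```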
const (
	// StopRequest can be sent by worker to indicate that restart is required.
	StopRequest = "{\"stop\":true}"
)
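`StopRequest` is a small JSON document, so a receiver could recognise it by decoding the message and checking the `stop` flag. The sketch below is an assumption about how such a check might look; the package itself may simply compare the raw bytes. `isStopRequest` is a hypothetical name and the constant is redeclared locally.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copy of the documented constant.
const StopRequest = "{\"stop\":true}"

// isStopRequest is a hypothetical helper: it reports whether a control
// message decodes as JSON with "stop" set to true.
func isStopRequest(msg []byte) bool {
	var ctl struct {
		Stop bool `json:"stop"`
	}
	if err := json.Unmarshal(msg, &ctl); err != nil {
		// Anything that is not valid JSON is not a stop request.
		return false
	}
	return ctl.Stop
}

func main() {
	fmt.Println(isStopRequest([]byte(StopRequest)))
	fmt.Println(isStopRequest([]byte(`{"stop":false}`)))
	fmt.Println(isStopRequest([]byte("plain text")))
}
```

Decoding tolerates insignificant whitespace and extra fields, at the cost of a JSON parse per message; a literal byte comparison would be stricter and cheaper.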
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	// NumWorkers defines how many sub-processes can be run at once. This value
	// might be doubled by Swapper during a hot-swap.
	NumWorkers uint64

	// MaxExecutions defines how many executions are allowed for the worker
	// until its destruction. Set 1 to create a new process for each new task,
	// 0 to let the worker handle as many tasks as it can.
	MaxExecutions uint64

	// AllocateTimeout defines how long the pool will wait for a worker to be
	// freed to handle the task.
	AllocateTimeout time.Duration

	// DestroyTimeout defines how long the pool will wait for a worker to stop
	// properly; if the timeout is reached, the worker will be killed.
	DestroyTimeout time.Duration
}
Config defines basic behaviour of worker creation and handling process.
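As a rough illustration of the `MaxExecutions` semantics described above (0 means unlimited, 1 means a fresh process per task), the sketch below mirrors the `Config` struct locally and adds a hypothetical `exhausted` helper. The field values are arbitrary examples, and the helper is an assumption about how the limit could be checked, not the pool's actual logic.

```go
package main

import (
	"fmt"
	"time"
)

// Config mirrors the documented struct so the sketch is self-contained.
type Config struct {
	NumWorkers      uint64
	MaxExecutions   uint64
	AllocateTimeout time.Duration
	DestroyTimeout  time.Duration
}

// exhausted is a hypothetical helper: it reports whether a worker that has
// already run numExecs tasks has reached the configured limit.
// MaxExecutions == 0 is treated as "no limit", as the field comment states.
func exhausted(c Config, numExecs uint64) bool {
	return c.MaxExecutions != 0 && numExecs >= c.MaxExecutions
}

func main() {
	// Illustrative values only.
	cfg := Config{
		NumWorkers:      4,
		MaxExecutions:   100,
		AllocateTimeout: 5 * time.Second,
		DestroyTimeout:  10 * time.Second,
	}

	fmt.Println(exhausted(cfg, 99))  // below the limit
	fmt.Println(exhausted(cfg, 100)) // limit reached

	cfg.MaxExecutions = 0 // unlimited
	fmt.Println(exhausted(cfg, 1000000))
}
```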
type Factory ¶
type Factory interface {
	// SpawnWorker creates a new worker process based on the given command.
	// The process must not be started.
	SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)
}
Factory is responsible for wrapping the given command into a task worker.
type JobError ¶
type JobError []byte
JobError is a job-level error (it does not halt the worker) that wraps the error context.
type Payload ¶
type Payload struct {
	// Context represents the payload context; it might be omitted.
	Context []byte

	// Body contains the binary payload to be processed by the worker.
	Body []byte
}
Payload carries binary header and body to workers and back to the server.
type PipeFactory ¶
type PipeFactory struct { }
PipeFactory connects to workers using standard streams (STDIN, STDOUT pipes).
func NewPipeFactory ¶
func NewPipeFactory() *PipeFactory
NewPipeFactory returns a new factory instance and starts listening.
func (*PipeFactory) SpawnWorker ¶
func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)
SpawnWorker creates a new worker and connects it to a goridge relay. The caller is responsible for handling Wait().
type Pool ¶
type Pool struct {
	// Observer is optional callback to handle worker create/destruct/error events.
	Observer func(event int, w *Worker, ctx interface{})
	// contains filtered or unexported fields
}
Pool controls worker creation, destruction and task routing.
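An `Observer` callback receives one of the event constants, the affected worker, and an event-specific context. The sketch below shows what such a callback might look like. `Worker` is a local stand-in that exposes only the documented `Pid` field, and `describe` is a hypothetical helper; the assumption that the context of `EventError` holds an `error` value follows the constant's comment.

```go
package main

import (
	"errors"
	"fmt"
)

// Local mirror of the documented event constants.
const (
	EventCreated = iota
	EventDestruct
	EventError
)

// Worker is a stand-in for the documented type; only Pid is reproduced.
type Worker struct {
	Pid *int
}

// describe is a hypothetical helper that turns an event into a log line.
func describe(event int, w *Worker, ctx interface{}) string {
	pid := "not started"
	if w != nil && w.Pid != nil {
		pid = fmt.Sprintf("pid %d", *w.Pid)
	}

	switch event {
	case EventCreated:
		return "worker created (" + pid + ")"
	case EventDestruct:
		return "worker destructing (" + pid + ")"
	case EventError:
		// Assumption: the context carries the error for EventError.
		if err, ok := ctx.(error); ok {
			return "worker error (" + pid + "): " + err.Error()
		}
		return "worker error (" + pid + ")"
	default:
		return "unknown event"
	}
}

func main() {
	pid := 4242
	w := &Worker{Pid: &pid}

	// Same signature as the Pool.Observer field.
	observer := func(event int, w *Worker, ctx interface{}) {
		fmt.Println(describe(event, w, ctx))
	}

	observer(EventCreated, w, nil)
	observer(EventError, w, errors.New("relay closed"))
	observer(EventDestruct, w, nil)
}
```

With the real package, a function of this shape would be assigned to the pool's `Observer` field before tasks are submitted.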
func NewPool ¶
NewPool creates new worker pool and task multiplexer. Pool will initiate with one worker.
func (*Pool) Destroy ¶
func (p *Pool) Destroy()
Destroy destroys all underlying workers (but lets them complete the current task).
type SocketFactory ¶
type SocketFactory struct {
	// contains filtered or unexported fields
}
SocketFactory connects to external workers using socket server.
func NewSocketFactory ¶
func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory
NewSocketFactory returns a SocketFactory attached to the given socket listener. tout specifies how long the factory should wait for an incoming relay connection.
func (*SocketFactory) SpawnWorker ¶
func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)
SpawnWorker creates a worker and connects it to the appropriate relay, or returns an error.
type State ¶
type State interface {
	// Value returns the state value.
	Value() int64

	// NumExecs shows how many times the worker was invoked.
	NumExecs() uint64

	// Updated returns the moment of the last state change.
	Updated() time.Time
}
State represents worker status and updated time.
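Since `State` is an interface, code that inspects worker status can be exercised against a simple in-memory implementation. The sketch below mirrors the interface and the state constants locally; `snapshot`, `canAccept`, and `sinceChange` are hypothetical names introduced only for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// Local mirror of the documented state constants.
const (
	StateInactive int64 = iota
	StateReady
	StateWorking
	StateStopped
	StateErrored
)

// State mirrors the documented interface.
type State interface {
	Value() int64
	NumExecs() uint64
	Updated() time.Time
}

// snapshot is a hypothetical immutable implementation used for testing.
type snapshot struct {
	value    int64
	numExecs uint64
	updated  time.Time
}

func (s snapshot) Value() int64       { return s.value }
func (s snapshot) NumExecs() uint64   { return s.numExecs }
func (s snapshot) Updated() time.Time { return s.updated }

// canAccept reports whether a worker in this state could take a new job.
func canAccept(s State) bool {
	return s.Value() == StateReady
}

// sinceChange returns how long the worker has been in its current state.
func sinceChange(s State, now time.Time) time.Duration {
	return now.Sub(s.Updated())
}

func main() {
	now := time.Now()
	s := snapshot{value: StateReady, numExecs: 3, updated: now.Add(-2 * time.Second)}

	fmt.Println("can accept:", canAccept(s))
	fmt.Println("executions:", s.NumExecs())
	fmt.Println("in state for:", sinceChange(s, now))
}
```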
type Worker ¶
type Worker struct {
	// Pid of the process, points to Pid of underlying process and
	// can be nil while process is not started.
	Pid *int
	// contains filtered or unexported fields
}
Worker is a supervised process with an API over goridge.Relay.
func (*Worker) Exec ¶
Exec sends payload to worker, executes it and returns result or error. Make sure to handle worker.Wait() to gather worker level errors. Method might return JobError indicating issue with payload.
func (*Worker) Kill ¶
Kill kills the underlying process and waits for it to complete. Make sure to call Wait() to gather the error log from stderr.
func (*Worker) State ¶
State returns a read-only worker state object. The state can be used to safely access the worker status, the time when the status last changed, and the number of worker executions.
func (*Worker) Stop ¶
Stop sends soft termination command to the worker and waits for process completion.