Documentation
¶
Overview ¶
Example ¶
testDir, _ := filepath.Abs("testdata") events := make(chan supervisor.Event) p := supervisor.NewProcess(supervisor.ProcessOptions{ Name: "./example.sh", Dir: testDir, Id: "example", EventNotifier: events, OutputParser: supervisor.MakeBytesParser, ErrorParser: supervisor.MakeBytesParser, MaxSpawns: 4, MaxSpawnAttempts: 2, MaxInterruptAttempts: 3, MaxTerminateAttempts: 5, IdleTimeout: 10 * time.Second, MaxSpawnBackOff: time.Second, MaxRespawnBackOff: time.Second, }) exit := make(chan bool) go func() { for { select { case msg := <-p.Stdout(): fmt.Printf("Received STDOUT message: %s\n", *msg) case msg := <-p.Stderr(): fmt.Printf("Received STDERR message: %s\n", *msg) case event := <-events: if event.Code == "ProcessStart" || event.Message == "" { fmt.Printf("Received event: %s\n", event.Code) } else { fmt.Printf("Received event: %s - %s\n", event.Code, event.Message) } case <-p.DoneNotifier(): fmt.Println("Closing loop we are done...") close(exit) return } } }() if err := p.Start(); err != nil { panic(fmt.Sprintf("failed to start process: %s", err)) } <-exit
Output: Received event: ProcessStart Received STDOUT message: STDOUT MESSAGE Received STDERR message: STDERR MESSAGE Received event: ProcessDone - exit status 0 Received event: StoppingHeartbeatMonitoring - Stop signal received. Received event: Sleep - Sleeping for 1s before respwaning instance. Received event: ProcessRespawn - Trying to respawn instance. Received event: ProcessStart Received STDOUT message: STDOUT MESSAGE Received STDERR message: STDERR MESSAGE Received event: ProcessDone - exit status 0 Received event: StoppingHeartbeatMonitoring - Stop signal received. Received event: Sleep - Sleeping for 1s before respwaning instance. Received event: ProcessRespawn - Trying to respawn instance. Received event: ProcessStart Received STDOUT message: STDOUT MESSAGE Received STDERR message: STDERR MESSAGE Received event: ProcessDone - exit status 0 Received event: StoppingHeartbeatMonitoring - Stop signal received. Received event: Sleep - Sleeping for 1s before respwaning instance. Received event: ProcessRespawn - Trying to respawn instance. Received event: ProcessStart Received STDOUT message: STDOUT MESSAGE Received STDERR message: STDERR MESSAGE Received event: ProcessDone - exit status 0 Received event: StoppingHeartbeatMonitoring - Stop signal received. Received event: RespawnError - Max number of respawns reached. Closing loop we are done...
Index ¶
- Variables
- func MonitorHeartBeat(idleTimeout time.Duration, runTimeout time.Duration, ...)
- type Event
- type Process
- func (p *Process) CalcBackOff(attempt int, step time.Duration, maxBackOff time.Duration) time.Duration
- func (p *Process) DoneNotifier() <-chan bool
- func (p *Process) EmptyInput()
- func (p *Process) EventNotifier() chan Event
- func (p *Process) Input() chan<- []byte
- func (p *Process) IsAlive() bool
- func (p *Process) IsDone() bool
- func (p *Process) LastError() error
- func (p *Process) LastProcessState() *os.ProcessState
- func (p *Process) Pid() int
- func (p *Process) Restart() error
- func (p *Process) Start() (err error)
- func (p *Process) Stderr() <-chan *interface{}
- func (p *Process) Stdout() <-chan *interface{}
- func (p *Process) Stop() error
- type ProcessOptions
- type ProduceFn
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var EnsureClosedTimeout = time.Second
Functions ¶
func MonitorHeartBeat ¶ added in v2.1.0
func MonitorHeartBeat(idleTimeout time.Duration, runTimeout time.Duration, heartbeat, isMonitorClosed, stopC chan bool, stop func() error, notifyEvent func(string, ...interface{}))
MonitorHeartBeat monitors the heartbeat channel and stops the process if idleTimeout time is passed without a positive heartbeat, or if a negative heartbeat is passed, or if the run timeout passed.
isMonitorClosed will be closed when this function exists.
When stopC closes, this function will exit immediately.
Types ¶
type Process ¶
type Process struct {
// contains filtered or unexported fields
}
func NewProcess ¶
func NewProcess(opts ProcessOptions) *Process
func (*Process) CalcBackOff ¶
func (*Process) DoneNotifier ¶
func (*Process) EmptyInput ¶
func (p *Process) EmptyInput()
EmptyInput empties all messages from the Input channel.
func (*Process) EventNotifier ¶
EventNotifier returns the eventNotifier channel (and creates one if none exists).
It is protected by Process.eventNotifierMu.
func (*Process) LastProcessState ¶
func (p *Process) LastProcessState() *os.ProcessState
func (*Process) Restart ¶
Restart tries to stop and start the process. Entering this function will change the phase from running to respawning (any other initial phase will cause an error to be returned).
If it fails to stop the process the phase will change to errored and notifyDone will be called. If there are no more allowed respawns the phase will change to stopped and notifyDone will be called.
This function calls Process.Start to start the process which will change the phase to "running" (or "errored" if it fails) If Start fails, notifyDone will be called.
func (*Process) Stop ¶
Stop tries to stop the process. Entering this function will change the phase from "running" to "stopping" (any other initial phase will cause an error to be returned).
This function will call notifyDone when it is done.
If it fails to stop the process, the phase will change to errored and an error will be returned. Otherwise, the phase changes to stopped.
type ProcessOptions ¶
type ProcessOptions struct { // If Name contains no path separators, Command uses LookPath to // resolve Name to a complete path if possible. Otherwise it uses Name // directly as Path. Name string // The returned Cmd's Args field is constructed from the command name // followed by the elements of arg, so arg should not include the // command name itself. For example, Command("echo", "hello"). // Args[0] is always name, not the possibly resolved Path. Args []string // Env specifies the environment of the process. // Each entry is of the form "key=value". // If Env is nil, the new process uses the current process's // environment. // If Env contains duplicate environment keys, only the last // value in the slice for each duplicate key is used. Env []string // When InheritEnv is true, os.Environ() will be prepended to Env. InheritEnv bool // Dir specifies the working directory of the command. // If Dir is the empty string, Run runs the command in the // calling process's current directory. Dir string // ExtraFiles specifies additional open files to be inherited by the // new process. It does not include standard input, standard output, or // standard error. If non-nil, entry i becomes file descriptor 3+i. // // ExtraFiles is not supported on Windows. ExtraFiles []*os.File // SysProcAttr holds optional, operating system-specific attributes. // Run passes it to os.StartProcess as the os.ProcAttr's Sys field. SysProcAttr *syscall.SysProcAttr In chan []byte Out chan *interface{} Err chan *interface{} EventNotifier chan Event Id string // Debug - when this flag is set to true, events will be logged to the default go logger. Debug bool OutputParser func(fromR io.Reader, bufferSize int) ProduceFn ErrorParser func(fromR io.Reader, bufferSize int) ProduceFn // MaxSpawns is the maximum number of times that a process can be spawned // Set to -1, for an unlimited amount of times. // Will use defaultMaxSpawns when set to 0. MaxSpawns int // MaxSpawnAttempts is the maximum number of spawns attempts for a process. // Set to -1, for an unlimited amount of attempts. // Will use defaultMaxSpawnAttempts when set to 0. MaxSpawnAttempts int // MaxSpawnBackOff is the maximum duration that we will wait between spawn attempts. // Will use defaultMaxSpawnBackOff when set to 0. MaxSpawnBackOff time.Duration // MaxRespawnBackOff is the maximum duration that we will wait between respawn attempts. // Will use defaultMaxRespawnBackOff when set to 0. MaxRespawnBackOff time.Duration // MaxInterruptAttempts is the maximum number of times that we will try to interrupt the process when closed, before // terminating and/or killing it. // Set to -1, to never send the interrupt signal. // Will use defaultMaxInterruptAttempts when set to 0. MaxInterruptAttempts int // MaxTerminateAttempts is the maximum number of times that we will try to terminate the process when closed, before // killing it. // Set to -1, to never send the terminate signal. // Will use defaultMaxTerminateAttempts when set to 0. MaxTerminateAttempts int // NotifyEventTimeout is the amount of time that the process will BLOCK while trying to send an event. NotifyEventTimeout time.Duration // ParserBufferSize is the size of the buffer to be used by the OutputParser and ErrorParser. // Will use defaultParserBufferSize when set to 0. ParserBufferSize int // IdleTimeout is the duration that the process can remain idle (no output) before we terminate the process. // Set to -1, for an unlimited idle timeout (not recommended) // Will use defaultIdleTimeout when set to 0. IdleTimeout time.Duration // TerminationGraceTimeout is the duration of time that the supervisor will wait after sending interrupt/terminate // signals, before checking if the process is still alive. // Will use defaultTerminationGraceTimeout when set to 0. TerminationGraceTimeout time.Duration // EventTimeFormat is the time format used when events are marshaled to string. // Will use defaultEventTimeFormat when set to "". EventTimeFormat string // RunTimeout is the duration that the process can run before we terminate the process. // Set to <= 0, for an unlimited run timeout // Will use defaultRunTimeout when set to 0. RunTimeout time.Duration }
type ProduceFn ¶
type ProduceFn func() (*interface{}, error)
func MakeBytesParser ¶
MakeBytesParser is called with an io.Reader, and returns a function, that when called will output references to byte slices that contain the bytes read from the io.Reader.
func MakeJsonLineParser ¶
MakeJsonLineParser is called with an io.Reader, and returns a function, that when called will output references to map[string]interface{} objects that contain the parsed json data. If an invalid json is encountered, all the characters up until a new-line will be dropped.