supervisor

package module
Version: v2.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 31, 2021 License: MIT Imports: 17 Imported by: 0

README

go-supervisor (V2)

Build Status codecov

A small library for supervising child processes in Go. It exposes Stdout, Stderr and Stdin in the "Go way", using channels...

Example

example.sh prints one message to stdout and one to stderr, then quits after roughly 0.2 seconds...

#!/usr/bin/env bash
# Writes one line to stdout and one line to stderr, pausing 0.1s after each,
# then exits.

printf '%s\n' "STDOUT MESSAGE"
sleep 0.1
printf '%s\n' "STDERR MESSAGE" >&2
sleep 0.1

supervisor-example.go spawns and supervises the bash program...

package main

import (
	"fmt"
	"path/filepath"
	"time"

	"github.com/kontera-technologies/go-supervisor/v2"
)

// main runs ./example.sh from the testdata directory under supervision and
// prints everything it receives (stdout messages, stderr messages and
// supervisor events) until the supervisor signals that it is done.
func main() {
	testDir, err := filepath.Abs("testdata")
	if err != nil {
		panic(fmt.Sprintf("failed to resolve testdata directory: %s", err))
	}

	// The two back-off caps mirror the package's tested Example, whose
	// verified output contains the "Sleeping for 1s" lines shown below.
	events := make(chan supervisor.Event)
	p := supervisor.NewProcess(supervisor.ProcessOptions{
		Name:                 "./example.sh",
		Dir:                  testDir,
		Id:                   "example",
		EventNotifier:        events,
		OutputParser:         supervisor.MakeBytesParser,
		ErrorParser:          supervisor.MakeBytesParser,
		MaxSpawns:            4,
		MaxSpawnAttempts:     2,
		MaxInterruptAttempts: 3,
		MaxTerminateAttempts: 5,
		IdleTimeout:          10 * time.Second,
		MaxSpawnBackOff:      time.Second,
		MaxRespawnBackOff:    time.Second,
	})

	// exit is closed by the reader goroutine once DoneNotifier fires.
	exit := make(chan bool)

	// Start consuming before p.Start so that no message or event is missed.
	go func() {
		for {
			select {
			case msg := <-p.Stdout():
				fmt.Printf("Received STDOUT message: %s\n", *msg)
			case msg := <-p.Stderr():
				fmt.Printf("Received STDERR message: %s\n", *msg)
			case event := <-events:
				switch {
				case event.Code == "ProcessStart" || event.Message == "":
					// Nothing useful to append, so print the code only.
					fmt.Printf("Received event: %s\n", event.Code)
				default:
					fmt.Printf("Received event: %s - %s\n", event.Code, event.Message)
				}
			case <-p.DoneNotifier():
				fmt.Println("Closing loop we are done...")
				close(exit)
				return
			}
		}
	}()

	if err := p.Start(); err != nil {
		panic(fmt.Sprintf("failed to start process: %s", err))
	}

	<-exit
}

Running the program should produce this output:

Received event: ProcessStart
Received STDOUT message: STDOUT MESSAGE
Received STDERR message: STDERR MESSAGE
Received event: ProcessDone - exit status 0
Received event: StoppingHeartbeatMonitoring - Stop signal received.
Received event: Sleep - Sleeping for 1s before respwaning instance.
Received event: ProcessRespawn - Trying to respawn instance.
Received event: ProcessStart
Received STDOUT message: STDOUT MESSAGE
Received STDERR message: STDERR MESSAGE
Received event: ProcessDone - exit status 0
Received event: StoppingHeartbeatMonitoring - Stop signal received.
Received event: Sleep - Sleeping for 1s before respwaning instance.
Received event: ProcessRespawn - Trying to respawn instance.
Received event: ProcessStart
Received STDOUT message: STDOUT MESSAGE
Received STDERR message: STDERR MESSAGE
Received event: ProcessDone - exit status 0
Received event: StoppingHeartbeatMonitoring - Stop signal received.
Received event: Sleep - Sleeping for 1s before respwaning instance.
Received event: ProcessRespawn - Trying to respawn instance.
Received event: ProcessStart
Received STDOUT message: STDOUT MESSAGE
Received STDERR message: STDERR MESSAGE
Received event: ProcessDone - exit status 0
Received event: StoppingHeartbeatMonitoring - Stop signal received.
Received event: RespawnError - Max number of respawns reached.
Closing loop we are done...

Documentation

Overview

Example
testDir, _ := filepath.Abs("testdata")
events := make(chan supervisor.Event)
p := supervisor.NewProcess(supervisor.ProcessOptions{
	Name:                 "./example.sh",
	Dir:                  testDir,
	Id:                   "example",
	EventNotifier:        events,
	OutputParser:         supervisor.MakeBytesParser,
	ErrorParser:          supervisor.MakeBytesParser,
	MaxSpawns:            4,
	MaxSpawnAttempts:     2,
	MaxInterruptAttempts: 3,
	MaxTerminateAttempts: 5,
	IdleTimeout:          10 * time.Second,
	MaxSpawnBackOff:      time.Second,
	MaxRespawnBackOff:    time.Second,
})

exit := make(chan bool)

go func() {
	for {
		select {
		case msg := <-p.Stdout():
			fmt.Printf("Received STDOUT message: %s\n", *msg)
		case msg := <-p.Stderr():
			fmt.Printf("Received STDERR message: %s\n", *msg)
		case event := <-events:
			if event.Code == "ProcessStart" || event.Message == "" {
				fmt.Printf("Received event: %s\n", event.Code)
			} else {
				fmt.Printf("Received event: %s - %s\n", event.Code, event.Message)
			}
		case <-p.DoneNotifier():
			fmt.Println("Closing loop we are done...")
			close(exit)
			return
		}
	}
}()

if err := p.Start(); err != nil {
	panic(fmt.Sprintf("failed to start process: %s", err))
}

<-exit
Output:

Received event: ProcessStart
Received STDOUT message: STDOUT MESSAGE
Received STDERR message: STDERR MESSAGE
Received event: ProcessDone - exit status 0
Received event: StoppingHeartbeatMonitoring - Stop signal received.
Received event: Sleep - Sleeping for 1s before respwaning instance.
Received event: ProcessRespawn - Trying to respawn instance.
Received event: ProcessStart
Received STDOUT message: STDOUT MESSAGE
Received STDERR message: STDERR MESSAGE
Received event: ProcessDone - exit status 0
Received event: StoppingHeartbeatMonitoring - Stop signal received.
Received event: Sleep - Sleeping for 1s before respwaning instance.
Received event: ProcessRespawn - Trying to respawn instance.
Received event: ProcessStart
Received STDOUT message: STDOUT MESSAGE
Received STDERR message: STDERR MESSAGE
Received event: ProcessDone - exit status 0
Received event: StoppingHeartbeatMonitoring - Stop signal received.
Received event: Sleep - Sleeping for 1s before respwaning instance.
Received event: ProcessRespawn - Trying to respawn instance.
Received event: ProcessStart
Received STDOUT message: STDOUT MESSAGE
Received STDERR message: STDERR MESSAGE
Received event: ProcessDone - exit status 0
Received event: StoppingHeartbeatMonitoring - Stop signal received.
Received event: RespawnError - Max number of respawns reached.
Closing loop we are done...

Index

Examples

Constants

This section is empty.

Variables

View Source
// EnsureClosedTimeout is an exported package-level duration that defaults to
// one second; being a variable, importers can reassign it.
// NOTE(review): the name suggests it bounds how long the supervisor waits for
// something (e.g. the child's pipes) to close, but its use is not visible
// here. Confirm against the package source.
var EnsureClosedTimeout = time.Second

Functions

This section is empty.

Types

type Event

// Event is the value delivered on a process's event-notifier channel
// (see ProcessOptions.EventNotifier and Process.EventNotifier).
//
// Fields:
//   - Id: NOTE(review): presumably the Id of the process that emitted the
//     event (ProcessOptions.Id); confirm.
//   - Code: the kind of event, e.g. "ProcessStart", "ProcessDone",
//     "ProcessRespawn" or "RespawnError".
//   - Message: free-form detail accompanying Code, such as "exit status 0".
//     NOTE(review): presumably empty for some events; confirm.
//   - Time: NOTE(review): presumably the moment the event was created; confirm.
//   - TimeFormat: NOTE(review): presumably the layout String uses to render
//     Time (cf. ProcessOptions.EventTimeFormat); confirm.
type Event struct {
	Id         string
	Code       string
	Message    string
	Time       time.Time
	TimeFormat string
}

func (Event) String

func (ev Event) String() string

type Process

type Process struct {
	// contains filtered or unexported fields
}

func NewProcess

func NewProcess(opts ProcessOptions) *Process

func (*Process) CalcBackOff

func (p *Process) CalcBackOff(attempt int, step time.Duration, maxBackOff time.Duration) time.Duration

func (*Process) DoneNotifier

func (p *Process) DoneNotifier() <-chan bool

func (*Process) EmptyInput

func (p *Process) EmptyInput()

EmptyInput empties all messages from the Input channel.

func (*Process) EventNotifier

func (p *Process) EventNotifier() chan Event

EventNotifier returns the eventNotifier channel (and creates one if none exists).

It is protected by Process.eventNotifierMu.

func (*Process) Input

func (p *Process) Input() chan<- []byte

func (*Process) IsAlive

func (p *Process) IsAlive() bool

func (*Process) IsDone

func (p *Process) IsDone() bool

func (*Process) LastError

func (p *Process) LastError() error

func (*Process) LastProcessState

func (p *Process) LastProcessState() *os.ProcessState

func (*Process) Pid

func (p *Process) Pid() int

func (*Process) Restart

func (p *Process) Restart() error

Restart tries to stop and start the process. Entering this function will change the phase from running to respawning (any other initial phase will cause an error to be returned).

If it fails to stop the process the phase will change to errored and notifyDone will be called. If there are no more allowed respawns the phase will change to stopped and notifyDone will be called.

This function calls Process.Start to start the process, which will change the phase to "running" (or "errored" if it fails). If Start fails, notifyDone will be called.

func (*Process) Start

func (p *Process) Start() (err error)

func (*Process) Stderr

func (p *Process) Stderr() <-chan *interface{}

func (*Process) Stdout

func (p *Process) Stdout() <-chan *interface{}

func (*Process) Stop

func (p *Process) Stop() error

Stop tries to stop the process. Entering this function will change the phase from "running" to "stopping" (any other initial phase will cause an error to be returned).

This function will call notifyDone when it is done.

If it fails to stop the process, the phase will change to errored and an error will be returned. Otherwise, the phase changes to stopped.

type ProcessOptions

// ProcessOptions is the configuration passed to NewProcess. It describes the
// command to run (Name, Args, Env, Dir, ...), the channels and parsers used
// to exchange data with it, and the limits and timeouts applied while
// supervising it. Most numeric and duration fields fall back to a package
// default when left at their zero value, as noted on each field.
type ProcessOptions struct {
	// If Name contains no path separators, Command uses LookPath to
	// resolve Name to a complete path if possible. Otherwise it uses Name
	// directly as Path.
	Name string

	// The returned Cmd's Args field is constructed from the command name
	// followed by the elements of arg, so arg should not include the
	// command name itself. For example, Command("echo", "hello").
	// Args[0] is always name, not the possibly resolved Path.
	Args []string

	// Env specifies the environment of the process.
	// Each entry is of the form "key=value".
	// If Env is nil, the new process uses the current process's
	// environment.
	// If Env contains duplicate environment keys, only the last
	// value in the slice for each duplicate key is used.
	Env []string

	// When InheritEnv is true, os.Environ() will be prepended to Env.
	InheritEnv bool

	// Dir specifies the working directory of the command.
	// If Dir is the empty string, Run runs the command in the
	// calling process's current directory.
	Dir string

	// ExtraFiles specifies additional open files to be inherited by the
	// new process. It does not include standard input, standard output, or
	// standard error. If non-nil, entry i becomes file descriptor 3+i.
	//
	// ExtraFiles is not supported on Windows.
	ExtraFiles []*os.File

	// SysProcAttr holds optional, operating system-specific attributes.
	// Run passes it to os.StartProcess as the os.ProcAttr's Sys field.
	SysProcAttr *syscall.SysProcAttr

	// In, Out and Err are optional caller-supplied channels.
	// NOTE(review): presumably these back Process.Input, Process.Stdout and
	// Process.Stderr respectively (the element types match) — confirm.
	In  chan []byte
	Out chan *interface{}
	Err chan *interface{}

	// EventNotifier is the channel on which the process reports its events
	// (see Event). It may be left nil: Process.EventNotifier is documented to
	// create a channel if none exists.
	EventNotifier chan Event

	// Id is a caller-chosen identifier for the process.
	// NOTE(review): presumably copied into Event.Id — confirm.
	Id string

	// Debug - when this flag is set to true, events will be logged to the default go logger.
	Debug bool

	// OutputParser and ErrorParser are constructors that turn an io.Reader
	// into a ProduceFn. MakeBytesParser, MakeLineParser and MakeJsonLineParser
	// have this signature.
	// NOTE(review): presumably OutputParser is applied to the child's stdout
	// and ErrorParser to its stderr, with bufferSize taken from
	// ParserBufferSize — confirm.
	OutputParser func(fromR io.Reader, bufferSize int) ProduceFn
	ErrorParser  func(fromR io.Reader, bufferSize int) ProduceFn

	// MaxSpawns is the maximum number of times that a process can be spawned.
	// Set to -1, for an unlimited amount of times.
	// Will use defaultMaxSpawns when set to 0.
	MaxSpawns int

	// MaxSpawnAttempts is the maximum number of spawn attempts for a process.
	// Set to -1, for an unlimited amount of attempts.
	// Will use defaultMaxSpawnAttempts when set to 0.
	MaxSpawnAttempts int

	// MaxSpawnBackOff is the maximum duration that we will wait between spawn attempts.
	// Will use defaultMaxSpawnBackOff when set to 0.
	MaxSpawnBackOff time.Duration

	// MaxRespawnBackOff is the maximum duration that we will wait between respawn attempts.
	// Will use defaultMaxRespawnBackOff when set to 0.
	MaxRespawnBackOff time.Duration

	// MaxInterruptAttempts is the maximum number of times that we will try to interrupt the process when closed, before
	// terminating and/or killing it.
	// Set to -1, to never send the interrupt signal.
	// Will use defaultMaxInterruptAttempts when set to 0.
	MaxInterruptAttempts int

	// MaxTerminateAttempts is the maximum number of times that we will try to terminate the process when closed, before
	// killing it.
	// Set to -1, to never send the terminate signal.
	// Will use defaultMaxTerminateAttempts when set to 0.
	MaxTerminateAttempts int

	// NotifyEventTimeout is the amount of time that the process will BLOCK while trying to send an event.
	NotifyEventTimeout time.Duration

	// ParserBufferSize is the size of the buffer to be used by the OutputParser and ErrorParser.
	// Will use defaultParserBufferSize when set to 0.
	ParserBufferSize int

	// IdleTimeout is the duration that the process can remain idle (no output) before we terminate the process.
	// Set to -1, for an unlimited idle timeout (not recommended)
	// Will use defaultIdleTimeout when set to 0.
	IdleTimeout time.Duration

	// TerminationGraceTimeout is the duration of time that the supervisor will wait after sending interrupt/terminate
	// signals, before checking if the process is still alive.
	// Will use defaultTerminationGraceTimeout when set to 0.
	TerminationGraceTimeout time.Duration

	// EventTimeFormat is the time format used when events are marshaled to string.
	// Will use defaultEventTimeFormat when set to "".
	EventTimeFormat string
}

type ProduceFn

// ProduceFn is the function type returned by the parser constructors
// (MakeBytesParser, MakeLineParser, MakeJsonLineParser). Each call yields a
// pointer to the next parsed value, or an error.
// NOTE(review): presumably the error reports a read failure or end of input
// on the underlying io.Reader — confirm.
type ProduceFn func() (*interface{}, error)

func MakeBytesParser

func MakeBytesParser(fromR io.Reader, bufferSize int) ProduceFn

MakeBytesParser is called with an io.Reader, and returns a function, that when called will output references to byte slices that contain the bytes read from the io.Reader.

func MakeJsonLineParser

func MakeJsonLineParser(fromR io.Reader, bufferSize int) ProduceFn

MakeJsonLineParser is called with an io.Reader, and returns a function that, when called, will output references to map[string]interface{} objects that contain the parsed JSON data. If invalid JSON is encountered, all the characters up until a new-line will be dropped.

func MakeLineParser

func MakeLineParser(fromR io.Reader, bufferSize int) ProduceFn

MakeLineParser is called with an io.Reader, and returns a function, that when called will output references to strings that contain the bytes read from the io.Reader (without the new-line suffix).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL