supervisor

package
v0.0.0-...-7dcf7c4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2022 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Standard valve names
	StdinValve  = "stdin"
	StderrValve = "stderr"
	StdoutValve = "stdout"
)

Variables

View Source
var (
	ErrAlreadyExists = errors.New("already exists with name")
)

Functions

This section is empty.

Types

type BufferSink

type BufferSink struct {
	*bytes.Buffer
	Closed bool
}

func NewBufferSink

func NewBufferSink() *BufferSink

NewBufferSink returns a new BufferSink that stores any incoming data in its embedded buffer.

func (*BufferSink) Close

func (bf *BufferSink) Close() error

func (*BufferSink) Write

func (bf *BufferSink) Write(p []byte) (n int, err error)

type Connector

type Connector struct {
	Src  io.Reader
	Dst  io.WriteCloser
	Info ConnectorInfo
	// contains filtered or unexported fields
}

func NewConnector

func NewConnector() *Connector

func (*Connector) IsWaiting

func (c *Connector) IsWaiting() bool

func (*Connector) ReadFrom

func (c *Connector) ReadFrom(src io.Reader)

func (*Connector) Reset

func (c *Connector) Reset()

func (*Connector) SendTo

func (c *Connector) SendTo(dst io.WriteCloser)

func (*Connector) Serve

func (c *Connector) Serve(ctx context.Context) (err error)

Serve copies from Src to Dst. If there is no Dst (nil), it blocks until a new Dst is received. If Dst returns an EOF error or any other error, the Dst is cleared. If Src returns EOF, Serve exits cleanly.

type ConnectorInfo

type ConnectorInfo struct {
	BytesWritten int64
}

type DstVar

type DstVar struct {
	Name string // unique ID in pipeline
	Sink io.WriteCloser
	// contains filtered or unexported fields
}

func NewSink

func NewSink(name string, dst io.WriteCloser) *DstVar

func (*DstVar) Close

func (v *DstVar) Close() error

func (*DstVar) String

func (v *DstVar) String() string

func (*DstVar) WaitClosed

func (v *DstVar) WaitClosed(ctx context.Context) error

WaitClosed will block until this variable is closed with Close().

func (*DstVar) Write

func (v *DstVar) Write(p []byte) (n int, err error)

type InValve

type InValve struct {
	PortName string
	FifoPath string
	// contains filtered or unexported fields
}

InValve is what is passed to named args, like cat {namedarg}. Since the process itself will be opening the path, we only open a writer to it so that we don't block the process from opening it.

func (*InValve) Close

func (iv *InValve) Close() error

func (*InValve) Open

func (iv *InValve) Open(ctx context.Context)

func (*InValve) OpenStdin

func (iv *InValve) OpenStdin() (*os.File, error)

OpenStdin opens a file handle to pass immediately to a process as stdin. The file will be closed when the valve is closed.

func (*InValve) Path

func (iv *InValve) Path() string

func (*InValve) String

func (iv *InValve) String() string

func (*InValve) Write

func (iv *InValve) Write(p []byte) (n int, err error)

type OutValve

type OutValve struct {
	*Connector
	PortName string
	FifoPath string
	// contains filtered or unexported fields
}

OutValve is an outgoing stream of data from a process. Just like InValve, it can be opened in one of two ways: for stdio, in which case we open both the read and write ends and give the read end to stdout/stderr, or for argv passing, in which case we only open the read end and let the write end be opened by the process itself using open().

func (*OutValve) Close

func (ov *OutValve) Close() error

func (*OutValve) Open

func (ov *OutValve) Open(ctx context.Context) error

func (*OutValve) OpenStdout

func (ov *OutValve) OpenStdout() (*os.File, error)

OpenStdout opens a file handle to pass immediately to a process as stdout. The file will be closed when the valve is closed.

func (*OutValve) Path

func (ov *OutValve) Path() string

func (*OutValve) Read

func (ov *OutValve) Read(p []byte) (n int, err error)

func (*OutValve) String

func (ov *OutValve) String() string

type Pipeline

type Pipeline struct {
	*suture.Supervisor
	Creator   *Supervisor
	Name      string
	Processes map[string]*Process
	Spouts    map[string]*SrcVar
	Sinks     map[string]*DstVar
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline(creator *Supervisor, name string, cfg PipelineConfig) *Pipeline

func (*Pipeline) CreateSink

func (p *Pipeline) CreateSink(name string, sink io.WriteCloser) (*DstVar, error)

func (*Pipeline) CreateSpout

func (p *Pipeline) CreateSpout(name string, src io.Reader) (*SrcVar, error)

func (*Pipeline) ExitWhen

func (p *Pipeline) ExitWhen(ctx context.Context, processOrVar string) error

func (*Pipeline) FindIn

func (p *Pipeline) FindIn(process, port string) (*InValve, error)

func (*Pipeline) FindOut

func (p *Pipeline) FindOut(process, port string) (*OutValve, error)

func (*Pipeline) FindProcess

func (p *Pipeline) FindProcess(name string) *Process

func (*Pipeline) FindSink

func (p *Pipeline) FindSink(name string) (*DstVar, error)

func (*Pipeline) FindSource

func (p *Pipeline) FindSource(name string) (*SrcVar, error)

func (*Pipeline) StartProcess

func (p *Pipeline) StartProcess(name string, exe string, params *ProcessConfig) (*Process, error)

func (*Pipeline) Stop

func (p *Pipeline) Stop()

type PipelineConfig

type PipelineConfig struct {
	DataDir string
}

type ProcInfo

type ProcInfo struct {
	State ProcState
	Rc    int   // return code exited with
	Err   error // if exited with any error
}

func (ProcInfo) String

func (pi ProcInfo) String() string

type ProcState

type ProcState int
const (
	ProcNotStarted ProcState = iota // Process is not started yet
	ProcFinished                    // Process is not running anymore (and won't be restarted)
	ProcRunning                     // Process is running still (actual OS process could be not alive temporarily)
	ProcError                       // Process is not running anymore because it had too many errors
)

func (ProcState) String

func (ps ProcState) String() string

type Process

type Process struct {
	Token   suture.ServiceToken
	DataDir string // directory to store process specific data
	Name    string
	ExePath string
	Argv    []string
	Ports   map[string]hosercmd.Port

	Cmd *exec.Cmd

	Info ProcInfo

	Ins  map[string]*InValve
	Outs map[string]*OutValve
	// contains filtered or unexported fields
}

func NewProcess

func NewProcess(name, exePath string, cfg ProcessConfig) (*Process, error)

func (*Process) AddInValve

func (p *Process) AddInValve(name string) (*InValve, error)

AddInValve creates an in valve for an argv parameter, that is passed as part of argv to the process. Since the process will call open() on the passed in path, we can't open the named pipe with O_RDONLY because it will cause the open() call in the process to block, preventing the process from terminating in some cases.

func (*Process) AddOutValve

func (p *Process) AddOutValve(name string) (*OutValve, error)

AddOutValve creates a new named pipe and opens it in read-only mode to read the data written by the process, which will open the write end of the named pipe itself using open().

func (*Process) ChangeState

func (p *Process) ChangeState(modify func(*ProcInfo))

func (*Process) Close

func (p *Process) Close(ctx context.Context) error

func (*Process) IsFinished

func (p *Process) IsFinished() bool

func (*Process) Serve

func (p *Process) Serve(ctx context.Context) error

func (*Process) String

func (p *Process) String() string

func (*Process) SupervisorTree

func (p *Process) SupervisorTree() suture.Service

SupervisorTree returns the supervisor tree (a suture.Service) that manages the process.

func (*Process) Wait

func (p *Process) Wait(ctx context.Context, wanted []ProcState) (info ProcInfo, err error)

type ProcessConfig

type ProcessConfig struct {
	Argv       []string
	Ports      map[string]hosercmd.Port
	PrivateDir string
	SharedDir  string
}

type ProcessSup

type ProcessSup struct {
	*suture.Supervisor

	Valves *suture.Supervisor // restarts valves if they fail for some reason.
	// contains filtered or unexported fields
}

func NewProcessSupervisor

func NewProcessSupervisor(proc *Process) *ProcessSup

func (*ProcessSup) StartValves

func (ps *ProcessSup) StartValves(proc *Process)

type Source

type Source interface {
	SendTo(w io.WriteCloser)
}

type SrcVar

type SrcVar struct {
	*Connector
	Name  string // unique ID in pipeline
	Spout io.Reader
	Token suture.ServiceToken
}

func (*SrcVar) String

func (v *SrcVar) String() string

type Supervisor

type Supervisor struct {
	Dir       string
	Pipelines map[string]*Pipeline
	// contains filtered or unexported fields
}

func New

func New(dir string) *Supervisor

func (*Supervisor) AddPipeline

func (s *Supervisor) AddPipeline(name string) (*Pipeline, error)

func (*Supervisor) Close

func (s *Supervisor) Close() error

func (*Supervisor) RemovePipeline

func (s *Supervisor) RemovePipeline(p *Pipeline) error

func (*Supervisor) Serve

func (s *Supervisor) Serve(ctx context.Context) error

func (*Supervisor) ServeBackground

func (s *Supervisor) ServeBackground(ctx context.Context) <-chan error

type Valve

type Valve interface {
	fmt.Stringer
	io.Closer

	// Path returns the filepath to the named pipe
	Path() string
}

type Variable

type Variable interface {
	fmt.Stringer
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL