Documentation ¶
Index ¶
- Constants
- Variables
- type BufferSink
- type Connector
- type ConnectorInfo
- type DstVar
- type InValve
- type OutValve
- type Pipeline
- func (p *Pipeline) CreateSink(name string, sink io.WriteCloser) (*DstVar, error)
- func (p *Pipeline) CreateSpout(name string, src io.Reader) (*SrcVar, error)
- func (p *Pipeline) ExitWhen(ctx context.Context, processOrVar string) error
- func (p *Pipeline) FindIn(process, port string) (*InValve, error)
- func (p *Pipeline) FindOut(process, port string) (*OutValve, error)
- func (p *Pipeline) FindProcess(name string) *Process
- func (p *Pipeline) FindSink(name string) (*DstVar, error)
- func (p *Pipeline) FindSource(name string) (*SrcVar, error)
- func (p *Pipeline) StartProcess(name string, exe string, params *ProcessConfig) (*Process, error)
- func (p *Pipeline) Stop()
- type PipelineConfig
- type ProcInfo
- type ProcState
- type Process
- func (p *Process) AddInValve(name string) (*InValve, error)
- func (p *Process) AddOutValve(name string) (*OutValve, error)
- func (p *Process) ChangeState(modify func(*ProcInfo))
- func (p *Process) Close(ctx context.Context) error
- func (p *Process) IsFinished() bool
- func (p *Process) Serve(ctx context.Context) error
- func (p *Process) String() string
- func (p *Process) SupervisorTree() suture.Service
- func (p *Process) Wait(ctx context.Context, wanted []ProcState) (info ProcInfo, err error)
- type ProcessConfig
- type ProcessSup
- type Source
- type SrcVar
- type Supervisor
- type Valve
- type Variable
Constants ¶
const ( // Standard valve names StdinValve = "stdin" StderrValve = "stderr" StdoutValve = "stdout" )
Variables ¶
var (
ErrAlreadyExists = errors.New("already exists with name")
)
Functions ¶
This section is empty.
Types ¶
type BufferSink ¶
func NewBufferSink ¶
func NewBufferSink() *BufferSink
NewBufferSink will store any incoming data into the passed buf.
func (*BufferSink) Close ¶
func (bf *BufferSink) Close() error
type Connector ¶
type Connector struct { Src io.Reader Dst io.WriteCloser Info ConnectorInfo // contains filtered or unexported fields }
func NewConnector ¶
func NewConnector() *Connector
func (*Connector) SendTo ¶
func (c *Connector) SendTo(dst io.WriteCloser)
type ConnectorInfo ¶
type ConnectorInfo struct {
BytesWritten int64
}
type DstVar ¶
type DstVar struct { Name string // unique ID in pipeline Sink io.WriteCloser // contains filtered or unexported fields }
func (*DstVar) WaitClosed ¶
WaitClosed will block until this variable is closed with Close()
type InValve ¶
InValve is what is passed to named args, like cat {namedarg}. Since the process itself will be opening the path, we only open a writer to it so that we don't block the process from opening it.
type OutValve ¶
type OutValve struct { *Connector PortName string FifoPath string // contains filtered or unexported fields }
OutValve is an outgoing stream of data from process. Just like InValve, it can be either opened for stdio, so that we open both the read and write end and give the read end to stdout/stderr, or we can open it for argv passing, which means we only open the read end and let the write end be opened by the process itself using open().
func (*OutValve) OpenStdout ¶
OpenStdin opens a file handle to pass immediately to a process as stdin. The file will be closed when the valve is closed.
type Pipeline ¶
type Pipeline struct { *suture.Supervisor Creator *Supervisor Name string Processes map[string]*Process Spouts map[string]*SrcVar Sinks map[string]*DstVar // contains filtered or unexported fields }
func NewPipeline ¶
func NewPipeline(creator *Supervisor, name string, cfg PipelineConfig) *Pipeline
func (*Pipeline) CreateSink ¶
func (*Pipeline) CreateSpout ¶
func (*Pipeline) FindProcess ¶
func (*Pipeline) StartProcess ¶
type PipelineConfig ¶
type PipelineConfig struct {
DataDir string
}
type ProcInfo ¶
type ProcState ¶
type ProcState int
const ( ProcNotStarted ProcState = iota // Process is not started yet ProcFinished // Process is not running anymore (and won't be restarted) ProcRunning // Process is running still (actual OS process could be not alive temporarily) ProcError // Process is not running anymore because it had too many errors )
type Process ¶
type Process struct { Token suture.ServiceToken DataDir string // directory to store process specific data Name string ExePath string Argv []string Ports map[string]hosercmd.Port Cmd *exec.Cmd Info ProcInfo Ins map[string]*InValve Outs map[string]*OutValve // contains filtered or unexported fields }
func NewProcess ¶
func NewProcess(name, exePath string, cfg ProcessConfig) (*Process, error)
func (*Process) AddInValve ¶
AddInValve creates an in valve for an argv parameter, that is passed as part of argv to the process. Since the process will call open() on the passed in path, we can't open the named pipe with O_RDONLY because it will cause the open() call in the process to block, preventing the process from terminating in some cases.
func (*Process) AddOutValve ¶
AddOutValve creates a new named pipe and opens it in rdonly mode to write to the process that will open up the named pipe using open() itself.
func (*Process) ChangeState ¶
func (*Process) IsFinished ¶
func (*Process) SupervisorTree ¶
func (p *Process) SupervisorTree() suture.Service
Supervise adds the process (a supervisor tree that manages the process) to sup.
type ProcessConfig ¶
type ProcessSup ¶
type ProcessSup struct { *suture.Supervisor Valves *suture.Supervisor // restarts valves if they fail for some reason. // contains filtered or unexported fields }
func NewProcessSupervisor ¶
func NewProcessSupervisor(proc *Process) *ProcessSup
func (*ProcessSup) StartValves ¶
func (ps *ProcessSup) StartValves(proc *Process)
type Source ¶
type Source interface {
SendTo(w io.WriteCloser)
}
type SrcVar ¶
type Supervisor ¶
type Supervisor struct { Dir string Pipelines map[string]*Pipeline // contains filtered or unexported fields }
func New ¶
func New(dir string) *Supervisor
func (*Supervisor) AddPipeline ¶
func (s *Supervisor) AddPipeline(name string) (*Pipeline, error)
func (*Supervisor) Close ¶
func (s *Supervisor) Close() error
func (*Supervisor) RemovePipeline ¶
func (s *Supervisor) RemovePipeline(p *Pipeline) error
func (*Supervisor) ServeBackground ¶
func (s *Supervisor) ServeBackground(ctx context.Context) <-chan error