Documentation
¶
Overview ¶
Package pipe provides the Processor interface and built-in processors for transforming Frame streams in a Pipeline.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline chains Processors into a concurrent processing graph. Each Processor runs in its own goroutine with backpressure-aware channels connecting the stages.
input → [Proc A] → [Proc B] → [Proc C] → output
goroutine goroutine goroutine
Canceling the context tears down the entire pipeline. An error in any stage cancels all other stages.
func New ¶
New creates a Pipeline from the given Processors. Processors are executed in order: the output of each feeds into the input of the next.
func (*Pipeline) Run ¶
Run starts the pipeline. It reads Frames from the input Stream and returns a new Stream containing the output of the final Processor.
Each Processor runs in its own goroutine. The pipeline self-destructs when the context is canceled or any processor returns an error.
The returned Stream is safe to iterate immediately.
func (*Pipeline) WithBuffer ¶
WithBuffer sets the channel buffer size between pipeline stages. Default is 16.
- 0: fully synchronous — minimum latency, maximum backpressure
- 16: good default for streaming
- 64+: high-throughput batch scenarios
type Processor ¶
Processor transforms Frames from an input Stream to an output Emitter. It is the fundamental unit of composition in a Niro pipeline.
Contracts:
- Process must not close the Emitter — the Pipeline manages that.
- Process should return when ctx is canceled or the input stream ends.
- Errors returned from Process are propagated to the output stream.
- Process runs in its own goroutine when used in a Pipeline.
func Accumulate ¶
func Accumulate() Processor
Accumulate creates a Processor that collects all text into a single frame emitted at the end of the stream. Useful for converting a token stream into a complete response.
func PassThrough ¶
func PassThrough() Processor
PassThrough creates a Processor that forwards all frames unchanged.