Documentation
¶
Index ¶
- func AccumulateAndOutput(fn AccumulateAndOutputFunc) gloo.Command
- func AccumulateAndProcess(fn AccumulateAndProcessFunc) gloo.Command
- func ChannelAccumulateAndOutput[T any](fn ChannelAccumulateAndOutputFunc[T]) gloo.ChannelCommand[T]
- func ChannelAccumulateAndProcess[T any](fn ChannelAccumulateAndProcessFunc[T]) gloo.ChannelCommand[T]
- func ChannelBuffer[T any](size int) gloo.ChannelExecutor[T]
- func ChannelFanOut[T any](outputs ...chan<- gloo.Row[T]) gloo.ChannelExecutor[T]
- func ChannelLineTransform[T any](fn ChannelLineTransformFunc[T]) gloo.ChannelCommand[T]
- func ChannelMerge[T any](inputs ...<-chan gloo.Row[T]) gloo.ChannelExecutor[T]
- func ChannelStatefulLineTransform[T any](fn ChannelStatefulLineTransformFunc[T]) gloo.ChannelCommand[T]
- func ChannelTransform[TIn, TOut any](fn func(TIn) (TOut, bool, error)) func(context.Context, <-chan gloo.Row[TIn], chan<- gloo.Row[TOut]) error
- func LineTransform(fn LineTransformFunc) gloo.Command
- func RawCommand(fn gloo.CommandExecutor) gloo.Command
- func StatefulLineTransform(fn StatefulLineTransformFunc) gloo.Command
- type AccumulateAndOutputFunc
- type AccumulateAndProcessFunc
- type ChannelAccumulateAndOutputFunc
- type ChannelAccumulateAndProcessFunc
- type ChannelLineTransformFunc
- type ChannelStatefulLineTransformFunc
- type LineTransformFunc
- type StatefulLineTransformFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AccumulateAndOutput ¶
func AccumulateAndOutput(fn AccumulateAndOutputFunc) gloo.Command
AccumulateAndOutput creates a Command that collects all lines and outputs with full control. Best for: wc - need to accumulate and produce custom output format
Example:
func (c command) Executor() gloo.CommandExecutor {
return gloo.AccumulateAndOutput(func(lines []string, stdout io.Writer) error {
fmt.Fprintf(stdout, "%d lines\n", len(lines))
return nil
}).Executor()
}
func AccumulateAndProcess ¶
func AccumulateAndProcess(fn AccumulateAndProcessFunc) gloo.Command
AccumulateAndProcess creates a Command that collects all lines, processes them, and outputs. Best for: sort, tac, shuf - need to see all input before producing output
Example:
func (c command) Executor() gloo.CommandExecutor {
return gloo.AccumulateAndProcess(func(lines []string) []string {
sort.Strings(lines)
return lines
}).Executor()
}
func ChannelAccumulateAndOutput ¶
func ChannelAccumulateAndOutput[T any](fn ChannelAccumulateAndOutputFunc[T]) gloo.ChannelCommand[T]
ChannelAccumulateAndOutput creates a ChannelCommand that collects all rows and outputs with full control. Best for: wc, stats - need to accumulate and produce custom output
Example with strings (simulating wc):
func WordCount() ChannelCommand[string] {
return ChannelAccumulateAndOutput(func(lines []string, out chan<- gloo.Row[string]) error {
wordCount := 0
for _, line := range lines {
wordCount += len(strings.fields(line))
}
out <- Row[string]{Data: fmt.Sprintf("%d words", wordCount)}
return nil
})
}
Example with custom types:
type Sale struct {
Product string
Amount float64
}
type Summary struct {
TotalSales float64
count int
}
func Summarize() ChannelCommand[Sale] {
// Note: Input is Sale, but we want to output Summary
// This requires a different pattern - see ChannelTransform below
}
func ChannelAccumulateAndProcess ¶
func ChannelAccumulateAndProcess[T any](fn ChannelAccumulateAndProcessFunc[T]) gloo.ChannelCommand[T]
ChannelAccumulateAndProcess creates a ChannelCommand that collects all rows, processes them, and outputs. Best for: sort, tac, shuf - need to see all input before producing output
Example with strings (simulating sort):
func Sort() ChannelCommand[string] {
return ChannelAccumulateAndProcess(func(lines []string) []string {
sort.Strings(lines)
return lines
})
}
Example with custom types:
type Transaction struct {
Amount float64
Date time.Time
}
func SortByAmount() ChannelCommand[Transaction] {
return ChannelAccumulateAndProcess(func(txns []Transaction) []Transaction {
sort.Slice(txns, func(i, j int) bool {
return txns[i].Amount < txns[j].Amount
})
return txns
})
}
func ChannelBuffer ¶
func ChannelBuffer[T any](size int) gloo.ChannelExecutor[T]
ChannelBuffer creates a buffered channel pipeline stage. Useful for decoupling slow producers from slow consumers.
Example:
func WithBuffer(size int, exec ChannelExecutor[string]) ChannelExecutor[string] {
return func(ctx context.Context, in <-chan gloo.Row[string], out chan<- gloo.Row[string]) error {
buffered := make(chan Row[string], size)
go func() {
exec(ctx, in, buffered)
close(buffered)
}()
for row := range buffered {
out <- row
}
return nil
}
}
func ChannelFanOut ¶
func ChannelFanOut[T any](outputs ...chan<- gloo.Row[T]) gloo.ChannelExecutor[T]
ChannelFanOut duplicates each input row to multiple output channels. Useful for broadcasting data to multiple consumers.
⚠️ THREAD SAFETY WARNING: This function sends the SAME Row[T] reference to ALL output channels. If T contains mutable data (pointers, slices, maps), all consumers will share that mutable state, which can cause DATA RACES.
SAFE to use with:
- Immutable types (string, int, bool, etc.)
- Read-only structs with no pointers
UNSAFE to use with:
- Structs containing slices, maps, or pointers
- Any mutable data that consumers might modify
For mutable types, implement a custom fan-out that copies data. See THREADING.md for examples.
Example (safe with strings):
func Tee(outputs ...chan<- gloo.Row[string]) ChannelExecutor[string] {
return ChannelFanOut(outputs...) // OK: strings are immutable
}
func ChannelLineTransform ¶
func ChannelLineTransform[T any](fn ChannelLineTransformFunc[T]) gloo.ChannelCommand[T]
ChannelLineTransform creates a ChannelCommand that transforms rows one at a time. Best for: grep, cut, tr, sed - stateless row transformations
Example with strings (simulating grep):
func Grep(pattern string) ChannelCommand[string] {
return ChannelLineTransform(func(line string) (string, bool) {
if strings.Contains(line, pattern) {
return line, true
}
return "", false
})
}
Example with custom types:
type Person struct {
Name string
Age int
}
func FilterAdults() ChannelCommand[Person] {
return ChannelLineTransform(func(p Person) (Person, bool) {
return p, p.Age >= 18
})
}
func ChannelMerge ¶
func ChannelMerge[T any](inputs ...<-chan gloo.Row[T]) gloo.ChannelExecutor[T]
ChannelMerge combines multiple input channels into one output channel. Useful for combining data from multiple sources.
Example:
func MergeFiles(readers ...io.Reader) ChannelExecutor[string] {
return func(ctx context.Context, _ <-chan gloo.Row[string], out chan<- gloo.Row[string]) error {
inputs := make([]<-chan gloo.Row[string], len(readers))
for i, r := range readers {
ch := make(chan Row[string], 100)
inputs[i] = ch
go ReaderToChannel(ctx, r, ch)
}
return ChannelMerge(inputs...)(ctx, nil, out)
}
}
func ChannelStatefulLineTransform ¶
func ChannelStatefulLineTransform[T any](fn ChannelStatefulLineTransformFunc[T]) gloo.ChannelCommand[T]
ChannelStatefulLineTransform creates a ChannelCommand that transforms rows with state tracking. Best for: uniq, nl, head - need row numbers or previous row tracking
Example with strings (simulating head):
func Head(n int) ChannelCommand[string] {
return ChannelStatefulLineTransform(func(rowNum int64, line string) (string, bool) {
return line, rowNum <= int64(n)
})
}
Example with custom types:
type Event struct {
ID int
Timestamp time.Time
}
func AddSequence() ChannelCommand[Event] {
return ChannelStatefulLineTransform(func(rowNum int64, e Event) (Event, bool) {
e.ID = int(rowNum)
return e, true
})
}
func ChannelTransform ¶
func ChannelTransform[TIn, TOut any](fn func(TIn) (TOut, bool, error)) func(context.Context, <-chan gloo.Row[TIn], chan<- gloo.Row[TOut]) error
ChannelTransform allows transforming from one type to another in a pipeline. This is useful when you need to change the data type between pipeline stages.
Example: Parse log lines into structured data
type LogEntry struct {
Level string
Timestamp time.Time
Message string
}
func ParseLogs() ChannelExecutor[string] {
return ChannelTransform[string, LogEntry](
func(line string) (LogEntry, bool, error) {
// Parse line into LogEntry
parts := strings.SplitN(line, " ", 3)
if len(parts) != 3 {
return LogEntry{}, false, nil
}
return LogEntry{
Level: parts[0],
Message: parts[2],
}, true, nil
},
)
}
func LineTransform ¶
func LineTransform(fn LineTransformFunc) gloo.Command
LineTransform creates a gloo.Command that transforms lines one at a time. Best for: grep, cut, tr, sed - stateless line transformations
Example:
func (c command) Executor() gloo.CommandExecutor {
return gloo.LineTransform(func(line string) (string, bool) {
if strings.Contains(line, c.pattern) {
return line, true
}
return "", false
}).Executor()
}
func RawCommand ¶
func RawCommand(fn gloo.CommandExecutor) gloo.Command
RawCommand wraps a raw CommandExecutor function. Best for: diff, paste, comm, find, ls - need full control over I/O
Example:
func (c command) Executor() gloo.CommandExecutor {
return gloo.RawCommand(func(ctx context.Context, stdin io.Reader, stdout, stderr io.Writer) error {
// Full control over I/O
return nil
}).Executor()
}
func StatefulLineTransform ¶
func StatefulLineTransform(fn StatefulLineTransformFunc) gloo.Command
StatefulLineTransform creates a Command that transforms lines with state tracking. Best for: uniq, nl, head - need line numbers or previous line tracking
Example:
func (c command) Executor() gloo.CommandExecutor {
return gloo.StatefulLineTransform(func(lineNum int64, line string) (string, bool) {
if lineNum <= c.maxLines {
return line, true
}
return "", false
}).Executor()
}
Types ¶
type AccumulateAndOutputFunc ¶
AccumulateAndOutputFunc collects all lines, processes them, and outputs directly.
type AccumulateAndProcessFunc ¶
AccumulateAndProcessFunc collects all lines, processes them, and returns the result.
type ChannelAccumulateAndOutputFunc ¶
ChannelAccumulateAndOutputFunc collects all rows, processes them, and outputs directly to the channel.
type ChannelAccumulateAndProcessFunc ¶
type ChannelAccumulateAndProcessFunc[T any] func(rows []T) []T
ChannelAccumulateAndProcessFunc collects all rows, processes them, and returns the result.
type ChannelLineTransformFunc ¶
ChannelLineTransformFunc is a simple row-by-row transformation for channels. Return (output, true) to emit the row, or (_, false) to skip it.
type ChannelStatefulLineTransformFunc ¶
ChannelStatefulLineTransformFunc is a row transformation with row number tracking. Return (output, true) to emit the row, or (_, false) to skip it.
type LineTransformFunc ¶
LineTransformFunc is a simple line-by-line transformation. Return (output, true) to emit the line, or (_, false) to skip it.