build

package
v0.0.6 Latest
Published: Nov 15, 2025 License: AGPL-3.0 Imports: 5 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func AccumulateAndOutput

func AccumulateAndOutput(fn AccumulateAndOutputFunc) gloo.Command

AccumulateAndOutput creates a Command that collects all input lines and then passes them to fn together with stdout, so fn has full control over the output. Best for: wc - need to accumulate and produce custom output format

Example:

func (c command) Executor() gloo.CommandExecutor {
    return gloo.AccumulateAndOutput(func(lines []string, stdout io.Writer) error {
        fmt.Fprintf(stdout, "%d lines\n", len(lines))
        return nil
    }).Executor()
}

func AccumulateAndProcess

func AccumulateAndProcess(fn AccumulateAndProcessFunc) gloo.Command

AccumulateAndProcess creates a Command that collects all lines, processes them, and outputs. Best for: sort, tac, shuf - need to see all input before producing output

Example:

func (c command) Executor() gloo.CommandExecutor {
    return gloo.AccumulateAndProcess(func(lines []string) []string {
        sort.Strings(lines)
        return lines
    }).Executor()
}

func ChannelAccumulateAndOutput

func ChannelAccumulateAndOutput[T any](fn ChannelAccumulateAndOutputFunc[T]) gloo.ChannelCommand[T]

ChannelAccumulateAndOutput creates a ChannelCommand that collects all rows and then passes them to fn together with the output channel, so fn has full control over what is emitted. Best for: wc, stats - need to accumulate and produce custom output

Example with strings (simulating wc):

func WordCount() ChannelCommand[string] {
    return ChannelAccumulateAndOutput(func(lines []string, out chan<- gloo.Row[string]) error {
        wordCount := 0
        for _, line := range lines {
            wordCount += len(strings.Fields(line))
        }
        out <- gloo.Row[string]{Data: fmt.Sprintf("%d words", wordCount)}
        return nil
    })
}

Example with custom types:

type Sale struct {
    Product string
    Amount  float64
}

type Summary struct {
    TotalSales float64
    count      int
}

// Note: a Summarize command would read Sale rows but emit a Summary.
// ChannelAccumulateAndOutput[T] emits the same type T it consumes, so it
// cannot express that directly; changing the row type requires a different
// pattern - see ChannelTransform below.
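The Sale-to-Summary case above changes the row type while accumulating, which the single-type signature does not cover. A minimal sketch of that aggregation over plain channels follows. `Row` is a local stand-in for `gloo.Row` (only the `Data` field is assumed), and `summarize` is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// Row is a local stand-in for gloo.Row; only the Data field is assumed.
type Row[T any] struct{ Data T }

type Sale struct {
	Product string
	Amount  float64
}

type Summary struct {
	TotalSales float64
	Count      int
}

// summarize (hypothetical) drains in, folds every Sale into one Summary,
// emits that single row on out, and then closes out.
func summarize(in <-chan Row[Sale], out chan<- Row[Summary]) {
	var s Summary
	for row := range in {
		s.TotalSales += row.Data.Amount
		s.Count++
	}
	out <- Row[Summary]{Data: s}
	close(out)
}

func main() {
	in := make(chan Row[Sale], 2)
	in <- Row[Sale]{Data: Sale{Product: "a", Amount: 10.5}}
	in <- Row[Sale]{Data: Sale{Product: "b", Amount: 4.5}}
	close(in)

	out := make(chan Row[Summary], 1)
	summarize(in, out)
	fmt.Printf("%+v\n", (<-out).Data)
}
```

The input type and output type are independent here, which is the property the single-parameter `ChannelCommand[T]` shape cannot provide.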

func ChannelAccumulateAndProcess

func ChannelAccumulateAndProcess[T any](fn ChannelAccumulateAndProcessFunc[T]) gloo.ChannelCommand[T]

ChannelAccumulateAndProcess creates a ChannelCommand that collects all rows, processes them, and outputs. Best for: sort, tac, shuf - need to see all input before producing output

Example with strings (simulating sort):

func Sort() ChannelCommand[string] {
    return ChannelAccumulateAndProcess(func(lines []string) []string {
        sort.Strings(lines)
        return lines
    })
}

Example with custom types:

type Transaction struct {
    Amount float64
    Date   time.Time
}

func SortByAmount() ChannelCommand[Transaction] {
    return ChannelAccumulateAndProcess(func(txns []Transaction) []Transaction {
        sort.Slice(txns, func(i, j int) bool {
            return txns[i].Amount < txns[j].Amount
        })
        return txns
    })
}

func ChannelBuffer

func ChannelBuffer[T any](size int) gloo.ChannelExecutor[T]

ChannelBuffer creates a buffered channel pipeline stage. Useful for decoupling producers and consumers that run at different speeds.

Example:

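func WithBuffer(size int, exec ChannelExecutor[string]) ChannelExecutor[string] {
    return func(ctx context.Context, in <-chan gloo.Row[string], out chan<- gloo.Row[string]) error {
        buffered := make(chan gloo.Row[string], size)
        errCh := make(chan error, 1)
        go func() {
            // Close the buffer once the wrapped stage returns so the loop below ends.
            defer close(buffered)
            errCh <- exec(ctx, in, buffered)
        }()
        for row := range buffered {
            out <- row
        }
        // Report the wrapped stage's error instead of discarding it.
        return <-errCh
    }
}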

func ChannelFanOut

func ChannelFanOut[T any](outputs ...chan<- gloo.Row[T]) gloo.ChannelExecutor[T]

ChannelFanOut duplicates each input row to multiple output channels. Useful for broadcasting data to multiple consumers.

⚠️ THREAD SAFETY WARNING: This function sends the SAME Row[T] reference to ALL output channels. If T contains mutable data (pointers, slices, maps), all consumers will share that mutable state, which can cause DATA RACES.

SAFE to use with:

  • Immutable types (string, int, bool, etc.)
  • Read-only structs with no pointers

UNSAFE to use with:

  • Structs containing slices, maps, or pointers
  • Any mutable data that consumers might modify

For mutable types, implement a custom fan-out that copies data. See THREADING.md for examples.

Example (safe with strings):

func Tee(outputs ...chan<- gloo.Row[string]) ChannelExecutor[string] {
    return ChannelFanOut(outputs...)  // OK: strings are immutable
}
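The warning above recommends a copying fan-out for mutable types. The following is a minimal sketch of that idea, not the approach described in THREADING.md. `Row` is a local stand-in for `gloo.Row` (only the `Data` field is assumed), and `Record`, `cloneRecord`, and `copyingFanOut` are hypothetical names introduced for illustration.

```go
package main

import "fmt"

// Row is a local stand-in for gloo.Row; only the Data field is assumed.
type Row[T any] struct{ Data T }

// Record is a hypothetical mutable payload: Tags is a slice, so plain
// struct copies still share one backing array.
type Record struct {
	Name string
	Tags []string
}

// cloneRecord returns a Record whose Tags slice has its own backing array.
func cloneRecord(r Record) Record {
	return Record{Name: r.Name, Tags: append([]string(nil), r.Tags...)}
}

// copyingFanOut (hypothetical) sends an independent clone of each input row
// to every output, then closes the outputs once the input is drained.
func copyingFanOut[T any](in <-chan Row[T], clone func(T) T, outputs ...chan<- Row[T]) {
	for row := range in {
		for _, out := range outputs {
			out <- Row[T]{Data: clone(row.Data)}
		}
	}
	for _, out := range outputs {
		close(out)
	}
}

func main() {
	in := make(chan Row[Record], 1)
	a := make(chan Row[Record], 1)
	b := make(chan Row[Record], 1)
	in <- Row[Record]{Data: Record{Name: "x", Tags: []string{"raw"}}}
	close(in)
	copyingFanOut[Record](in, cloneRecord, a, b)

	ra, rb := <-a, <-b
	ra.Data.Tags[0] = "changed" // touches only consumer a's copy
	fmt.Println(ra.Data.Tags[0], rb.Data.Tags[0])
}
```

The caller supplies the clone function because only the caller knows how deep a copy of T has to go.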

func ChannelLineTransform

func ChannelLineTransform[T any](fn ChannelLineTransformFunc[T]) gloo.ChannelCommand[T]

ChannelLineTransform creates a ChannelCommand that transforms rows one at a time. Best for: grep, cut, tr, sed - stateless row transformations

Example with strings (simulating grep):

func Grep(pattern string) ChannelCommand[string] {
    return ChannelLineTransform(func(line string) (string, bool) {
        if strings.Contains(line, pattern) {
            return line, true
        }
        return "", false
    })
}

Example with custom types:

type Person struct {
    Name string
    Age  int
}

func FilterAdults() ChannelCommand[Person] {
    return ChannelLineTransform(func(p Person) (Person, bool) {
        return p, p.Age >= 18
    })
}

func ChannelMerge

func ChannelMerge[T any](inputs ...<-chan gloo.Row[T]) gloo.ChannelExecutor[T]

ChannelMerge combines multiple input channels into one output channel. Useful for combining data from multiple sources.

Example:

func MergeFiles(readers ...io.Reader) ChannelExecutor[string] {
    return func(ctx context.Context, _ <-chan gloo.Row[string], out chan<- gloo.Row[string]) error {
        inputs := make([]<-chan gloo.Row[string], len(readers))
        for i, r := range readers {
            ch := make(chan gloo.Row[string], 100)
            inputs[i] = ch
            go ReaderToChannel(ctx, r, ch)
        }
        return ChannelMerge(inputs...)(ctx, nil, out)
    }
}
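Merging several channels into one is commonly done with one forwarding goroutine per input and a `sync.WaitGroup` that gates closing the output. The sketch below shows that general fan-in pattern. It is not ChannelMerge's actual implementation; `Row` is a local stand-in for `gloo.Row`, and `mergeRows`, `fromSlice`, and `collectMerged` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Row is a local stand-in for gloo.Row; only the Data field is assumed.
type Row[T any] struct{ Data T }

// mergeRows (hypothetical) forwards every row from every input to out and
// closes out only after all inputs have been drained.
func mergeRows[T any](out chan<- Row[T], inputs ...<-chan Row[T]) {
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan Row[T]) {
			defer wg.Done()
			for row := range in {
				out <- row
			}
		}(in)
	}
	wg.Wait()
	close(out)
}

// fromSlice (hypothetical) returns a pre-filled, already closed channel.
func fromSlice(items ...string) <-chan Row[string] {
	ch := make(chan Row[string], len(items))
	for _, s := range items {
		ch <- Row[string]{Data: s}
	}
	close(ch)
	return ch
}

// collectMerged merges two sources and returns the rows in sorted order,
// because arrival order across inputs is not deterministic.
func collectMerged() []string {
	out := make(chan Row[string])
	go mergeRows[string](out, fromSlice("a", "b"), fromSlice("c"))
	var got []string
	for row := range out {
		got = append(got, row.Data)
	}
	sort.Strings(got)
	return got
}

func main() {
	fmt.Println(collectMerged())
}
```

Rows from a single input keep their relative order, but rows from different inputs may interleave in any order.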

func ChannelStatefulLineTransform

func ChannelStatefulLineTransform[T any](fn ChannelStatefulLineTransformFunc[T]) gloo.ChannelCommand[T]

ChannelStatefulLineTransform creates a ChannelCommand that transforms rows with state tracking. Best for: uniq, nl, head - need row numbers or previous row tracking

Example with strings (simulating head):

func Head(n int) ChannelCommand[string] {
    return ChannelStatefulLineTransform(func(rowNum int64, line string) (string, bool) {
        return line, rowNum <= int64(n)
    })
}

Example with custom types:

type Event struct {
    ID        int
    Timestamp time.Time
}

func AddSequence() ChannelCommand[Event] {
    return ChannelStatefulLineTransform(func(rowNum int64, e Event) (Event, bool) {
        e.ID = int(rowNum)
        return e, true
    })
}

func ChannelTransform

func ChannelTransform[TIn, TOut any](fn func(TIn) (TOut, bool, error)) func(context.Context, <-chan gloo.Row[TIn], chan<- gloo.Row[TOut]) error

ChannelTransform allows transforming from one type to another in a pipeline. This is useful when you need to change the data type between pipeline stages.

Example: Parse log lines into structured data

type LogEntry struct {
    Level     string
    Timestamp time.Time
    Message   string
}

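// ParseLogs returns the function type produced by ChannelTransform: rows come in as string and go out as LogEntry.
func ParseLogs() func(context.Context, <-chan gloo.Row[string], chan<- gloo.Row[LogEntry]) error {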
    return ChannelTransform[string, LogEntry](
        func(line string) (LogEntry, bool, error) {
            // Split the line into three space-separated fields. parts[1] is
            // not parsed here, so Timestamp keeps its zero value.
            parts := strings.SplitN(line, " ", 3)
            if len(parts) != 3 {
                return LogEntry{}, false, nil
            }
            return LogEntry{
                Level:   parts[0],
                Message: parts[2],
            }, true, nil
        },
    )
}
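The transform function returns three values, which reads naturally as emit, skip, or abort. The sketch below illustrates one plausible interpretation of that contract with a stand-in driver. It assumes a non-nil error stops the stage and that `false` skips the row; the library's actual handling may differ. `Row`, `transformRows`, and `parseInts` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// Row is a local stand-in for gloo.Row; only the Data field is assumed.
type Row[T any] struct{ Data T }

// transformRows (hypothetical driver) applies fn to each input row.
// Assumed contract: err != nil aborts, emit == false skips the row.
func transformRows[TIn, TOut any](
	ctx context.Context,
	in <-chan Row[TIn],
	out chan<- Row[TOut],
	fn func(TIn) (TOut, bool, error),
) error {
	for row := range in {
		v, emit, err := fn(row.Data)
		if err != nil {
			return err
		}
		if !emit {
			continue
		}
		select {
		case out <- Row[TOut]{Data: v}:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// parseInts converts lines to ints: empty lines are skipped and a
// non-numeric line aborts with the strconv error.
func parseInts(lines ...string) ([]int, error) {
	in := make(chan Row[string], len(lines))
	for _, l := range lines {
		in <- Row[string]{Data: l}
	}
	close(in)

	out := make(chan Row[int], len(lines))
	err := transformRows[string, int](context.Background(), in, out,
		func(s string) (int, bool, error) {
			if s == "" {
				return 0, false, nil
			}
			n, err := strconv.Atoi(s)
			return n, err == nil, err
		})
	close(out)

	var got []int
	for r := range out {
		got = append(got, r.Data)
	}
	return got, err
}

func main() {
	fmt.Println(parseInts("1", "", "22"))
	fmt.Println(parseInts("1", "x"))
}
```

Rows emitted before the failing line are still delivered in this sketch; whether a real pipeline should keep or discard them is a design decision for the caller.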

func LineTransform

func LineTransform(fn LineTransformFunc) gloo.Command

LineTransform creates a gloo.Command that transforms lines one at a time. Best for: grep, cut, tr, sed - stateless line transformations

Example:

func (c command) Executor() gloo.CommandExecutor {
    return gloo.LineTransform(func(line string) (string, bool) {
        if strings.Contains(line, c.pattern) {
            return line, true
        }
        return "", false
    }).Executor()
}

func RawCommand

func RawCommand(fn gloo.CommandExecutor) gloo.Command

RawCommand wraps a raw CommandExecutor function. Best for: diff, paste, comm, find, ls - need full control over I/O

Example:

func (c command) Executor() gloo.CommandExecutor {
    return gloo.RawCommand(func(ctx context.Context, stdin io.Reader, stdout, stderr io.Writer) error {
        // Full control over I/O
        return nil
    }).Executor()
}

func StatefulLineTransform

func StatefulLineTransform(fn StatefulLineTransformFunc) gloo.Command

StatefulLineTransform creates a Command that transforms lines with state tracking. Best for: uniq, nl, head - need line numbers or previous line tracking

Example:

func (c command) Executor() gloo.CommandExecutor {
    return gloo.StatefulLineTransform(func(lineNum int64, line string) (string, bool) {
        if lineNum <= c.maxLines {
            return line, true
        }
        return "", false
    }).Executor()
}

Types

type AccumulateAndOutputFunc

type AccumulateAndOutputFunc func(lines []string, stdout io.Writer) error

AccumulateAndOutputFunc receives all collected lines, processes them, and writes its output directly to stdout.

type AccumulateAndProcessFunc

type AccumulateAndProcessFunc func(lines []string) []string

AccumulateAndProcessFunc receives all collected lines, processes them, and returns the lines to output.

type ChannelAccumulateAndOutputFunc

type ChannelAccumulateAndOutputFunc[T any] func(rows []T, out chan<- gloo.Row[T]) error

ChannelAccumulateAndOutputFunc receives all collected rows, processes them, and sends its output directly to the channel.

type ChannelAccumulateAndProcessFunc

type ChannelAccumulateAndProcessFunc[T any] func(rows []T) []T

ChannelAccumulateAndProcessFunc receives all collected rows, processes them, and returns the rows to output.

type ChannelLineTransformFunc

type ChannelLineTransformFunc[T any] func(data T) (output T, emit bool)

ChannelLineTransformFunc is a simple row-by-row transformation for channels. Return (output, true) to emit the row, or (_, false) to skip it.

type ChannelStatefulLineTransformFunc

type ChannelStatefulLineTransformFunc[T any] func(rowNum int64, data T) (output T, emit bool)

ChannelStatefulLineTransformFunc is a row transformation with row number tracking. Return (output, true) to emit the row, or (_, false) to skip it.

type LineTransformFunc

type LineTransformFunc func(line string) (output string, emit bool)

LineTransformFunc is a simple line-by-line transformation. Return (output, true) to emit the line, or (_, false) to skip it.

type StatefulLineTransformFunc

type StatefulLineTransformFunc func(lineNum int64, line string) (output string, emit bool)

StatefulLineTransformFunc is a line transformation with line number tracking. Return (output, true) to emit the line, or (_, false) to skip it.
