flow

package module
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 20, 2026 License: Apache-2.0 Imports: 1 Imported by: 0

README

flow

Go Reference Go Report Card Go Coverage

A Go package for building concurrent data pipelines. You provide plain functions — the package handles all the channel wiring, goroutine management, and synchronization.

Install

go get github.com/mycreepy/flow

Usage

Task

A Task is the core building block — a function that reads from an input channel and writes to an output channel. Tasks must close the out channel when done, including when returning an error. Returning a non-nil error signals that processing has failed; closing the out channel ensures downstream tasks and goroutines terminate cleanly.

double := flow.Task[int, int](func(in <-chan int, out chan<- int) error {
    defer close(out)
	for v := range in {
        out <- v * 2
    }
    return nil
})

The generic type definition of Task can be inferred most of the time as seen in the next examples.

Concurrent

Fan out a task across multiple goroutines:

heavy := flow.Concurrent(4, func(in <-chan int, out chan<- int) error {
    defer close(out)
	for v := range in {
        time.Sleep(time.Second)
        out <- v * 2
    }
    return nil
})

results, err := flow.FromValues(heavy, 1, 2, 3, 4) // processed by 4 workers concurrently
Chain & Pipe

When all tasks share the same type, use Chain:

pipeline := flow.Chain(
    func(in <-chan int, out chan<- int) error {
        defer close(out)
        for v := range in { out <- v + 1 }
        return nil
    },
    func(in <-chan int, out chan<- int) error {
        defer close(out)
		for v := range in { out <- v * 2 }
        return nil
    },
)

results, err := flow.FromValues(pipeline, 10) // [22]

Use Pipe to connect tasks with different input/output types:

double := func(in <-chan int, out chan<- int) error {
    defer close(out)
	for v := range in { out <- v * 2 }
    return nil
}

toString := func(in <-chan int, out chan<- string) error {
    defer close(out)
	for v := range in { out <- fmt.Sprintf("%d", v) }
    return nil
}

pipeline := flow.Pipe(double, toString)
results, err := flow.FromValues(pipeline, 1, 2, 3) // ["2", "4", "6"]

For longer chains, nest Pipe calls:

pipeline := flow.Pipe(flow.Pipe(parse, transform), encode)
Helper functions

Filter items in a pipeline with Filter:

pipeline := flow.Filter(func(v int) (bool, error) { return v%2 == 0, nil })

results, err := flow.FromValues(pipeline, 1, 2, 3, 4, 5, 6) // [2, 4, 6]

Apply a function to each item with ForEach:

pipeline := flow.ForEach(func(v int) (string, error) { return fmt.Sprintf("%d", v), nil })

results, err := flow.FromValues(pipeline, 1, 2, 3) // ["1", "2", "3"]

Duplicate items to a slice with Append:

var collected []int
pipeline := flow.Append(&collected)

results, err := flow.FromValues(pipeline, 1, 2, 3) // [1, 2, 3]
// collected == [1, 2, 3]

Duplicate items to a side channel with Tee:

tee := make(chan int, 100)
pipeline := flow.Tee(tee)

go func() {
	for v := range tee {
		fmt.Println(v) // 1, 2, 3
    }
}
}()

results, err := flow.FromValues(pipeline, 1, 2, 3) // [1, 2, 3]
close(tee)

Log progress every N items with LogEveryN:

pipeline := flow.LogEveryN[int](1000, slog.Default(), "processed items")

results, err := flow.FromSlice(pipeline, veryBigSlice)
Error handling

When a task returns an error, it must still close out (use defer close(out)). This ensures all downstream goroutines drain and terminate — no goroutine leaks. The error is propagated through Pipe, Chain, Concurrent, and returned by FromValues, FromSlice & FromChannel.

pipeline := flow.Pipe(
    func(in <-chan int, out chan<- int) error {
        defer close(out)
        for v := range in {
            if v < 0 {
                return errors.New("negative value")
            }
            out <- v
        }
        return nil
    },
    func(in <-chan int, out chan<- string) error {
        defer close(out)
        for v := range in { out <- fmt.Sprintf("%d", v) }
        return nil
    },
)

results, err := flow.FromValues(pipeline, 1, -1, 3)
if err != nil {
    slog.Error("data pipeline failed", "error", err) // "negative value"
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func FromChannel added in v0.3.0

func FromChannel[In, Out any](t Task[In, Out], in <-chan In) ([]Out, error)

FromChannel starts the task with values from a channel and returns the collected output. Returns a non-nil error if the task fails.

func FromSlice added in v0.5.0

func FromSlice[In, Out any](t Task[In, Out], input []In) ([]Out, error)

FromSlice starts the task with the given input slice and returns the collected output. Returns a non-nil error if the task fails.

func FromValues added in v0.3.0

func FromValues[In, Out any](t Task[In, Out], input ...In) ([]Out, error)

FromValues starts the task with the given input values and returns the collected output. Returns a non-nil error if the task fails.

Types

type Logger added in v0.6.0

type Logger interface {
	Info(msg string, args ...any)
}

Logger is the interface used by logging helpers to emit structured log messages.

type Task

type Task[In, Out any] func(in <-chan In, out chan<- Out) error

Task processes values from in and sends results to out. Must close out when done, including when returning an error. Returning a non-nil error signals that processing has failed; closing the out channel ensures downstream tasks and goroutines terminate cleanly.

func Append added in v0.8.0

func Append[T any](s *[]T) Task[T, T]

Append returns a Task that appends all items to the given slice.

func Chain

func Chain[T any](tasks ...Task[T, T]) Task[T, T]

Chain composes multiple same-type tasks into a single task. If any task in the chain returns an error, it is propagated to the caller.

func Concurrent

func Concurrent[In, Out any](n int, t Task[In, Out]) Task[In, Out]

Concurrent runs n goroutines of the given task sharing the same in/out channels. If any worker returns an error, the first non-nil error is returned.

func Filter added in v0.6.0

func Filter[T any](predicate func(T) (bool, error)) Task[T, T]

Filter returns a Task that only forwards items for which predicate returns true. The predicate may also return an error, which will be returned by the Task to abort the pipeline.

func ForEach added in v0.7.0

func ForEach[In, Out any](fn func(in In) (Out, error)) Task[In, Out]

ForEach returns a Task that calls fn for each item and forwards the result. The function may also return an error, which will be returned by the Task to abort the pipeline.

func LogEveryN added in v0.6.0

func LogEveryN[T any](n int, logger Logger, msg string, args ...any) Task[T, T]

LogEveryN returns a pass-through Task that logs a message every n items processed.

func Pipe

func Pipe[A, B, C any](a Task[A, B], b Task[B, C]) Task[A, C]

Pipe connects two tasks: the output of a feeds into b. If either task returns an error, it is propagated to the caller.

func Tee added in v0.8.0

func Tee[T any](tee chan<- T) Task[T, T]

Tee returns a Task that forwards all items to the given channel and the output channel.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL