pipelines

package module
v1.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2025 License: MIT Imports: 2 Imported by: 0

README

Go Reference Go Report Card Go Coverage

pipelines

Pipelines contains generic functions that help with concurrent processing

Usage

A pipeline can be created from a slice or map

stream := pipelines.StreamSlice(ctx, data)

Or from a generator function

func GenerateData(ctx context.Context) int { return rand.Intn(10) }

stream := pipelines.GenerateStream(ctx, GenerateData)
FanOut

FanOut can be used to process data concurrently. Useful for I/O bound processes, but it can be used in any situation where you have a slice or map of data and want to introduce concurrency

const MaxFan int = 3

fanOutChannels := pipelines.FanOut(ctx, stream, ProcessFunc, MaxFan)
FanIn

FanIn can be used to merge data into one channel

fanInData := pipelines.FanIn(ctx, fanOutChannels...)

Example

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "strconv"
    "time"

    "github.com/nxdir-s/pipelines"
)

const (
    MaxFan int = 3
)

func GenerateData(ctx context.Context) int {
    return rand.Intn(5)
}

func Process(ctx context.Context, timeout int) string {
    select {
    case <-ctx.Done():
        return "context cancelled"
    case <-time.After(time.Second * time.Duration(timeout)):
        return "slept for " + strconv.Itoa(timeout) + " seconds!"
    }
}

func Read(ctx context.Context, messages <-chan string) {
    for msg := range messages {
        select {
        case <-ctx.Done():
            return
        default:
            fmt.Fprintf(os.Stdout, "%s\n", msg)
        }
    }
}

func main() {
    ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
    defer cancel()

    stream := pipelines.GenerateStream(ctx, GenerateData)
    fanOutChannels := pipelines.FanOut(ctx, stream, Process, MaxFan)
    messages := pipelines.FanIn(ctx, fanOutChannels...)

    go Read(ctx, messages)

    select {
    case <-ctx.Done():
        fmt.Fprint(os.Stdout, "context canceled, exiting...\n")
        os.Exit(0)
    }
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func FanIn

func FanIn[T any](ctx context.Context, channels ...<-chan T) <-chan T

FanIn takes any number of readonly channels and returns a fanned in channel

func FanOut

func FanOut[T any, H any](ctx context.Context, inputStream <-chan T, fn func(context.Context, T) H, numFan int) []<-chan H

FanOut controls concurrent processing of data from the input channel

func GenerateStream

func GenerateStream[T any](ctx context.Context, fn func(context.Context) T) <-chan T

GenerateStream takes a function that generates data and returns a channel of type T

func StreamMap

func StreamMap[T comparable, H comparable](ctx context.Context, data map[T]H) <-chan H

StreamMap takes a map of type map[T]H and returns a channel of type H

func StreamSlice

func StreamSlice[T any](ctx context.Context, data []T) <-chan T

StreamSlice takes a slice of type []T and returns a channel of type T

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL