channel

package module
v0.0.0-...-52f42c3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 26, 2022 License: MIT Imports: 2 Imported by: 0

README

Package dexm.lol/channel

Tests Go Reference

Helpers for easier work with channels.

WARNING: This package is work in progress, do not use it yet ⚠

Installation

go get dexm.lol/channel

Documentation

Documentation can be found at Golang packages website

Documentation

Overview

Package channel provides helpers for easier work with channels.

Example
package main

import (
	"context"
	"fmt"

	"dexm.lol/channel"
)

func main() {
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	type processInput struct {
		message           string
		shouldFailProcess bool
		shouldFailConsume bool
	}

	type processOutput struct {
		message           string
		shouldFailConsume bool
	}

	chIn := make(chan processInput, 3)
	chIn <- processInput{message: "message 1", shouldFailProcess: false, shouldFailConsume: false}
	chIn <- processInput{message: "message 2", shouldFailProcess: false, shouldFailConsume: true}
	chIn <- processInput{message: "message 3", shouldFailProcess: true, shouldFailConsume: true}
	close(chIn)

	chProcessRes, chProcessErr := channel.Process(ctx, 2, chIn, func(ctx context.Context, in processInput) (processOutput, error) {
		if in.shouldFailProcess {
			return processOutput{}, fmt.Errorf("error processing message: %s", in.message)
		}

		res := processOutput{
			message:           fmt.Sprintf("processed message: %s", in.message),
			shouldFailConsume: in.shouldFailConsume,
		}
		return res, nil
	})

	chConsumeErr := channel.Consume(ctx, 2, chProcessRes, func(ctx context.Context, in processOutput) error {
		if in.shouldFailConsume {
			return fmt.Errorf("error consuming message: %s", in.message)
		}

		fmt.Println("Consumed message:", in.message)
		return nil
	})

	chErr := channel.Merge(ctx, chProcessErr, chConsumeErr)

	for err := range chErr {
		fmt.Println("Error received:", err.Error())
	}

}
Output:

Consumed message: processed message: message 1
Error received: error consuming message: processed message: message 2
Error received: error processing message: message 3

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Consume

func Consume[T any](
	ctx context.Context,
	concurrency int,
	channel <-chan T,
	f func(context.Context, T) error,
) <-chan error

Consume channel concurrently. Concurrency must be greater than 0, but it makes no sense to have it less than 2. You must close input channel for error channel to be closed.

Example
package main

import (
	"context"
	"fmt"

	"dexm.lol/channel"
)

func main() {
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	type input struct {
		message    string
		shouldFail bool
	}

	chIn := make(chan input, 2)
	chIn <- input{message: "message 1", shouldFail: false}
	chIn <- input{message: "message 2", shouldFail: true}
	close(chIn)

	chErr := channel.Consume(ctx, 2, chIn, func(ctx context.Context, in input) error {
		if in.shouldFail {
			return fmt.Errorf("error consuming message: %s", in.message)
		}

		fmt.Println("Consumed message:", in.message)
		return nil
	})

	for err := range chErr {
		fmt.Println("Error received:", err.Error())
	}

}
Output:

Consumed message: message 1
Error received: error consuming message: message 2

func Merge

func Merge[T any](ctx context.Context, channels ...<-chan T) <-chan T

Merge multiple channels into a single one.

Example
package main

import (
	"context"
	"fmt"

	"dexm.lol/channel"
)

func main() {
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	ch1 := make(chan string, 2)
	ch1 <- "message 1"
	ch1 <- "message 2"
	close(ch1)

	ch2 := make(chan string, 2)
	ch2 <- "message 3"
	ch2 <- "message 4"
	close(ch2)

	ch3 := channel.Merge(ctx, ch1, ch2)

	for message := range ch3 {
		fmt.Println("Received message:", message)
	}

}
Output:

Received message: message 1
Received message: message 2
Received message: message 3
Received message: message 4

func Process

func Process[T, R any](
	ctx context.Context,
	concurrency int,
	channel <-chan T,
	f func(context.Context, T) (R, error),
) (<-chan R, <-chan error)

Process channel concurrently. Concurrency must be greater than 0, but it makes no sense to have it less than 2. You must close input channel for output and error channels to be closed.

Example
package main

import (
	"context"
	"fmt"

	"dexm.lol/channel"
)

func main() {
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	type input struct {
		message    string
		shouldFail bool
	}

	chIn := make(chan input, 2)
	chIn <- input{message: "message 1", shouldFail: false}
	chIn <- input{message: "message 2", shouldFail: true}
	close(chIn)

	chRes, chErr := channel.Process(ctx, 2, chIn, func(ctx context.Context, in input) (string, error) {
		if in.shouldFail {
			return "", fmt.Errorf("error processing message: %s", in.message)
		}
		return fmt.Sprintf("processed message: %s", in.message), nil
	})

	res := <-chRes
	fmt.Println("Result received:", res)

	err := <-chErr
	fmt.Println("Error received:", err.Error())

}
Output:

Result received: processed message: message 1
Error received: error processing message: message 2

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL