Documentation
¶
Overview ¶
Package channel provides helpers for easier work with channels.
Example ¶
package main
import (
"context"
"fmt"
"dexm.lol/channel"
)
func main() {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
type processInput struct {
message string
shouldFailProcess bool
shouldFailConsume bool
}
type processOutput struct {
message string
shouldFailConsume bool
}
chIn := make(chan processInput, 3)
chIn <- processInput{message: "message 1", shouldFailProcess: false, shouldFailConsume: false}
chIn <- processInput{message: "message 2", shouldFailProcess: false, shouldFailConsume: true}
chIn <- processInput{message: "message 3", shouldFailProcess: true, shouldFailConsume: true}
close(chIn)
chProcessRes, chProcessErr := channel.Process(ctx, 2, chIn, func(ctx context.Context, in processInput) (processOutput, error) {
if in.shouldFailProcess {
return processOutput{}, fmt.Errorf("error processing message: %s", in.message)
}
res := processOutput{
message: fmt.Sprintf("processed message: %s", in.message),
shouldFailConsume: in.shouldFailConsume,
}
return res, nil
})
chConsumeErr := channel.Consume(ctx, 2, chProcessRes, func(ctx context.Context, in processOutput) error {
if in.shouldFailConsume {
return fmt.Errorf("error consuming message: %s", in.message)
}
fmt.Println("Consumed message:", in.message)
return nil
})
chErr := channel.Merge(ctx, chProcessErr, chConsumeErr)
for err := range chErr {
fmt.Println("Error received:", err.Error())
}
}
Output: Consumed message: processed message: message 1 Error received: error consuming message: processed message: message 2 Error received: error processing message: message 3
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Consume ¶
func Consume[T any]( ctx context.Context, concurrency int, channel <-chan T, f func(context.Context, T) error, ) <-chan error
Consume channel concurrently. Concurrency must be greater than 0, but it makes no sense to have it less than 2. You must close input channel for error channel to be closed.
Example ¶
package main
import (
"context"
"fmt"
"dexm.lol/channel"
)
func main() {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
type input struct {
message string
shouldFail bool
}
chIn := make(chan input, 2)
chIn <- input{message: "message 1", shouldFail: false}
chIn <- input{message: "message 2", shouldFail: true}
close(chIn)
chErr := channel.Consume(ctx, 2, chIn, func(ctx context.Context, in input) error {
if in.shouldFail {
return fmt.Errorf("error consuming message: %s", in.message)
}
fmt.Println("Consumed message:", in.message)
return nil
})
for err := range chErr {
fmt.Println("Error received:", err.Error())
}
}
Output: Consumed message: message 1 Error received: error consuming message: message 2
func Merge ¶
Merge multiple channels into a single one.
Example ¶
package main
import (
"context"
"fmt"
"dexm.lol/channel"
)
func main() {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ch1 := make(chan string, 2)
ch1 <- "message 1"
ch1 <- "message 2"
close(ch1)
ch2 := make(chan string, 2)
ch2 <- "message 3"
ch2 <- "message 4"
close(ch2)
ch3 := channel.Merge(ctx, ch1, ch2)
for message := range ch3 {
fmt.Println("Received message:", message)
}
}
Output: Received message: message 1 Received message: message 2 Received message: message 3 Received message: message 4
func Process ¶
func Process[T, R any]( ctx context.Context, concurrency int, channel <-chan T, f func(context.Context, T) (R, error), ) (<-chan R, <-chan error)
Process channel concurrently. Concurrency must be greater than 0, but it makes no sense to have it less than 2. You must close input channel for output and error channels to be closed.
Example ¶
package main
import (
"context"
"fmt"
"dexm.lol/channel"
)
func main() {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
type input struct {
message string
shouldFail bool
}
chIn := make(chan input, 2)
chIn <- input{message: "message 1", shouldFail: false}
chIn <- input{message: "message 2", shouldFail: true}
close(chIn)
chRes, chErr := channel.Process(ctx, 2, chIn, func(ctx context.Context, in input) (string, error) {
if in.shouldFail {
return "", fmt.Errorf("error processing message: %s", in.message)
}
return fmt.Sprintf("processed message: %s", in.message), nil
})
res := <-chRes
fmt.Println("Result received:", res)
err := <-chErr
fmt.Println("Error received:", err.Error())
}
Output: Result received: processed message: message 1 Error received: error processing message: message 2
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.