demux

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2025 License: MIT Imports: 3 Imported by: 0

README

demux

CI Go Report Card Go Reference

demux is a lightweight Go package that provides flexible and generic demultiplexing (fan-out) utilities for channels. It allows you to route items from a single input channel to multiple output channels based on dynamic or static keys.

Features

  • Dynamic Demuxing: Automatically spawn goroutines for each unique key, with dedicated channels.
  • Static Demuxing: Route messages to pre-defined channels based on their keys.
  • Generic: Uses Go generics for maximum flexibility.

Installation

go get github.com/floatdrop/demux

Usage

Dynamic Demuxing

Dynamic demuxes messages from an input channel to a dynamically created set of output channels, one per key. Each unique key launches a dedicated goroutine running consumeFunc.

demux.Dynamic(input, func(msg MyType) string {
    return msg.UserID // or any key
}, func(key string, ch <-chan MyType) {
    for msg := range ch { // start consuming messages with same UserID
        fmt.Printf("Consumer for %s got: %+v\n", key, msg)
    }
})
Static Demuxing

Static demuxes messages based on a key and routes them to pre-defined channels in a map.

channels := map[string]chan MyType{
    "alpha": make(chan MyType),
    "beta":  make(chan MyType),
}

go demux.Static(input, func(msg MyType) string {
    return msg.Group
}, channels)

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Dynamic

func Dynamic[T any, K comparable](
	in <-chan T,
	keyFunc func(T) K,
	consumeFunc func(K, <-chan T),
	opts ...Option,
)

Dynamic creates dynamic demultiplexer that routes items from 'in' based on keys returned by 'keyFunc'. For each unique key, a new goroutine is spawned running 'consumeFunc'. Each consumeFunc receives a channel that delivers values matching its key. When maxChannels limit is reached, least recently used channels are evicted.

func Static

func Static[T any, K comparable](in <-chan T, keyFunc func(T) K, channels map[K]chan<- T)

Static creates Static demultipler that routes each element to channel in channels map by key computed by keyFunc.

Types

type Config added in v0.0.3

type Config struct {
	// contains filtered or unexported fields
}

Config holds configuration for the dynamic demultiplexer

type Option added in v0.0.3

type Option func(*Config)

Option is a functional option for configuring the demultiplexer

func WithBufferSize added in v0.0.3

func WithBufferSize(size int) Option

WithBufferSize sets the buffer size for each channel

func WithMaxChannels added in v0.0.3

func WithMaxChannels(max int) Option

WithMaxChannels sets the maximum number of concurrent channels

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL