chanx

package module
v1.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 13, 2025 License: MIT Imports: 3 Imported by: 1

README

永不阻塞, 无限缓存的 Channel

A never-blocking, infinitely buffered channel. forked from smallnest/chanx

Ref: 实现无限缓存的channel | 鸟窝 https://colobu.com/2021/05/11/unbounded-channel-in-go/

变动

  • 增加初始化可选参数 maxBufferSize
    • 用于限定 RingBuffer 容量上限, 将无限缓存变为永不阻塞的有限缓存队列, 超过限制丢弃数据
      • 不受 initBufCapacity 参数影响, RingBuffer 初始化容量最小为 2
      • 不包含 initInCapacityinitOutCapacity 的容量
      • 默认为 0 表示无限缓冲
  • 增加数据丢弃时回调方法: c.SetOnDiscards(func(T))
    • 触发 maxBufferSize 限制并丢弃数据时, 将数据传给回调方法处理(无限缓存无效)
  • 增加动态调整 RingBuffer 容量上限方法: c.SetMaxBufferSize(0)
    • 值 <= 0 时恢复为无限缓冲, 返回最终设定的值: 0 或有效上限值
  • 增加一些计数方法: c.BufCapacity() c.MaxBufferSize() c.Discards()

使用

  • Go 1.17.x or below: v1.2.2
  • Go generic: latest version
安装
go get github.com/fufuok/chanx
永不阻塞, 无限缓存的 Channel

Never Block, Infinitely Cached Channel

package main

import (
	"context"
	"fmt"

	"github.com/fufuok/chanx"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := chanx.NewUnboundedChan[int](ctx, 10)
	// or
	// ch := chanx.NewUnboundedChanSize[int](ctx, 10, 200, 1000)

	go func() {
		for i := 0; i < 100; i++ {
			ch.In <- i // send values
		}
		close(ch.In) // close In channel
	}()

	for v := range ch.Out { // read values
		fmt.Println(v + 1)
	}
}
永不阻塞, 带缓存上限的 Channel

Never block, Channel with buffer size limit

package main

import (
	"context"
	"fmt"

	"github.com/fufuok/chanx"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 可选参数, 缓冲上限
	const maxBufCapacity = 10
	ch := chanx.NewUnboundedChan[int](ctx, 10, maxBufCapacity)
	// or
	// ch := chanx.NewUnboundedChanSize[int](ctx, 10, 10, 10, maxBufCapacity)

	// 有缓冲上限时, 可选设置数据丢弃时回调
	ch.SetOnDiscards(func(v int) {
		fmt.Println("discard: ", v)
	})

	go func() {
		for i := 0; i < 100; i++ {
			ch.In <- i // send values
		}
		close(ch.In) // close In channel
	}()

	for v := range ch.Out { // read values
		fmt.Println(v + 1)
	}
}

chanx

Unbounded chan with ringbuffer.

License GoDoc travis Go Report Card coveralls

Refer to the below articles and issues:

  1. https://github.com/golang/go/issues/20352
  2. https://stackoverflow.com/questions/41906146/why-go-channels-limit-the-buffer-size
  3. https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd
  4. https://erikwinter.nl/articles/2020/channel-with-infinite-buffer-in-golang/

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrIsEmpty = errors.New("ringbuffer is empty")

Functions

This section is empty.

Types

type RingBuffer

type RingBuffer[T any] struct {
	// contains filtered or unexported fields
}

RingBuffer is a ring buffer for common types. It never is full and always grows if it will be full. It is not thread-safe(goroutine-safe) so you must use the lock-like synchronization primitive to use it in multiple writers and multiple readers.

func NewRingBuffer

func NewRingBuffer[T any](initialSize int) *RingBuffer[T]

func (*RingBuffer[T]) Capacity

func (r *RingBuffer[T]) Capacity() int

Capacity returns the size of the underlying buffer.

func (*RingBuffer[T]) IsEmpty

func (r *RingBuffer[T]) IsEmpty() bool

func (*RingBuffer[T]) Len

func (r *RingBuffer[T]) Len() int

func (*RingBuffer[T]) Peek

func (r *RingBuffer[T]) Peek() T

func (*RingBuffer[T]) Pop

func (r *RingBuffer[T]) Pop() T

func (*RingBuffer[T]) Read

func (r *RingBuffer[T]) Read() (T, error)

func (*RingBuffer[T]) Reset

func (r *RingBuffer[T]) Reset()

func (*RingBuffer[T]) Write

func (r *RingBuffer[T]) Write(v T)

type UnboundedChan

type UnboundedChan[T any] struct {
	In  chan<- T // channel for write
	Out <-chan T // channel for read
	// contains filtered or unexported fields
}

UnboundedChan is an unbounded chan. In is used to write without blocking, which supports multiple writers. and Out is used to read, which supports multiple readers. You can close the in channel if you want.

func NewUnboundedChan

func NewUnboundedChan[T any](ctx context.Context, initCapacity int, maxBufferSize ...int) *UnboundedChan[T]

NewUnboundedChan creates the unbounded chan. in is used to write without blocking, which supports multiple writers. and out is used to read, which supports multiple readers. You can close the in channel if you want.

func NewUnboundedChanSize added in v0.0.103

func NewUnboundedChanSize[T any](ctx context.Context, initInCapacity, initOutCapacity, initBufCapacity int, maxBufferSize ...int) *UnboundedChan[T]

NewUnboundedChanSize is like NewUnboundedChan but you can set initial capacity for In, Out, Buffer. and max buffer capactiy.

func (*UnboundedChan[T]) BufCapacity added in v0.0.104

func (c *UnboundedChan[T]) BufCapacity() int

BufCapacity returns capacity of the buffer.

func (*UnboundedChan[T]) BufLen

func (c *UnboundedChan[T]) BufLen() int

BufLen returns len of the buffer. It is not accurate and only for your evaluating approximate number of elements in this chan, see https://github.com/smallnest/chanx/issues/7.

func (*UnboundedChan[T]) Discards added in v0.0.104

func (c *UnboundedChan[T]) Discards() uint64

Discards returns the number of discards.

func (*UnboundedChan[T]) Len

func (c *UnboundedChan[T]) Len() int

Len returns len of In plus len of Out plus len of buffer. It is not accurate and only for your evaluating approximate number of elements in this chan, see https://github.com/smallnest/chanx/issues/7.

func (*UnboundedChan[T]) MaxBufferSize added in v1.1.1

func (c *UnboundedChan[T]) MaxBufferSize() int

MaxBufferSize returns maximum capacity of the buffer.

func (*UnboundedChan[T]) SetMaxBufferSize added in v1.1.1

func (c *UnboundedChan[T]) SetMaxBufferSize(n int) int

SetMaxBufferSize reset the maximum capacity of buffer 0 or negative means unlimited

func (*UnboundedChan[T]) SetOnDiscards added in v1.1.0

func (c *UnboundedChan[T]) SetOnDiscards(fn func(T))

SetOnDiscards set the callback function when data is discarded

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL