stream

package
v0.35.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 29, 2025 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package stream 提供泛型流处理能力 支持类型安全的流操作,包括复制、合并和转换

Index

Constants

This section is empty.

Variables

View Source
var ErrRecvAfterClosed = errors.New("recv after stream closed")

ErrRecvAfterClosed 表示在流关闭后调用了 Recv

View Source
var ErrSkip = errors.New("skip this value")

ErrSkip 用于在 Transform 过程中跳过某个值 示例:

outStream := Transform(inStream, func(s string) (string, error) {
    if len(s) == 0 {
        return "", ErrSkip
    }
    return s, nil
})

Functions

func ForEach

func ForEach[T any](r *Reader[T], fn func(T) error) error

ForEach 消费读取器中的所有值,对每个值调用 fn

func GetSourceName

func GetSourceName(err error) (string, bool)

GetSourceName 从 SourceEOF 错误中提取源流名称

func Pipe

func Pipe[T any](cap int) (*Reader[T], *Writer[T])

Pipe 创建具有指定缓冲容量的新流 返回用于接收的 Reader 和用于发送的 Writer

示例:

reader, writer := Pipe[string](10)
go func() {
    defer writer.Close()
    writer.Send("hello", nil)
    writer.Send("world", nil)
}()

for {
    v, err := reader.Recv()
    if errors.Is(err, io.EOF) {
        break
    }
    fmt.Println(v)
}

Types

type Reader

type Reader[T any] struct {
	// contains filtered or unexported fields
}

Reader 是流的接收端

func Filter

func Filter[T any](r *Reader[T], predicate func(T) bool) *Reader[T]

Filter 创建只包含匹配谓词的值的新读取器

func FromSlice

func FromSlice[T any](slice []T) *Reader[T]

FromSlice 从切片创建 Reader

func Map

func Map[T, U any](r *Reader[T], fn func(T) U) *Reader[U]

Map 对流中的每个值应用函数

func Merge

func Merge[T any](readers ...*Reader[T]) *Reader[T]

Merge 将多个读取器合并为一个 从任何有数据的读取器接收值

func MergeNamed

func MergeNamed[T any](readers map[string]*Reader[T]) *Reader[T]

MergeNamed 将多个命名读取器合并为一个 当源流结束时,返回带有流名称的 SourceEOF

func Take

func Take[T any](r *Reader[T], n int) *Reader[T]

Take 返回只产出前 n 个值的读取器

func Transform

func Transform[T, U any](r *Reader[T], fn func(T) (U, error)) *Reader[U]

Transform 将类型 T 的流转换为类型 U 返回 ErrSkip 可跳过某个值

func TransformSimple

func TransformSimple[T, U any](r *Reader[T], fn func(T) (U, error)) *Reader[U]

TransformSimple 创建新的 goroutine 进行转换

func (*Reader[T]) Close

func (r *Reader[T]) Close()

Close 关闭读取器并释放资源

func (*Reader[T]) Collect

func (r *Reader[T]) Collect() ([]T, error)

Collect 将流中所有值读取到切片中

func (*Reader[T]) Copy

func (r *Reader[T]) Copy(n int) []*Reader[T]

Copy 创建 n 个独立的读取器 调用 Copy 后原读取器不可用 每个副本可以独立读取,互不影响

func (*Reader[T]) Recv

func (r *Reader[T]) Recv() (T, error)

Recv 从流中接收下一个值 当流耗尽时返回 io.EOF

func (*Reader[T]) SetAutomaticClose

func (r *Reader[T]) SetAutomaticClose()

SetAutomaticClose 启用 GC 时自动清理

type SourceEOF

type SourceEOF struct {
	Name string
}

SourceEOF 表示合并流中某个源流的 EOF

func (*SourceEOF) Error

func (e *SourceEOF) Error() string

type Writer

type Writer[T any] struct {
	// contains filtered or unexported fields
}

Writer 是流的发送端

func (*Writer[T]) Close

func (w *Writer[T]) Close()

Close 关闭写入器,向接收端发送 EOF 信号

func (*Writer[T]) Send

func (w *Writer[T]) Send(value T, err error) bool

Send 向流发送一个值 如果流已被接收端关闭,返回 true

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL