flow

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2025 License: MIT Imports: 2 Imported by: 2

README

Flow Library

Overview

The Flow library provides a thread-safe implementation for managing asynchronous data flows. It includes several implementations, such as Stream, Repeater, Never, and Empty, each with different behaviors.

Installation

Include the library in your Go project:

go get github.com/bobcatalyst/flow

Implementations

Stream

A Stream represents a continuous flow of values that can be closed, pushed to, and listened to.

Example Usage
var s flow.Stream[int]
s.Push(1, 2, 3)
c := s.Listen(context.Background())
s.Push(4, 5, 6)
for val := range c {
    fmt.Println(val)
}
Output:
4
5
6
Repeater

A Repeater repeats past values.

Example Usage
var r flow.Repeater[int]
r.Push(1, 2, 3)
c := s.Listen(context.Background())
r.Push(4, 5, 6)
for val := range c {
    fmt.Println(val)
}
Output:
1
2
3
4
5
6
Never

A Never flow never emits any values and only closes when the context is canceled.

Example Usage
var n flow.Never[int]
c := n.Listen(context.Background())
// Will block until the context is canceled.
Empty

An Empty flow immediately closes without emitting any values.

Example Usage
var e flow.Empty[int]
c := e.Listen(context.Background())
// Channel `c` is already closed.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Closable

type Closable[T any] interface {
	// Close pushes final values (if any) and marks the flow as closed.
	// After Close is called, all channels produced by Listenable.Listen will close after emitting the final values.
	// Once closed, Resettable.Reset will no longer have any effect.
	// Close can be safely called multiple times, but only the first invocation will push values and close the flow.
	Close(...T)
}

type Empty

type Empty[T any] struct{}

Empty represents a Flow that immediately closes without emitting any values.

func (Empty[T]) Listen

func (Empty[T]) Listen(context.Context) <-chan T

type Flow

type Flow[T any] interface {
	Listenable[T]
	Pushable[T]
	Closable[T]
	Resettable[T]
}

Flow is an interface that combines Listenable, Pushable, Closable, and Resettable.

type Listenable

type Listenable[T any] interface {
	// Listen returns a channel that emits values pushed into the flow.
	// The channel closes when either the Closable.Close method is called or the provided context is cancelled.
	// The values received by the channel will be in the same order they were pushed.
	Listen(ctx context.Context) <-chan T
}

type Never

type Never[T any] struct{}

Never represents a Flow that never emits any value and closes only when the Listen context is cancelled.

func (Never[T]) Listen

func (Never[T]) Listen(ctx context.Context) <-chan T

type Pushable

type Pushable[T any] interface {
	// Push allows multiple values to be pushed into the flow.
	// This operation is thread-safe and does not block the caller.
	Push(...T)
}

type Repeater

type Repeater[T any] struct {
	// contains filtered or unexported fields
}

Repeater is a Flow implementation that repeats values until reset.

func (*Repeater[T]) Close

func (r *Repeater[T]) Close(value ...T)

Close finalizes the flow by pushing the provided values and marking it as closed.

func (*Repeater[T]) Listen

func (r *Repeater[T]) Listen(ctx context.Context) (c <-chan T)

Listen returns a channel that emits values starting from the current reference node. The channel will close when the context is canceled or the flow is closed.

func (*Repeater[T]) Push

func (r *Repeater[T]) Push(value ...T)

Push adds the provided values into the flow.

func (*Repeater[T]) Reset

func (r *Repeater[T]) Reset(value ...T)

Reset clears the flow's current state and initializes it with the provided values.

type Resettable

type Resettable[T any] interface {
	// Reset clears the flow and pushes new values into it.
	// If the flow is already closed (via Closable.Close), Reset will have no effect.
	Reset(...T)
}

type Stream

type Stream[T any] struct {
	// contains filtered or unexported fields
}

Stream is a Flow implementation that emits ordered values until closed.

func (*Stream[T]) Close

func (s *Stream[T]) Close(value ...T)

Close finalizes the flow by pushing the provided values and marking it as closed.

func (*Stream[T]) Listen

func (s *Stream[T]) Listen(ctx context.Context) (c <-chan T)

Listen returns a channel that emits values from the head of the flow. The channel closes when the context is canceled or the flow is closed.

func (*Stream[T]) Push

func (s *Stream[T]) Push(value ...T)

Push adds the provided values into the flow.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL