stream

package module
v0.0.0-...-f979d32 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2024 License: Apache-2.0 Imports: 6 Imported by: 3

README

Reactive streams for Go

A reactive streams library for Go in the spirit of Reactive Extensions (Rx) implemented with generic functions. The library provides a rich set of utilities for wiring event-passing in a complex application. Included are, for example, operators for pubsub/fanning out (Multicast), for transforming (Map, Reduce), for rate limiting (Throttle) and for buffering/coalescing (Buffer). New operators are easy to add as they are normal top-level functions that take/return the Observable type.

The Observable

The stream package provides the Observable interface for observing a stream of values that can be cancelled and can be either infinite or finite in length.

The Observable interface is defined as:

type Observable[T any] interface {
	Observe(ctx context.Context, next func(T), complete func(error))
}

The next function is called for each element in the stream. When the stream is terminated or cancelled (via ctx) next will be called for remaining elements and then complete after which neither function is invoked.

An Observable must adhere to the following rules:

  • Observe() call must not block, e.g. be asynchronous by forking a goroutine.
  • next must be called sequentially and never in parallel (previous call must complete before next can be called again).
  • complete can be called at most once. complete must not be called in parallel with next. After complete is called neither next nor complete can be called again.
  • if ctx is completed, calls to next should stop in short amount of time and complete must be called with ctx.Err().

Operators

The functions that operate on Observable[T] are divided into:

Since Go's generics does not yet allow new type parameters in methods, all of these are implemented as top-level functions rather than methods in the Observable interface. This also makes it easy to add new operators as they're just normal functions.

Creating an observable by hand

As a first example, we'll implement a simple source Observable that emits a single integer:


type singleIntegerObservable int

func (num singleIntegerObservable) Observe(ctx context.Context, next func(int), complete func(error)) {
	go func() {
		next(int(num))
		complete(nil)
	}()
}

We can now try it out with the Map operator:

func main() {
	var ten stream.Observable[int] = singleIntegerObservable(10)

	twenty := stream.Map(ten, func(x int) int) { return x * 2 })

	twenty.Observe(
		context.Background(),
		func(x int) {
			fmt.Printf("%d\n", x)
		},
		func(err error) {
			fmt.Printf("complete: %s\n", err)
		},
	)
}

Instead of defining a new type every time we want to implement Observe, we can use the FuncObservable helper:

func singleInt(x int) stream.Observable[int] {
	return stream.FuncObservable(
		func(ctx context.Context, next func(int), complete func(error)) error {
			next(x)
			complete(nil)
		},
	)
}

Tour of the included operators

Sources provide different ways of creating Observables without having to implement Observe:

Just(10)                   // emits 10 and completes
Error(errors.New("oh no")) // completes with error
Empty()                    // completes with nil error
FromSlice([]int{1,2,3})    // emits 1,2,3 and completes
FromChannel(in)            // emits items from the given channel
Range(0,3)                 // emits 0,1,2 and completes


// Multicast creates an observable that emits items to all observers.
src, next, complete := Multicast[int]()

ch1 := ToChannel(ctx, src)
ch2 := ToChannel(ctx, src)
next(1)
<-ch1 // 1
<-ch2 // 1

Operators transform streams in different ways:

// Map[A, B any](src Observable[A], apply func(A) B) Observable[B]
Map(src, apply)            // applies function 'apply' to each item.

// Filter[T any](src Observable[T], filter func(T) bool) Observable[T]
Filter(src, filter)        // applies function 'filter' to each item. If 'filter' returns false the
                           // item is dropped.

// Reduce[T, Result any](src Observable[T], init Result, reduce func(T, Result) Result) Observable[Result]
// Applies function 'reduce' to each item to "reduce" the stream into a single value.
Reduce(Range(0, 3), 0, func(x, result int) int { return x + result }) // 0 + 1 + 2 = 3

// ToMulticast[T any](src Observable[T], opts ...MulticastOpt) (mcast Observable[T], connect func(context.Context))
// Converts an observable into a multicast observable
src, connect := ToMulticast(Range(1,5))
ch1 := ToChannel(ctx, src)
ch2 := ToChannel(ctx, src)
connect(ctx) // start observing the parent observable
<-ch1 // 1
<-ch2 // 1

Sinks consume streams:

// First[T any](ctx context.Context, src Observable[T]) (item T, err error)
// Takes the first item from the observable and then cancels it.
item, err := First(ctx, src)

// ToSlice[T any](ctx context.Context, src Observable[T]) (items []T, err error)
// Converts the observable into a slice.
items, err := ToSlice(ctx, src)

// ToChannel[T any](ctx context.Context, src Observable[T], opts ...ToChannelOpt) <-chan T
// Converts the observable into a channel.
items := ToChannel(ctx, src)

// Discard[T any](ctx context.Context, src Observable[T]) error
// Consumes the observable by discarding the elements.
Discard(ctx, src)

Documentation

Overview

The stream package provides utilities for working with observable streams. Any type that implements the Observable interface can be transformed and consumed with these utilities.

Index

Constants

This section is empty.

Variables

View Source
var (
	// Emit the latest seen item when subscribing.
	EmitLatest = func(o *mcastOpts) { o.emitLatest = true }
)

Multicast options

Functions

func AlwaysRetry

func AlwaysRetry(err error) bool

AlwaysRetry always asks for a retry regardless of the error.

func Discard

func Discard[T any](ctx context.Context, src Observable[T])

Discard discards all items from 'src'.

func First

func First[T any](ctx context.Context, src Observable[T]) (item T, err error)

First returns the first item from 'src' observable and then cancels the subscription. Blocks until first item is observed or the stream is completed. If the observable completes without emitting items then io.EOF error is returned.

func Last

func Last[T any](ctx context.Context, src Observable[T]) (item T, err error)

Last returns the last item from 'src' observable. Blocks until the stream has been completed. If no items are observed then io.EOF error is returned.

func ObserveWithWaitGroup

func ObserveWithWaitGroup[T any](ctx context.Context, wg *sync.WaitGroup, src Observable[T], next func(T), complete func(error))

ObserveWithWaitGroup is like Observe(), but adds to a WaitGroup and calls Done() when complete.

func ToChannel

func ToChannel[T any](ctx context.Context, src Observable[T], opts ...ToChannelOpt) <-chan T

ToChannel converts an observable into a channel. When the provided context is cancelled the underlying subscription is cancelled and the channel is closed. To receive completion errors use WithErrorChan.

items <- ToChannel(ctx, Range(1,4))
a := <- items
b := <- items
c := <- items
_, ok := <- items
  => a=1, b=2, c=3, ok=false

func ToSlice

func ToSlice[T any](ctx context.Context, src Observable[T]) (items []T, err error)

ToSlice converts an Observable into a slice.

ToSlice(ctx, Range(1,4))
  => ([]int{1,2,3}, nil)

Types

type FuncObservable

type FuncObservable[T any] func(context.Context, func(T), func(error))

FuncObservable implements the Observable interface with a function.

This provides a convenient way of creating new observables without having to introduce a new type:

 var Ones Observable[int] =
 	FuncObservable[int](
		func(ctx context.Context, next func(int), complete func(error)) {
			go func() {
				defer complete(nil)
				for ctx.Err() == nil {
					next(1)
				}
			}()
		})

versus with a new type:

type onesObservable struct {}

func (o onesObservable) Observe(ctx context.Context, next func(int), complete func(error)) {
	go func() {
		defer complete(nil)
		for ctx.Err() == nil {
			next(1)
		}
	}()
}

func (FuncObservable[T]) Observe

func (f FuncObservable[T]) Observe(ctx context.Context, next func(T), complete func(error))

type MulticastOpt

type MulticastOpt func(o *mcastOpts)

type Observable

type Observable[T any] interface {
	// Observe a stream of values as long as the given context is valid.
	// 'next' is called for each item, and finally 'complete' is called
	// when the stream is complete, or an error has occurred.
	//
	// Observable implementations are allowed to call 'next' and 'complete'
	// from any goroutine, but never concurrently.
	Observe(ctx context.Context, next func(T), complete func(error))
}

Observable defines the Observe method for observing a stream of values.

Also see https://reactivex.io/documentation/observable.html for in-depth description of observables.

For interactive diagrams see https://rxmarbles.com/.

func Buffer

func Buffer[Buf any, T any](
	src Observable[T],
	bufferSize int,
	waitTime time.Duration,
	bufferItem func(Buf, T) Buf) Observable[Buf]

Buffer collects items into a buffer using the given buffering function and emits the buffer when 'waitTime' has elapsed. Buffer does not emit empty buffers.

In: a b c |-> Out: [a,b] [c] |->

func Concat

func Concat[T any](srcs ...Observable[T]) Observable[T]

Concat takes one or more observable of the same type and emits the items from each of them in order.

func Debounce

func Debounce[T any](src Observable[T], duration time.Duration) Observable[T]

Debounce emits an item only after the specified duration has lapsed since the previous item was emitted. Only the latest item is emitted.

In: a b c d e |-> Out: a d e |->

func Distinct

func Distinct[T comparable](src Observable[T]) Observable[T]

Distinct skips adjacent equal values.

Distinct(FromSlice([]int{1,1,2,2,3})
  => [1,2,3]

func Empty

func Empty[T any]() Observable[T]

Empty creates an "empty" observable that completes immediately.

xs, err := ToSlice(Empty[int]())
  => xs == []int{}, err == nil

func Error

func Error[T any](err error) Observable[T]

Error creates an observable that fails immediately with given error.

failErr = errors.New("fail")
xs, err := ToSlice(ctx, Error[int](failErr))
  => xs == []int{}, err == failErr

func Filter

func Filter[T any](src Observable[T], pred func(T) bool) Observable[T]

Filter only emits the values for which the provided predicate returns true.

Filter(Range(1,4), func(x int) int { return x%2 == 0 })
  => [2]

func FlatMap

func FlatMap[A, B any](src Observable[A], apply func(A) Observable[B]) Observable[B]

FlatMap applies a function that returns an observable of Bs to the source observable of As. The observable from the 'apply' function is flattened to produce a flat stream of Bs.

func FromChannel

func FromChannel[T any](in <-chan T) Observable[T]

FromChannel creates an observable from a channel. The channel is consumed by the first observer.

values := make(chan int)
go func() {
	values <- 1
	values <- 2
	values <- 3
	close(values)
}()
obs := FromChannel(values)
xs, err := ToSlice(ctx, obs)
  => xs == []int{1,2,3}, err == nil

xs, err = ToSlice(ctx, obs)
  => xs == []int{}, err == nil

func FromSlice

func FromSlice[T any](items []T) Observable[T]

FromSlice converts a slice into an Observable.

ToSlice(ctx, FromSlice([]int{1,2,3})
  => []int{1,2,3}

func Just

func Just[T any](item T) Observable[T]

Just creates an observable that emits a single item and completes.

xs, err := ToSlice(ctx, Just(1))
  => xs == []int{1}, err == nil

func Map

func Map[A, B any](src Observable[A], apply func(A) B) Observable[B]

Map applies a function onto values of an observable and emits the resulting values.

Map(Range(1,4), func(x int) int { return x * 2})
  => [2,4,6]

func Multicast

func Multicast[T any](opts ...MulticastOpt) (mcast Observable[T], next func(T), complete func(error))

Multicast creates an observable that "multicasts" the emitted items to all observers.

mcast, next, complete := Multicast[int]()
next(1) // no observers, none receives this
sub1 := ToChannel(ctx, mcast, WithBufferSize(10))
sub2 := ToChannel(ctx, mcast, WithBufferSize(10))
next(2)
next(3)
complete(nil)
  => sub1 == sub2 == [2,3]

mcast, next, complete = Multicast[int](EmitLatest)
next(1)
next(2) // "EmitLatest" tells Multicast to keep this
x, err := First(ctx, mcast)
  => x == 2, err == nil

func Range

func Range(from, to int) Observable[int]

Range creates an observable that emits integers in range from...to-1.

ToSlice(ctx, Range(1,2,3)) => []int{1,2,3}

func Reduce

func Reduce[Item, Result any](src Observable[Item], init Result, reduce func(Result, Item) Result) Observable[Result]

Reduce takes an initial state, and a function 'reduce' that is called on each element along with a state and returns an observable with a single item: the state produced by the last call to 'reduce'.

Reduce(Range(1,4), 0, func(sum, item int) int { return sum + item })
  => [(0+1+2+3)] => [6]

func Retry

func Retry[T any](src Observable[T], shouldRetry RetryFunc) Observable[T]

Retry resubscribes to the observable if it completes with an error.

func Stuck

func Stuck[T any]() Observable[T]

Stuck creates an observable that never emits anything and just waits for the context to be cancelled. Mainly meant for testing.

func Throttle

func Throttle[T any](src Observable[T], ratePerSecond float64, burst int) Observable[T]

Throttle limits the rate at which items are emitted.

func ToMulticast

func ToMulticast[T any](src Observable[T], opts ...MulticastOpt) (mcast Observable[T], connect func(context.Context))

ToMulticast makes 'src' a multicast observable, e.g. each observer will observe the same sequence. Useful for fanning out items to multiple observers from a source that is consumed by the act of observing.

mcast, connect := ToMulticast(FromChannel(values))
a := ToSlice(mcast)
b := ToSlice(mcast)
connect(ctx) // start!
  => a == b

type RetryFunc

type RetryFunc func(err error) bool

RetryFunc decides whether the processing should be retried given the error

func BackoffRetry

func BackoffRetry(shouldRetry RetryFunc, minBackoff, maxBackoff time.Duration) RetryFunc

BackoffRetry retries with an exponential backoff.

func LimitRetries

func LimitRetries(shouldRetry RetryFunc, numRetries int) RetryFunc

LimitRetries limits the number of retries with the given retry method. e.g. LimitRetries(BackoffRetry(time.Millisecond, time.Second), 5)

type ToChannelOpt

type ToChannelOpt func(*toChannelOpts)

func WithBufferSize

func WithBufferSize(n int) ToChannelOpt

WithBufferSize sets the buffer size of the channel returned by ToChannel.

func WithErrorChan

func WithErrorChan(errCh chan error) ToChannelOpt

WithErrorChan asks ToChannel to send completion error to the provided channel.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL