rxgo

package module
v2.5.0
Published: Apr 6, 2021 License: MIT Imports: 15 Imported by: 212

README

RxGo

Reactive Extensions for the Go Language

ReactiveX

ReactiveX, or Rx for short, is an API for programming with Observable streams. This is the official ReactiveX API for the Go language.

ReactiveX is an alternative approach to asynchronous programming, compared with callbacks, promises and deferreds. It is about processing streams of events or items, with events being any occurrences or changes within the system. A stream of events is called an Observable.

An operator is a function that defines an Observable and how and when it should emit data. The operators covered are listed by category later in this README.

RxGo

The RxGo implementation is based on the concept of pipelines. A pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function.

Let's look at a concrete example, with each box being an operator:

  • We create a static Observable based on a fixed list of items using the Just operator.
  • We define a transformation function (convert a circle into a square) using the Map operator.
  • We filter each yellow square using the Filter operator.

In this example, the final items are sent in a channel, available to a consumer. There are many ways to consume or to produce data using RxGo. Publishing the results in a channel is only one of them.

Each operator is a transformation stage. By default, everything is sequential. Yet, we can leverage modern CPU architectures by defining multiple instances of the same operator, each instance being a goroutine connected to a common channel.
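To make the pipeline idea concrete, here is a minimal sketch in plain Go using only the standard library. It is not RxGo's actual implementation: the helper names source, mapStage, filterStage and collect are hypothetical, and items are simplified to int.

```go
package main

import "fmt"

// source emits a fixed list of items on a channel, then closes it
// (hypothetical helper, loosely analogous to a Just-style stage).
func source(items ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, it := range items {
			out <- it
		}
	}()
	return out
}

// mapStage applies f to every item read from in.
func mapStage(in <-chan int, f func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- f(v)
		}
	}()
	return out
}

// filterStage forwards only the items for which keep returns true.
func filterStage(in <-chan int, keep func(int) bool) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			if keep(v) {
				out <- v
			}
		}
	}()
	return out
}

// collect drains a channel into a slice.
func collect(in <-chan int) []int {
	var res []int
	for v := range in {
		res = append(res, v)
	}
	return res
}

func main() {
	squared := mapStage(source(1, 2, 3, 4), func(v int) int { return v * v })
	even := filterStage(squared, func(v int) bool { return v%2 == 0 })
	fmt.Println(collect(even)) // [4 16]
}
```

Each stage here is a single goroutine, which corresponds to the sequential default described above.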

The philosophy of RxGo is to implement the ReactiveX concepts and leverage the main Go primitives (channels, goroutines, etc.) so that the integration between the two worlds is as smooth as possible.

Installation of RxGo v2

go get -u github.com/reactivex/rxgo/v2

Getting Started

Hello World

Let's create our first Observable and consume an item:

observable := rxgo.Just("Hello, World!")()
ch := observable.Observe()
item := <-ch
fmt.Println(item.V)

The Just operator creates an Observable from a static list of items. Of(value) creates an item from a given value. If we want to create an item from an error, we have to use Error(err). This differs from v1, which accepted a value or an error directly without having to wrap it. What's the rationale for this change? It is to prepare RxGo for the generics feature coming (hopefully) in Go 2.

By the way, the Just operator uses currying as syntactic sugar. This way, it accepts multiple items in the first parameter list and multiple options in the second parameter list. We'll see below how to specify options.

Once the Observable is created, we can observe it using Observe(). By default, an Observable is lazy in the sense that it emits items only once a subscription is made. Observe() returns a <-chan rxgo.Item.

We consumed an item from this channel and printed its value using item.V.

An item is a wrapper on top of a value or an error. We may want to check the type first like this:

item := <-ch
if item.Error() {
    return item.E
}
fmt.Println(item.V)

item.Error() returns a boolean indicating whether an item contains an error. Then, we use either item.E to get the error or item.V to get the value.
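The value-or-error check can be tried without the library. The sketch below uses a local stand-in type, item, that mirrors the documented shape of rxgo.Item (fields V and E, method Error). The assumption that Error() simply reports E != nil and the helper firstValue are illustrative, not taken from the RxGo source.

```go
package main

import (
	"errors"
	"fmt"
)

// item is a local stand-in mirroring the documented rxgo.Item shape.
type item struct {
	V interface{}
	E error
}

// Error reports whether the item carries an error (assumed semantics).
func (i item) Error() bool { return i.E != nil }

// firstValue reads one item and returns its value, or the error it carries.
func firstValue(ch <-chan item) (interface{}, error) {
	it, ok := <-ch
	if !ok {
		return nil, errors.New("channel closed without an item")
	}
	if it.Error() {
		return nil, it.E
	}
	return it.V, nil
}

func main() {
	ch := make(chan item, 2)
	ch <- item{V: "Hello, World!"}
	ch <- item{E: errors.New("boom")}
	close(ch)

	v, err := firstValue(ch)
	fmt.Println(v, err) // Hello, World! <nil>
	v, err = firstValue(ch)
	fmt.Println(v, err) // <nil> boom
}
```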

By default, an Observable is stopped once an error is produced. However, there are special operators to deal with errors (e.g. OnErrorReturn, OnErrorResumeNext, Retry, etc.).

It is also possible to consume items using callbacks:

observable.ForEach(func(v interface{}) {
    fmt.Printf("received: %v\n", v)
}, func(err error) {
    fmt.Printf("error: %v\n", err)
}, func() {
    fmt.Println("observable is closed")
})

In this example, we passed 3 functions:

  • A NextFunc triggered when a value item is emitted.
  • An ErrFunc triggered when an error item is emitted.
  • A CompletedFunc triggered once the Observable is completed.

ForEach is non-blocking. Yet, it returns a notification channel that will be closed once the Observable completes. Hence, to make the previous code blocking, we simply need to use <-:

<-observable.ForEach(...)

Real-World Example

Let's say we want to implement a stream that consumes the following Customer structure:

type Customer struct {
	ID             int
	Name, LastName string
	Age            int
	TaxNumber      string
}

We create a producer that will emit Customers to a given chan rxgo.Item and create an Observable from it:

// Create the input channel
ch := make(chan rxgo.Item)
// Data producer
go producer(ch)

// Create an Observable
observable := rxgo.FromChannel(ch)

Then, we need to perform the two following operations:

  • Filter out the customers who are under 18.
  • Enrich each customer with a tax number. Retrieving a tax number is done for example by an IO-bound function doing an external REST call.

As the enriching step is IO-bound, it might be interesting to parallelize it within a given pool of goroutines. Yet, for some reason, all the Customer items need to be produced sequentially based on their ID.

observable.
	Filter(func(item interface{}) bool {
		// Filter operation
		customer := item.(Customer)
		return customer.Age >= 18
	}).
	Map(func(_ context.Context, item interface{}) (interface{}, error) {
		// Enrich operation
		customer := item.(Customer)
		taxNumber, err := getTaxNumber(customer)
		if err != nil {
			return nil, err
		}
		customer.TaxNumber = taxNumber
		return customer, nil
	},
		// Create multiple instances of the map operator
		rxgo.WithPool(pool),
		// Serialize the items emitted by their Customer.ID
		rxgo.Serialize(func(item interface{}) int {
			customer := item.(Customer)
			return customer.ID
		}), rxgo.WithBufferedChannel(1))

In the end, we consume the items using ForEach() or Observe() for example. Observe() returns a <-chan Item:

for customer := range observable.Observe() {
	if customer.Error() {
		return customer.E
	}
	fmt.Println(customer.V)
}
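The example above combines a pool, which may reorder items, with serialization by ID. As a rough illustration of how such re-ordering can work in principle, here is a standard-library sketch. It is not RxGo's implementation: the function serialize and its parameters are hypothetical, and items are simplified to int. Items that arrive early are parked until every lower ID has been emitted.

```go
package main

import "fmt"

// serialize re-emits values in increasing id order, starting at from.
// Values that arrive early are parked in a map until their turn comes.
func serialize(in <-chan int, from int, id func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		pending := make(map[int]int)
		next := from
		for v := range in {
			pending[id(v)] = v
			// Flush every consecutive item that is now available.
			for {
				p, ok := pending[next]
				if !ok {
					break
				}
				out <- p
				delete(pending, next)
				next++
			}
		}
	}()
	return out
}

func main() {
	in := make(chan int, 4)
	for _, v := range []int{2, 0, 3, 1} { // out-of-order arrival
		in <- v
	}
	close(in)

	var got []int
	for v := range serialize(in, 0, func(v int) int { return v }) {
		got = append(got, v)
	}
	fmt.Println(got) // [0 1 2 3]
}
```

The trade-off is memory: the buffer grows for as long as the next expected ID has not arrived.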

Observable Types

Hot vs Cold Observables

In the Rx world, there is a distinction between cold and hot Observables. When the data is produced by the Observable itself, it is a cold Observable. When the data is produced outside the Observable, it is a hot Observable. Usually, when we don't want to create a producer over and over again, we favour a hot Observable.

In RxGo, there is a similar concept.

First, let's create a hot Observable using the FromChannel operator and see the implications:

ch := make(chan rxgo.Item)
go func() {
    for i := 0; i < 3; i++ {
        ch <- rxgo.Of(i)
    }
    close(ch)
}()
observable := rxgo.FromChannel(ch)

// First Observer
for item := range observable.Observe() {
    fmt.Println(item.V)
}

// Second Observer
for item := range observable.Observe() {
    fmt.Println(item.V)
}

The result of this execution is:

0
1
2

This means that the first Observer already consumed all the items, and nothing is left for the others.
This behavior can, however, be altered with Connectable Observables.
The main point here is that the goroutine produced those items.

On the other hand, let's create a cold Observable using the Defer operator:

observable := rxgo.Defer([]rxgo.Producer{func(_ context.Context, ch chan<- rxgo.Item) {
    for i := 0; i < 3; i++ {
        ch <- rxgo.Of(i)
    }
}})

// First Observer
for item := range observable.Observe() {
    fmt.Println(item.V)
}

// Second Observer
for item := range observable.Observe() {
    fmt.Println(item.V)
}

Now, the result is:

0
1
2
0
1
2

In the case of a cold Observable, the stream was created independently for every Observer.

Again, hot vs cold Observables are not about how you consume items; they are about where the data is produced.
A good example of a hot Observable is price ticks from a trading exchange.
And if you teach an Observable to fetch products from a database and then yield them one by one, you create a cold Observable.
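The same distinction can be reproduced with bare channels. The following is a sketch in plain Go, not RxGo code, and the names hotSource, coldSource and drain are hypothetical. The hot source is one shared channel fed by a single producer. The cold source is a factory that starts a fresh producer for every observer.

```go
package main

import "fmt"

// drain reads everything from ch into a slice.
func drain(ch <-chan int) []int {
	var out []int
	for v := range ch {
		out = append(out, v)
	}
	return out
}

// hotSource starts a single producer immediately and returns the one
// shared channel; whatever one reader takes is gone for the others.
func hotSource(n int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			ch <- i
		}
	}()
	return ch
}

// coldSource returns a factory: every call starts a fresh producer,
// so each observer gets its own full sequence.
func coldSource(n int) func() <-chan int {
	return func() <-chan int {
		return hotSource(n)
	}
}

func main() {
	hot := hotSource(3)
	fmt.Println(drain(hot)) // [0 1 2]
	fmt.Println(drain(hot)) // []

	cold := coldSource(3)
	fmt.Println(drain(cold())) // [0 1 2]
	fmt.Println(drain(cold())) // [0 1 2]
}
```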

Backpressure

There is another operator called FromEventSource that creates an Observable from a channel. The difference from the FromChannel operator is that as soon as the Observable is created, it starts to emit items regardless of whether there is an Observer. Hence, the items emitted by an Observable without Observer(s) are lost (whilst they are buffered with the FromChannel operator).

A use case for the FromEventSource operator is, for example, telemetry. We may not be interested in all the data produced from the very beginning of a stream, only in the data produced since we started to observe it.

Once we start observing an Observable created with FromEventSource, we can configure the backpressure strategy. By default, it is blocking (there is a guaranteed delivery for the items emitted after we observe it). We can override this strategy this way:

observable := rxgo.FromEventSource(input, rxgo.WithBackPressureStrategy(rxgo.Drop))

The Drop strategy means that if the pipeline after FromEventSource is not ready to consume an item, this item is dropped.
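The difference between blocking and dropping can be sketched with a select statement in plain Go. This illustrates the idea only and is not how RxGo is implemented internally; the function forward and its drop flag are hypothetical.

```go
package main

import "fmt"

// forward copies items from in to out. With drop=false it blocks until
// out accepts each item; with drop=true it discards an item whenever
// out is not ready. It returns the number of dropped items.
func forward(in <-chan int, out chan<- int, drop bool) int {
	dropped := 0
	for v := range in {
		if !drop {
			out <- v
			continue
		}
		select {
		case out <- v:
		default:
			dropped++
		}
	}
	close(out)
	return dropped
}

func main() {
	in := make(chan int, 5)
	for i := 0; i < 5; i++ {
		in <- i
	}
	close(in)

	// A buffer of 2 and no reader yet: only the first two items fit.
	out := make(chan int, 2)
	dropped := forward(in, out, true)

	var kept []int
	for v := range out {
		kept = append(kept, v)
	}
	fmt.Println(kept, dropped) // [0 1] 3
}
```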

By default, a channel connecting operators is non-buffered. We can override this behaviour like this:

observable.Map(transform, rxgo.WithBufferedChannel(42))

Each operator has an opts ...Option parameter that allows passing such options.

Lazy vs Eager Observation

The default observation strategy is lazy. It means the items emitted by an Observable are processed by an operator once we start observing it. We can change this behaviour this way:

observable := rxgo.FromChannel(ch).Map(transform, rxgo.WithObservationStrategy(rxgo.Eager))

In this case, the Map operator is triggered whenever an item is produced even without any Observer.

Sequential vs Parallel Operators

By default, each operator is sequential: one operator is one goroutine instance. We can override it using the following option:

observable.Map(transform, rxgo.WithPool(32))

In this example, we create a pool of 32 goroutines that consume items concurrently from the same channel. If the operation is CPU-bound, we can use the WithCPUPool() option that creates a pool based on the number of logical CPUs.
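A pool of goroutines reading from one channel can be sketched in plain Go as follows. This is a simplified illustration rather than RxGo's code, and parallelMap is a hypothetical name. Note that the output order is no longer guaranteed once several workers run concurrently.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// parallelMap runs `workers` goroutines that all read from the same
// input channel and apply f. Output order is not guaranteed.
func parallelMap(in <-chan int, workers int, f func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for v := range in {
				out <- f(v)
			}
		}()
	}
	// Close the output once every worker has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 5; i++ {
			in <- i
		}
	}()

	var res []int
	for v := range parallelMap(in, 3, func(v int) int { return v * 10 }) {
		res = append(res, v)
	}
	sort.Ints(res)   // sort only to get a stable printout
	fmt.Println(res) // [10 20 30 40 50]
}
```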

Connectable Observable

A Connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when its Connect() method is called. In this way, you can wait for all intended Subscribers to subscribe to the Observable before the Observable begins emitting items.

Let's create a Connectable Observable using rxgo.WithPublishStrategy:

ch := make(chan rxgo.Item)
go func() {
	ch <- rxgo.Of(1)
	ch <- rxgo.Of(2)
	ch <- rxgo.Of(3)
	close(ch)
}()
observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())

Then, we create two Observers:

observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
	return i.(int) + 1, nil
}).DoOnNext(func(i interface{}) {
	fmt.Printf("First observer: %d\n", i)
})

observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
	return i.(int) * 2, nil
}).DoOnNext(func(i interface{}) {
	fmt.Printf("Second observer: %d\n", i)
})

If observable was not a Connectable Observable, as DoOnNext creates an Observer, the source Observable would have begun emitting items. Yet, in the case of a Connectable Observable, we have to call Connect():

observable.Connect(context.Background())

Once Connect() is called, the Connectable Observable begins to emit items.

There is another important difference from a regular Observable. A Connectable Observable publishes its items, which means that all the Observers receive a copy of the items.
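The two properties just described, deferred start and one copy per Observer, can be illustrated in plain Go. The sketch below is not RxGo's implementation, and publish is a hypothetical helper: it fans every item out to each subscriber channel, and nothing is read from the source until connect is called.

```go
package main

import (
	"fmt"
	"sync"
)

// publish copies every item from src to each of n subscriber channels.
// Nothing is read from src until the returned connect function is called.
func publish(src <-chan int, n int) (subs []<-chan int, connect func()) {
	outs := make([]chan int, n)
	for i := range outs {
		outs[i] = make(chan int)
		subs = append(subs, outs[i])
	}
	connect = func() {
		go func() {
			for v := range src {
				for _, o := range outs {
					o <- v // every subscriber receives its own copy
				}
			}
			for _, o := range outs {
				close(o)
			}
		}()
	}
	return subs, connect
}

func main() {
	src := make(chan int)
	go func() {
		defer close(src)
		for i := 1; i <= 3; i++ {
			src <- i
		}
	}()

	subs, connect := publish(src, 2)
	sums := make([]int, len(subs))
	var wg sync.WaitGroup
	for i, s := range subs {
		wg.Add(1)
		go func(i int, s <-chan int) {
			defer wg.Done()
			for v := range s {
				sums[i] += v
			}
		}(i, s)
	}

	connect() // emission starts only now
	wg.Wait()
	fmt.Println(sums) // [6 6]
}
```

In this simplified version a slow subscriber stalls all the others, because the subscriber channels are unbuffered.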

Here is an example with a regular Observable:

ch := make(chan rxgo.Item)
go func() {
	ch <- rxgo.Of(1)
	ch <- rxgo.Of(2)
	ch <- rxgo.Of(3)
	close(ch)
}()
// Create a regular Observable
observable := rxgo.FromChannel(ch)

// Create the first Observer
observable.DoOnNext(func(i interface{}) {
	fmt.Printf("First observer: %d\n", i)
})

// Create the second Observer
observable.DoOnNext(func(i interface{}) {
	fmt.Printf("Second observer: %d\n", i)
})

The result of this execution is:

First observer: 1
First observer: 2
First observer: 3

Now, with a Connectable Observable:

ch := make(chan rxgo.Item)
go func() {
	ch <- rxgo.Of(1)
	ch <- rxgo.Of(2)
	ch <- rxgo.Of(3)
	close(ch)
}()
// Create a Connectable Observable
observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())

// Create the first Observer
observable.DoOnNext(func(i interface{}) {
	fmt.Printf("First observer: %d\n", i)
})

// Create the second Observer
observable.DoOnNext(func(i interface{}) {
	fmt.Printf("Second observer: %d\n", i)
})

disposed, cancel := observable.Connect(context.Background())
go func() {
	// Do something
	time.Sleep(time.Second)
	// Then cancel the subscription
	cancel()
}()
// Wait for the subscription to be disposed
<-disposed.Done()

A possible result of this execution is:

Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3

Observable, Single and Optional Single

An Iterable is an object that can be observed using Observe(opts ...Option) <-chan Item.

An Iterable can be either:

  • An Observable: emits 0 or multiple items
  • A Single: emits 1 item
  • An Optional Single: emits 0 or 1 item

Documentation

Package documentation: https://pkg.go.dev/github.com/reactivex/rxgo/v2

Assert API

How to use the assert API to write unit tests while using RxGo.

Operator Options

Operator options

Creating Observables
  • Create — create an Observable from scratch by calling observer methods programmatically
  • Defer — do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
  • Empty/Never/Thrown — create Observables that have very precise and limited behaviour
  • FromChannel — create an Observable based on a lazy channel
  • FromEventSource — create an Observable based on an eager channel
  • Interval — create an Observable that emits a sequence of integers spaced by a particular time interval
  • Just — convert a set of objects into an Observable that emits that or those objects
  • JustItem — convert one object into a Single that emits this object
  • Range — create an Observable that emits a range of sequential integers
  • Repeat — create an Observable that emits a particular item or sequence of items repeatedly
  • Start — create an Observable that emits the return value of a function
  • Timer — create an Observable that completes after a specified delay

Transforming Observables
  • Buffer — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time
  • FlatMap — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
  • GroupBy — divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key
  • GroupByDynamic — divide an Observable into a dynamic set of Observables that each emit GroupedObservables from the original Observable, organized by key
  • Map — transform the items emitted by an Observable by applying a function to each item
  • Marshal — transform the items emitted by an Observable by applying a marshalling function to each item
  • Scan — apply a function to each item emitted by an Observable, sequentially, and emit each successive value
  • Unmarshal — transform the items emitted by an Observable by applying an unmarshalling function to each item
  • Window — periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time

Filtering Observables
  • Debounce — only emit an item from an Observable if a particular timespan has passed without it emitting another item
  • Distinct/DistinctUntilChanged — suppress duplicate items emitted by an Observable
  • ElementAt — emit only item n emitted by an Observable
  • Filter — emit only those items from an Observable that pass a predicate test
  • Find — emit the first item passing a predicate then complete
  • First/FirstOrDefault — emit only the first item or the first item that meets a condition, from an Observable
  • IgnoreElements — do not emit any items from an Observable but mirror its termination notification
  • Last/LastOrDefault — emit only the last item emitted by an Observable
  • Sample — emit the most recent item emitted by an Observable within periodic time intervals
  • Skip — suppress the first n items emitted by an Observable
  • SkipLast — suppress the last n items emitted by an Observable
  • Take — emit only the first n items emitted by an Observable
  • TakeLast — emit only the last n items emitted by an Observable

Combining Observables
  • CombineLatest — when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
  • Join — combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable
  • Merge — combine multiple Observables into one by merging their emissions
  • StartWithIterable — emit a specified sequence of items before beginning to emit the items from the source Iterable
  • ZipFromIterable — combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function

Error Handling Operators
  • Catch — recover from an onError notification by continuing the sequence without error
  • Retry/BackOffRetry — if a source Observable sends an onError notification, resubscribe to it in the hopes that it will complete without error

Observable Utility Operators
  • Do - register an action to take upon a variety of Observable lifecycle events
  • Run — create an Observer without consuming the emitted items
  • Send — send the Observable items in a specific channel
  • Serialize — force an Observable to make serialized calls and to be well-behaved
  • TimeInterval — convert an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions
  • Timestamp — attach a timestamp to each item emitted by an Observable

Conditional and Boolean Operators
  • All — determine whether all items emitted by an Observable meet some criteria
  • Amb — given two or more source Observables, emit all of the items from only the first of these Observables to emit an item
  • Contains — determine whether an Observable emits a particular item or not
  • DefaultIfEmpty — emit items from the source Observable, or a default item if the source Observable emits nothing
  • SequenceEqual — determine whether two Observables emit the same sequence of items
  • SkipWhile — discard items emitted by an Observable until a specified condition becomes false
  • TakeUntil — discard items emitted by an Observable after a second Observable emits an item or terminates
  • TakeWhile — discard items emitted by an Observable after a specified condition becomes false

Mathematical and Aggregate Operators
  • Average — calculates the average of numbers emitted by an Observable and emits this average
  • Concat — emit the emissions from two or more Observables without interleaving them
  • Count — count the number of items emitted by the source Observable and emit only this value
  • Max — determine, and emit, the maximum-valued item emitted by an Observable
  • Min — determine, and emit, the minimum-valued item emitted by an Observable
  • Reduce — apply a function to each item emitted by an Observable, sequentially, and emit the final value
  • Sum — calculate the sum of numbers emitted by an Observable and emit this sum

Operators to Convert Observables

Contributions

All contributions are very welcome! Be sure you check out the contributing guidelines first. Also, if you publish a post about RxGo, please let us know. We would be glad to include it in the External Resources section.

External Resources

Documentation

Overview

Package rxgo is the main RxGo package.

Variables

var Infinite int64 = -1

Infinite represents an infinite wait time

var OptionalSingleEmpty = Item{}

OptionalSingleEmpty is the constant returned when an OptionalSingle is empty.

Functions

func Assert

func Assert(ctx context.Context, t *testing.T, iterable Iterable, assertions ...RxAssert)

Assert asserts the result of an iterable against a list of assertions.

func Just

func Just(items ...interface{}) func(opts ...Option) Observable

Just creates an Observable with the provided items.

func SendItems

func SendItems(ctx context.Context, ch chan<- Item, strategy CloseChannelStrategy, items ...interface{})

SendItems is a utility function that sends a list of interface{} values and takes a strategy indicating whether to close the channel once the function completes.

Types

type AssertPredicate

type AssertPredicate func(items []interface{}) error

AssertPredicate is a custom predicate based on the items.

type BackpressureStrategy

type BackpressureStrategy uint32

BackpressureStrategy is the backpressure strategy type.

const (
	// Block blocks until the channel is available.
	Block BackpressureStrategy = iota
	// Drop drops the message.
	Drop
)

type CloseChannelStrategy

type CloseChannelStrategy uint32

CloseChannelStrategy indicates a strategy on whether to close a channel.

const (
	// LeaveChannelOpen indicates to leave the channel open after completion.
	LeaveChannelOpen CloseChannelStrategy = iota
	// CloseChannel indicates to close the channel after completion.
	CloseChannel
)

type Comparator

type Comparator func(interface{}, interface{}) int

Comparator defines a func that returns an int:

  • 0 if the two elements are equal
  • A negative value if the first argument is less than the second
  • A positive value if the first argument is greater than the second
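A comparator honoring this contract might look like the following sketch. The Comparator type is redeclared locally so that the example runs on its own. compareInts and maxOf are hypothetical helpers, and items are assumed to be int values.

```go
package main

import "fmt"

// Comparator mirrors the documented signature (local copy for this sketch).
type Comparator func(interface{}, interface{}) int

// compareInts is a hypothetical comparator for int items.
// It panics if either argument is not an int.
func compareInts(a, b interface{}) int {
	x, y := a.(int), b.(int)
	switch {
	case x < y:
		return -1
	case x > y:
		return 1
	default:
		return 0
	}
}

// maxOf returns the largest item according to cmp, or nil for no items.
func maxOf(cmp Comparator, items ...interface{}) interface{} {
	if len(items) == 0 {
		return nil
	}
	best := items[0]
	for _, it := range items[1:] {
		if cmp(it, best) > 0 {
			best = it
		}
	}
	return best
}

func main() {
	fmt.Println(maxOf(compareInts, 3, 9, 4)) // 9
}
```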

type CompletedFunc

type CompletedFunc func()

CompletedFunc handles the end of a stream.

type Disposable

type Disposable context.CancelFunc

Disposable is a function to be called in order to dispose a subscription.

type Disposed

type Disposed <-chan struct{}

Disposed is a notification channel indicating when an Observable is closed.

type Duration

type Duration interface {
	// contains filtered or unexported methods
}

Duration represents a duration

func WithDuration

func WithDuration(d time.Duration) Duration

WithDuration is a duration option

type ErrFunc

type ErrFunc func(error)

ErrFunc handles an error in a stream.

type ErrorFunc

type ErrorFunc func(error) interface{}

ErrorFunc defines a function that computes a value from an error.

type ErrorToObservable

type ErrorToObservable func(error) Observable

ErrorToObservable defines a function that produces an Observable from an error.

type Func

type Func func(context.Context, interface{}) (interface{}, error)

Func defines a function that computes a value from an input value.

type Func2

type Func2 func(context.Context, interface{}, interface{}) (interface{}, error)

Func2 defines a function that computes a value from two input values.

type FuncN

type FuncN func(...interface{}) interface{}

FuncN defines a function that computes a value from N input values.

type GroupedObservable added in v2.2.0

type GroupedObservable struct {
	Observable
	// Key is the distribution key
	Key string
}

GroupedObservable is the observable type emitted by the GroupByDynamic operator.

type IllegalInputError

type IllegalInputError struct {
	// contains filtered or unexported fields
}

IllegalInputError is triggered when the observable receives an illegal input.

func (IllegalInputError) Error

func (e IllegalInputError) Error() string

type IndexOutOfBoundError

type IndexOutOfBoundError struct {
	// contains filtered or unexported fields
}

IndexOutOfBoundError is triggered when the observable cannot access the specified index.

func (IndexOutOfBoundError) Error

func (e IndexOutOfBoundError) Error() string

type Item

type Item struct {
	V interface{}
	E error
}

Item is a wrapper having either a value or an error.

func Error

func Error(err error) Item

Error creates an item from an error.

func Of

func Of(i interface{}) Item

Of creates an item from a value.

func (Item) Error

func (i Item) Error() bool

Error checks if an item is an error.

func (Item) SendBlocking

func (i Item) SendBlocking(ch chan<- Item)

SendBlocking sends an item and blocks until it is sent.

func (Item) SendContext

func (i Item) SendContext(ctx context.Context, ch chan<- Item) bool

SendContext sends an item and blocks until it is sent or the context is canceled. It returns a boolean to indicate whether the item was sent.

func (Item) SendNonBlocking

func (i Item) SendNonBlocking(ch chan<- Item) bool

SendNonBlocking sends an item without blocking. It returns a boolean to indicate whether the item was sent.
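The semantics of the context-aware and non-blocking sends described above can be sketched with select in plain Go. These are illustrative stand-ins on a chan int, not the library's method bodies; sendContext and sendNonBlocking are hypothetical free functions.

```go
package main

import (
	"context"
	"fmt"
)

// sendContext blocks until v is sent or ctx is done; it reports whether
// the value was sent.
func sendContext(ctx context.Context, ch chan<- int, v int) bool {
	// Check first so that an already canceled context never sends.
	select {
	case <-ctx.Done():
		return false
	default:
	}
	select {
	case <-ctx.Done():
		return false
	case ch <- v:
		return true
	}
}

// sendNonBlocking tries to send v and returns immediately.
func sendNonBlocking(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(sendNonBlocking(ch, 1)) // true: the buffer has room
	fmt.Println(sendNonBlocking(ch, 2)) // false: the buffer is full

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	fmt.Println(sendContext(ctx, ch, 3)) // false: the context is already canceled
}
```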

type ItemToObservable

type ItemToObservable func(Item) Observable

ItemToObservable defines a function that computes an observable from an item.

type Iterable

type Iterable interface {
	Observe(opts ...Option) <-chan Item
}

Iterable is the basic type that can be observed.

type Marshaller

type Marshaller func(interface{}) ([]byte, error)

Marshaller defines a marshaller type (interface{} to []byte).

type NextFunc

type NextFunc func(interface{})

NextFunc handles a next item in a stream.

type Observable

type Observable interface {
	Iterable
	All(predicate Predicate, opts ...Option) Single
	AverageFloat32(opts ...Option) Single
	AverageFloat64(opts ...Option) Single
	AverageInt(opts ...Option) Single
	AverageInt8(opts ...Option) Single
	AverageInt16(opts ...Option) Single
	AverageInt32(opts ...Option) Single
	AverageInt64(opts ...Option) Single
	BackOffRetry(backOffCfg backoff.BackOff, opts ...Option) Observable
	BufferWithCount(count int, opts ...Option) Observable
	BufferWithTime(timespan Duration, opts ...Option) Observable
	BufferWithTimeOrCount(timespan Duration, count int, opts ...Option) Observable
	Connect(ctx context.Context) (context.Context, Disposable)
	Contains(equal Predicate, opts ...Option) Single
	Count(opts ...Option) Single
	Debounce(timespan Duration, opts ...Option) Observable
	DefaultIfEmpty(defaultValue interface{}, opts ...Option) Observable
	Distinct(apply Func, opts ...Option) Observable
	DistinctUntilChanged(apply Func, opts ...Option) Observable
	DoOnCompleted(completedFunc CompletedFunc, opts ...Option) Disposed
	DoOnError(errFunc ErrFunc, opts ...Option) Disposed
	DoOnNext(nextFunc NextFunc, opts ...Option) Disposed
	ElementAt(index uint, opts ...Option) Single
	Error(opts ...Option) error
	Errors(opts ...Option) []error
	Filter(apply Predicate, opts ...Option) Observable
	Find(find Predicate, opts ...Option) OptionalSingle
	First(opts ...Option) OptionalSingle
	FirstOrDefault(defaultValue interface{}, opts ...Option) Single
	FlatMap(apply ItemToObservable, opts ...Option) Observable
	ForEach(nextFunc NextFunc, errFunc ErrFunc, completedFunc CompletedFunc, opts ...Option) Disposed
	GroupBy(length int, distribution func(Item) int, opts ...Option) Observable
	GroupByDynamic(distribution func(Item) string, opts ...Option) Observable
	IgnoreElements(opts ...Option) Observable
	Join(joiner Func2, right Observable, timeExtractor func(interface{}) time.Time, window Duration, opts ...Option) Observable
	Last(opts ...Option) OptionalSingle
	LastOrDefault(defaultValue interface{}, opts ...Option) Single
	Map(apply Func, opts ...Option) Observable
	Marshal(marshaller Marshaller, opts ...Option) Observable
	Max(comparator Comparator, opts ...Option) OptionalSingle
	Min(comparator Comparator, opts ...Option) OptionalSingle
	OnErrorResumeNext(resumeSequence ErrorToObservable, opts ...Option) Observable
	OnErrorReturn(resumeFunc ErrorFunc, opts ...Option) Observable
	OnErrorReturnItem(resume interface{}, opts ...Option) Observable
	Reduce(apply Func2, opts ...Option) OptionalSingle
	Repeat(count int64, frequency Duration, opts ...Option) Observable
	Retry(count int, shouldRetry func(error) bool, opts ...Option) Observable
	Run(opts ...Option) Disposed
	Sample(iterable Iterable, opts ...Option) Observable
	Scan(apply Func2, opts ...Option) Observable
	SequenceEqual(iterable Iterable, opts ...Option) Single
	Send(output chan<- Item, opts ...Option)
	Serialize(from int, identifier func(interface{}) int, opts ...Option) Observable
	Skip(nth uint, opts ...Option) Observable
	SkipLast(nth uint, opts ...Option) Observable
	SkipWhile(apply Predicate, opts ...Option) Observable
	StartWith(iterable Iterable, opts ...Option) Observable
	SumFloat32(opts ...Option) OptionalSingle
	SumFloat64(opts ...Option) OptionalSingle
	SumInt64(opts ...Option) OptionalSingle
	Take(nth uint, opts ...Option) Observable
	TakeLast(nth uint, opts ...Option) Observable
	TakeUntil(apply Predicate, opts ...Option) Observable
	TakeWhile(apply Predicate, opts ...Option) Observable
	TimeInterval(opts ...Option) Observable
	Timestamp(opts ...Option) Observable
	ToMap(keySelector Func, opts ...Option) Single
	ToMapWithValueSelector(keySelector, valueSelector Func, opts ...Option) Single
	ToSlice(initialCapacity int, opts ...Option) ([]interface{}, error)
	Unmarshal(unmarshaller Unmarshaller, factory func() interface{}, opts ...Option) Observable
	WindowWithCount(count int, opts ...Option) Observable
	WindowWithTime(timespan Duration, opts ...Option) Observable
	WindowWithTimeOrCount(timespan Duration, count int, opts ...Option) Observable
	ZipFromIterable(iterable Iterable, zipper Func2, opts ...Option) Observable
}

Observable is the standard interface for Observables.

func Amb

func Amb(observables []Observable, opts ...Option) Observable

Amb takes several Observables and emits all of the items from only the first of these Observables to emit an item or notification.

func CombineLatest

func CombineLatest(f FuncN, observables []Observable, opts ...Option) Observable

CombineLatest combines the latest item emitted by each Observable via a specified function and emits items based on the results of this function.

func Concat

func Concat(observables []Observable, opts ...Option) Observable

Concat emits the emissions from two or more Observables without interleaving them.

func Create

func Create(f []Producer, opts ...Option) Observable

Create creates an Observable from scratch by calling observer methods programmatically.

func Defer

func Defer(f []Producer, opts ...Option) Observable

Defer does not create the Observable until the observer subscribes, and creates a fresh Observable for each observer.

func Empty

func Empty() Observable

Empty creates an Observable that emits no items and terminates immediately.

func FromChannel

func FromChannel(next <-chan Item, opts ...Option) Observable

FromChannel creates a cold observable from a channel.

func FromEventSource

func FromEventSource(next <-chan Item, opts ...Option) Observable

FromEventSource creates a hot observable from a channel.

func Interval

func Interval(interval Duration, opts ...Option) Observable

Interval creates an Observable that emits incrementing integers indefinitely, spaced by the given time interval.

func Merge

func Merge(observables []Observable, opts ...Option) Observable

Merge combines multiple Observables into one by merging their emissions.

func Never

func Never() Observable

Never creates an Observable that emits no items and does not terminate.

func Range

func Range(start, count int, opts ...Option) Observable

Range creates an Observable that emits count sequential integers beginning at start.

func Start

func Start(fs []Supplier, opts ...Option) Observable

Start creates an Observable from one or more directive-like Suppliers and emits the result of each operation asynchronously on a new Observable.

func Thrown

func Thrown(err error) Observable

Thrown creates an Observable that emits no items and terminates with an error.

func Timer

func Timer(d Duration, opts ...Option) Observable

Timer returns an Observable that completes after a specified delay.

type ObservableImpl

type ObservableImpl struct {
	// contains filtered or unexported fields
}

ObservableImpl implements Observable.

func (*ObservableImpl) All

func (o *ObservableImpl) All(predicate Predicate, opts ...Option) Single

All determines whether all items emitted by an Observable meet some criteria.

func (*ObservableImpl) AverageFloat32

func (o *ObservableImpl) AverageFloat32(opts ...Option) Single

AverageFloat32 calculates the average of numbers emitted by an Observable and emits the average float32.

func (*ObservableImpl) AverageFloat64

func (o *ObservableImpl) AverageFloat64(opts ...Option) Single

AverageFloat64 calculates the average of numbers emitted by an Observable and emits the average float64.

func (*ObservableImpl) AverageInt

func (o *ObservableImpl) AverageInt(opts ...Option) Single

AverageInt calculates the average of numbers emitted by an Observable and emits the average int.

func (*ObservableImpl) AverageInt16

func (o *ObservableImpl) AverageInt16(opts ...Option) Single

AverageInt16 calculates the average of numbers emitted by an Observable and emits the average int16.

func (*ObservableImpl) AverageInt32

func (o *ObservableImpl) AverageInt32(opts ...Option) Single

AverageInt32 calculates the average of numbers emitted by an Observable and emits the average int32.

func (*ObservableImpl) AverageInt64

func (o *ObservableImpl) AverageInt64(opts ...Option) Single

AverageInt64 calculates the average of numbers emitted by an Observable and emits the average int64.

func (*ObservableImpl) AverageInt8

func (o *ObservableImpl) AverageInt8(opts ...Option) Single

AverageInt8 calculates the average of numbers emitted by an Observable and emits the average int8.

func (*ObservableImpl) BackOffRetry

func (o *ObservableImpl) BackOffRetry(backOffCfg backoff.BackOff, opts ...Option) Observable

BackOffRetry implements a backoff retry: if the source Observable sends an error, it resubscribes to it in the hope that it will complete without error. Cannot be run in parallel.

func (*ObservableImpl) BufferWithCount

func (o *ObservableImpl) BufferWithCount(count int, opts ...Option) Observable

BufferWithCount returns an Observable that emits buffers of items it collects from the source Observable. The resulting Observable emits a buffer each time count items have been collected, each buffer being a slice of count items. When the source Observable completes or encounters an error, the resulting Observable emits the current buffer and propagates the notification from the source Observable.
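The buffering behaviour described above, including the flush of a partial buffer on completion, can be sketched over a plain channel. This is a standard-library illustration under the assumption that count is positive, not RxGo's implementation; `bufferWithCount` is a hypothetical helper.

```go
package main

import "fmt"

// bufferWithCount is a hypothetical helper that groups values from src into
// slices of count items. When src is closed, any partially filled buffer is
// flushed before the output channel is closed. count must be positive.
func bufferWithCount(src <-chan int, count int) <-chan []int {
	out := make(chan []int)
	go func() {
		defer close(out)
		buf := make([]int, 0, count)
		for v := range src {
			buf = append(buf, v)
			if len(buf) == count {
				out <- buf
				buf = make([]int, 0, count) // start a fresh buffer
			}
		}
		if len(buf) > 0 {
			out <- buf // flush the remainder on completion
		}
	}()
	return out
}

func main() {
	src := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		src <- i
	}
	close(src)
	for b := range bufferWithCount(src, 2) {
		fmt.Println(b) // prints [1 2], then [3 4], then [5]
	}
}
```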

func (*ObservableImpl) BufferWithTime

func (o *ObservableImpl) BufferWithTime(timespan Duration, opts ...Option) Observable

BufferWithTime returns an Observable that emits buffers of items it collects from the source Observable. The resulting Observable starts a new buffer periodically and emits each buffer after a fixed timespan, specified by the timespan argument. When the source Observable completes or encounters an error, the resulting Observable emits the current buffer and propagates the notification from the source Observable.

func (*ObservableImpl) BufferWithTimeOrCount

func (o *ObservableImpl) BufferWithTimeOrCount(timespan Duration, count int, opts ...Option) Observable

BufferWithTimeOrCount returns an Observable that emits buffers of items it collects from the source Observable either from a given count or at a given time interval.

func (*ObservableImpl) Connect

Connect instructs a connectable Observable to begin emitting items to its subscribers.

func (*ObservableImpl) Contains

func (o *ObservableImpl) Contains(equal Predicate, opts ...Option) Single

Contains determines whether an Observable emits a particular item or not.

func (*ObservableImpl) Count

func (o *ObservableImpl) Count(opts ...Option) Single

Count counts the number of items emitted by the source Observable and emits only this value.

func (*ObservableImpl) Debounce

func (o *ObservableImpl) Debounce(timespan Duration, opts ...Option) Observable

Debounce only emits an item from an Observable if a particular timespan has passed without it emitting another item.

func (*ObservableImpl) DefaultIfEmpty

func (o *ObservableImpl) DefaultIfEmpty(defaultValue interface{}, opts ...Option) Observable

DefaultIfEmpty returns an Observable that emits the items emitted by the source Observable or a specified default item if the source Observable is empty.

func (*ObservableImpl) Distinct

func (o *ObservableImpl) Distinct(apply Func, opts ...Option) Observable

Distinct suppresses duplicate items in the original Observable and returns a new Observable.

func (*ObservableImpl) DistinctUntilChanged

func (o *ObservableImpl) DistinctUntilChanged(apply Func, opts ...Option) Observable

DistinctUntilChanged suppresses consecutive duplicate items in the original Observable. Cannot be run in parallel.
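The difference between Distinct and DistinctUntilChanged is easiest to see side by side. The sketch below works on slices with the standard library only and is not RxGo's implementation; `distinct`, `distinctUntilChanged`, and `identity` are hypothetical names, and the `key` parameter plays the role of the `apply` function.

```go
package main

import "fmt"

// distinct keeps the first item seen for each key and drops every later
// item that maps to an already seen key.
func distinct(items []int, key func(int) int) []int {
	seen := make(map[int]bool)
	var out []int
	for _, it := range items {
		k := key(it)
		if !seen[k] {
			seen[k] = true
			out = append(out, it)
		}
	}
	return out
}

// distinctUntilChanged drops an item only when its key equals the key of
// the item immediately before it.
func distinctUntilChanged(items []int, key func(int) int) []int {
	var out []int
	var prev int
	for i, it := range items {
		k := key(it)
		if i == 0 || k != prev {
			out = append(out, it)
		}
		prev = k
	}
	return out
}

func identity(v int) int { return v }

func main() {
	items := []int{1, 1, 2, 2, 1}
	fmt.Println(distinct(items, identity))             // prints [1 2]
	fmt.Println(distinctUntilChanged(items, identity)) // prints [1 2 1]
}
```

Because the second variant depends on the order of consecutive items, it is consistent with the note that it cannot be run in parallel.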

func (*ObservableImpl) DoOnCompleted

func (o *ObservableImpl) DoOnCompleted(completedFunc CompletedFunc, opts ...Option) Disposed

DoOnCompleted registers a callback action that will be called once the Observable terminates.

func (*ObservableImpl) DoOnError

func (o *ObservableImpl) DoOnError(errFunc ErrFunc, opts ...Option) Disposed

DoOnError registers a callback action that will be called if the Observable terminates abnormally.

func (*ObservableImpl) DoOnNext

func (o *ObservableImpl) DoOnNext(nextFunc NextFunc, opts ...Option) Disposed

DoOnNext registers a callback action that will be called on each item emitted by the Observable.

func (*ObservableImpl) ElementAt

func (o *ObservableImpl) ElementAt(index uint, opts ...Option) Single

ElementAt emits only item n emitted by an Observable. Cannot be run in parallel.

func (*ObservableImpl) Error

func (o *ObservableImpl) Error(opts ...Option) error

Error returns the eventual Observable error. This method is blocking.

func (*ObservableImpl) Errors

func (o *ObservableImpl) Errors(opts ...Option) []error

Errors returns an eventual list of Observable errors. This method is blocking.

func (*ObservableImpl) Filter

func (o *ObservableImpl) Filter(apply Predicate, opts ...Option) Observable

Filter emits only those items from an Observable that pass a predicate test.

func (*ObservableImpl) Find added in v2.3.0

func (o *ObservableImpl) Find(find Predicate, opts ...Option) OptionalSingle

Find emits the first item passing a predicate, then completes.

func (*ObservableImpl) First

func (o *ObservableImpl) First(opts ...Option) OptionalSingle

First returns a new Observable which emits only the first item. Cannot be run in parallel.

func (*ObservableImpl) FirstOrDefault

func (o *ObservableImpl) FirstOrDefault(defaultValue interface{}, opts ...Option) Single

FirstOrDefault returns a new Observable which emits only the first item. If the Observable fails to emit any items, it emits a default value. Cannot be run in parallel.

func (*ObservableImpl) FlatMap

func (o *ObservableImpl) FlatMap(apply ItemToObservable, opts ...Option) Observable

FlatMap transforms the items emitted by an Observable into Observables, then flattens the emissions from those into a single Observable.

func (*ObservableImpl) ForEach

func (o *ObservableImpl) ForEach(nextFunc NextFunc, errFunc ErrFunc, completedFunc CompletedFunc, opts ...Option) Disposed

ForEach subscribes to the Observable and receives notifications for each element.

func (*ObservableImpl) GroupBy

func (o *ObservableImpl) GroupBy(length int, distribution func(Item) int, opts ...Option) Observable

GroupBy divides an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key.
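The signature takes a fixed number of groups (`length`) and a `distribution` function that maps each item to a group index. A minimal slice-based sketch of that idea follows, using the standard library only. It is not RxGo's implementation; `groupBy` is a hypothetical helper, and treating an out-of-range index as an error is an assumption of this sketch.

```go
package main

import "fmt"

// groupBy is a hypothetical helper: it creates length groups up front and
// places each item in the group whose index distribution returns.
// An index outside [0, length) is reported as an error (an assumption).
func groupBy(items []int, length int, distribution func(int) int) ([][]int, error) {
	groups := make([][]int, length)
	for _, it := range items {
		idx := distribution(it)
		if idx < 0 || idx >= length {
			return nil, fmt.Errorf("index %d out of range for %d groups", idx, length)
		}
		groups[idx] = append(groups[idx], it)
	}
	return groups, nil
}

func main() {
	groups, err := groupBy([]int{1, 2, 3, 4, 5, 6}, 3, func(v int) int { return v % 3 })
	fmt.Println(groups, err) // prints [[3 6] [1 4] [2 5]] <nil>
}
```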

func (*ObservableImpl) GroupByDynamic added in v2.2.0

func (o *ObservableImpl) GroupByDynamic(distribution func(Item) string, opts ...Option) Observable

GroupByDynamic divides an Observable into a dynamic set of Observables that each emit GroupedObservable from the original Observable, organized by key.

func (*ObservableImpl) IgnoreElements

func (o *ObservableImpl) IgnoreElements(opts ...Option) Observable

IgnoreElements ignores all items emitted by the source Observable except for the errors. Cannot be run in parallel.

func (*ObservableImpl) Join

func (o *ObservableImpl) Join(joiner Func2, right Observable, timeExtractor func(interface{}) time.Time, window Duration, opts ...Option) Observable

Join combines items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable. The time is extracted using a timeExtractor function.

func (*ObservableImpl) Last

func (o *ObservableImpl) Last(opts ...Option) OptionalSingle

Last returns a new Observable which emits only the last item. Cannot be run in parallel.

func (*ObservableImpl) LastOrDefault

func (o *ObservableImpl) LastOrDefault(defaultValue interface{}, opts ...Option) Single

LastOrDefault returns a new Observable which emits only the last item. If the Observable fails to emit any items, it emits a default value. Cannot be run in parallel.

func (*ObservableImpl) Map

func (o *ObservableImpl) Map(apply Func, opts ...Option) Observable

Map transforms the items emitted by an Observable by applying a function to each item.

func (*ObservableImpl) Marshal

func (o *ObservableImpl) Marshal(marshaller Marshaller, opts ...Option) Observable

Marshal transforms the items emitted by an Observable by applying a marshalling to each item.

func (*ObservableImpl) Max

func (o *ObservableImpl) Max(comparator Comparator, opts ...Option) OptionalSingle

Max determines and emits the maximum-valued item emitted by an Observable according to a comparator.

func (*ObservableImpl) Min

func (o *ObservableImpl) Min(comparator Comparator, opts ...Option) OptionalSingle

Min determines and emits the minimum-valued item emitted by an Observable according to a comparator.

func (*ObservableImpl) Observe

func (o *ObservableImpl) Observe(opts ...Option) <-chan Item

Observe observes an Observable by returning its channel.

func (*ObservableImpl) OnErrorResumeNext

func (o *ObservableImpl) OnErrorResumeNext(resumeSequence ErrorToObservable, opts ...Option) Observable

OnErrorResumeNext instructs an Observable to pass control to another Observable rather than invoking onError if it encounters an error.

func (*ObservableImpl) OnErrorReturn

func (o *ObservableImpl) OnErrorReturn(resumeFunc ErrorFunc, opts ...Option) Observable

OnErrorReturn instructs an Observable to emit an item (returned by a specified function) rather than invoking onError if it encounters an error.

func (*ObservableImpl) OnErrorReturnItem

func (o *ObservableImpl) OnErrorReturnItem(resume interface{}, opts ...Option) Observable

OnErrorReturnItem instructs an Observable to emit an item if it encounters an error.

func (*ObservableImpl) Reduce

func (o *ObservableImpl) Reduce(apply Func2, opts ...Option) OptionalSingle

Reduce applies a function to each item emitted by an Observable, sequentially, and emits the final value.

func (*ObservableImpl) Repeat

func (o *ObservableImpl) Repeat(count int64, frequency Duration, opts ...Option) Observable

Repeat returns an Observable that repeats the sequence of items emitted by the source Observable at most count times, at a particular frequency. Cannot be run in parallel.

func (*ObservableImpl) Retry

func (o *ObservableImpl) Retry(count int, shouldRetry func(error) bool, opts ...Option) Observable

Retry retries on error: if the source Observable sends an error, it resubscribes to it in the hope that it will complete without error. Cannot be run in parallel.
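The roles of `count` and `shouldRetry` can be illustrated with a plain retry loop around a function. This is a standard-library sketch, not RxGo's implementation; it assumes `count` is the maximum number of additional attempts, and `retry`, `isTransient`, `errTransient`, and `errFatal` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errTransient = errors.New("transient failure")
	errFatal     = errors.New("fatal failure")
)

// retry runs op once, then re-runs it up to count more times for as long
// as it fails with an error that shouldRetry accepts.
func retry(count int, shouldRetry func(error) bool, op func() error) error {
	err := op()
	for attempt := 0; attempt < count && err != nil && shouldRetry(err); attempt++ {
		err = op()
	}
	return err
}

// isTransient reports whether err is worth retrying in this sketch.
func isTransient(err error) bool { return errors.Is(err, errTransient) }

func main() {
	calls := 0
	err := retry(3, isTransient, func() error {
		calls++
		if calls < 3 {
			return errTransient // fail on the first two attempts
		}
		return nil
	})
	fmt.Println(calls, err) // prints 3 <nil>
}
```

An error rejected by `shouldRetry` ends the loop immediately, which is how a caller would separate recoverable failures from fatal ones.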

func (*ObservableImpl) Run

func (o *ObservableImpl) Run(opts ...Option) Disposed

Run creates an Observer without consuming the emitted items.

func (*ObservableImpl) Sample

func (o *ObservableImpl) Sample(iterable Iterable, opts ...Option) Observable

Sample returns an Observable that emits the most recent items emitted by the source Iterable whenever the input Iterable emits an item.

func (*ObservableImpl) Scan

func (o *ObservableImpl) Scan(apply Func2, opts ...Option) Observable

Scan applies a Func2 to each item emitted by an Observable, sequentially, and emits each successive value. Cannot be run in parallel.
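Scan differs from Reduce in that every intermediate accumulator value is emitted, not only the last one. The slice-based sketch below uses the standard library only and is not RxGo's implementation. It assumes the accumulator is nil before the first item, so the function has to handle that case; `scan` and `sum` are hypothetical names.

```go
package main

import "fmt"

// scan applies apply to the running accumulator and each item in turn,
// recording every intermediate accumulator value.
func scan(items []interface{}, apply func(acc, item interface{}) interface{}) []interface{} {
	out := make([]interface{}, 0, len(items))
	var acc interface{} // nil until the first item has been processed
	for _, it := range items {
		acc = apply(acc, it)
		out = append(out, acc)
	}
	return out
}

// sum adds integers and treats a nil accumulator as "no value yet".
func sum(acc, item interface{}) interface{} {
	if acc == nil {
		return item
	}
	return acc.(int) + item.(int)
}

func main() {
	fmt.Println(scan([]interface{}{1, 2, 3, 4, 5}, sum)) // prints [1 3 6 10 15]
}
```

A reduce over the same input with the same function would yield only the final value, 15.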

func (*ObservableImpl) Send

func (o *ObservableImpl) Send(output chan<- Item, opts ...Option)

Send sends the items to a given channel.

func (*ObservableImpl) SequenceEqual

func (o *ObservableImpl) SequenceEqual(iterable Iterable, opts ...Option) Single

SequenceEqual emits true if an Observable and the input Observable emit the same items, in the same order, with the same termination state. Otherwise, it emits false.

func (*ObservableImpl) Serialize

func (o *ObservableImpl) Serialize(from int, identifier func(interface{}) int, opts ...Option) Observable

Serialize forces an Observable to make serialized calls and to be well-behaved.

func (*ObservableImpl) Skip

func (o *ObservableImpl) Skip(nth uint, opts ...Option) Observable

Skip suppresses the first n items in the original Observable and returns a new Observable with the remaining items. Cannot be run in parallel.

func (*ObservableImpl) SkipLast

func (o *ObservableImpl) SkipLast(nth uint, opts ...Option) Observable

SkipLast suppresses the last n items in the original Observable and returns a new Observable with the remaining items. Cannot be run in parallel.

func (*ObservableImpl) SkipWhile

func (o *ObservableImpl) SkipWhile(apply Predicate, opts ...Option) Observable

SkipWhile discards items emitted by an Observable until a specified condition becomes false. Cannot be run in parallel.

func (*ObservableImpl) StartWith

func (o *ObservableImpl) StartWith(iterable Iterable, opts ...Option) Observable

StartWith emits a specified Iterable before beginning to emit the items from the source Observable.

func (*ObservableImpl) SumFloat32

func (o *ObservableImpl) SumFloat32(opts ...Option) OptionalSingle

SumFloat32 calculates the sum of float32 values emitted by an Observable and emits a float32.

func (*ObservableImpl) SumFloat64

func (o *ObservableImpl) SumFloat64(opts ...Option) OptionalSingle

SumFloat64 calculates the sum of float64 values emitted by an Observable and emits a float64.

func (*ObservableImpl) SumInt64

func (o *ObservableImpl) SumInt64(opts ...Option) OptionalSingle

SumInt64 calculates the sum of integers emitted by an Observable and emits an int64.

func (*ObservableImpl) Take

func (o *ObservableImpl) Take(nth uint, opts ...Option) Observable

Take emits only the first n items emitted by an Observable. Cannot be run in parallel.

func (*ObservableImpl) TakeLast

func (o *ObservableImpl) TakeLast(nth uint, opts ...Option) Observable

TakeLast emits only the last n items emitted by an Observable. Cannot be run in parallel.

func (*ObservableImpl) TakeUntil

func (o *ObservableImpl) TakeUntil(apply Predicate, opts ...Option) Observable

TakeUntil returns an Observable that emits items emitted by the source Observable, checks the specified predicate for each item, and then completes when the condition is satisfied. Cannot be run in parallel.

func (*ObservableImpl) TakeWhile

func (o *ObservableImpl) TakeWhile(apply Predicate, opts ...Option) Observable

TakeWhile returns an Observable that emits items emitted by the source Observable so long as each item satisfies a specified condition, and then completes as soon as this condition is not satisfied. Cannot be run in parallel.
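Reading the TakeUntil and TakeWhile descriptions literally, the two differ in whether the item that ends the sequence is itself emitted. The slice-based sketch below uses the standard library only and is not RxGo's implementation; `takeWhile` and `takeUntil` are hypothetical helpers.

```go
package main

import "fmt"

// takeWhile tests the predicate first: the first item that fails the
// predicate ends the sequence and is not emitted.
func takeWhile(items []int, pred func(int) bool) []int {
	var out []int
	for _, it := range items {
		if !pred(it) {
			break
		}
		out = append(out, it)
	}
	return out
}

// takeUntil emits first and tests afterwards: the item that satisfies the
// predicate is emitted, and then the sequence ends.
func takeUntil(items []int, pred func(int) bool) []int {
	var out []int
	for _, it := range items {
		out = append(out, it)
		if pred(it) {
			break
		}
	}
	return out
}

func main() {
	items := []int{1, 2, 3, 4, 5}
	fmt.Println(takeWhile(items, func(v int) bool { return v < 3 }))  // prints [1 2]
	fmt.Println(takeUntil(items, func(v int) bool { return v == 3 })) // prints [1 2 3]
}
```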

func (*ObservableImpl) TimeInterval

func (o *ObservableImpl) TimeInterval(opts ...Option) Observable

TimeInterval converts an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions.

func (*ObservableImpl) Timestamp

func (o *ObservableImpl) Timestamp(opts ...Option) Observable

Timestamp attaches a timestamp to each item emitted by an Observable indicating when it was emitted.

func (*ObservableImpl) ToMap

func (o *ObservableImpl) ToMap(keySelector Func, opts ...Option) Single

ToMap converts the sequence of items emitted by an Observable into a map keyed by a specified key function. Cannot be run in parallel.

func (*ObservableImpl) ToMapWithValueSelector

func (o *ObservableImpl) ToMapWithValueSelector(keySelector, valueSelector Func, opts ...Option) Single

ToMapWithValueSelector converts the sequence of items emitted by an Observable into a map keyed by a specified key function and valued by another value function. Cannot be run in parallel.
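How the key and value selectors cooperate can be shown with a small slice-based sketch, using the standard library only. It is not RxGo's implementation; `toMapWithValueSelector`, `firstLetter`, and `length` are hypothetical names, and letting a later item overwrite an earlier one with the same key is an assumption of this sketch.

```go
package main

import "fmt"

// toMapWithValueSelector builds a map whose keys come from keySelector and
// whose values come from valueSelector. When two items share a key, the
// later item overwrites the earlier one (an assumption of this sketch).
func toMapWithValueSelector(items []string, keySelector func(string) string, valueSelector func(string) int) map[string]int {
	m := make(map[string]int, len(items))
	for _, it := range items {
		m[keySelector(it)] = valueSelector(it)
	}
	return m
}

// firstLetter expects a non-empty string.
func firstLetter(s string) string { return s[:1] }

func length(s string) int { return len(s) }

func main() {
	items := []string{"apple", "avocado", "banana"}
	fmt.Println(toMapWithValueSelector(items, firstLetter, length)) // prints map[a:7 b:6]
}
```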

func (*ObservableImpl) ToSlice

func (o *ObservableImpl) ToSlice(initialCapacity int, opts ...Option) ([]interface{}, error)

ToSlice collects all items from an Observable and returns them in a slice, along with an optional error. Cannot be run in parallel.

func (*ObservableImpl) Unmarshal

func (o *ObservableImpl) Unmarshal(unmarshaller Unmarshaller, factory func() interface{}, opts ...Option) Observable

Unmarshal transforms the items emitted by an Observable by applying an unmarshalling to each item.

func (*ObservableImpl) WindowWithCount

func (o *ObservableImpl) WindowWithCount(count int, opts ...Option) Observable

WindowWithCount periodically subdivides items from an Observable into Observable windows of a given size and emits these windows rather than emitting the items one at a time.

func (*ObservableImpl) WindowWithTime

func (o *ObservableImpl) WindowWithTime(timespan Duration, opts ...Option) Observable

WindowWithTime periodically subdivides items from an Observable into Observables based on timed windows and emits them rather than emitting the items one at a time.

func (*ObservableImpl) WindowWithTimeOrCount

func (o *ObservableImpl) WindowWithTimeOrCount(timespan Duration, count int, opts ...Option) Observable

WindowWithTimeOrCount periodically subdivides items from an Observable into Observables based on timed windows or a specific size and emits them rather than emitting the items one at a time.

func (*ObservableImpl) ZipFromIterable

func (o *ObservableImpl) ZipFromIterable(iterable Iterable, zipper Func2, opts ...Option) Observable

ZipFromIterable merges the emissions of an Iterable via a specified function and emits single items for each combination based on the results of this function.
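The pairing performed by a zip can be sketched on two slices with the standard library only. This is not RxGo's implementation; `zip` is a hypothetical helper, and stopping as soon as the shorter input is exhausted is an assumption of this sketch.

```go
package main

import "fmt"

// zip combines the i-th item of left with the i-th item of right using
// zipper, and stops when the shorter input runs out (an assumption).
func zip(left, right []int, zipper func(a, b int) int) []int {
	n := len(left)
	if len(right) < n {
		n = len(right)
	}
	out := make([]int, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, zipper(left[i], right[i]))
	}
	return out
}

func main() {
	add := func(a, b int) int { return a + b }
	fmt.Println(zip([]int{1, 2, 3}, []int{10, 20}, add)) // prints [11 22]
}
```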

type ObservationStrategy

type ObservationStrategy uint32

ObservationStrategy defines the strategy to consume from an Observable.

const (
	// Lazy is the default observation strategy, when an Observer subscribes.
	Lazy ObservationStrategy = iota
	// Eager means consuming as soon as the Observable is created.
	Eager
)

type OnErrorStrategy

type OnErrorStrategy uint32

OnErrorStrategy is the Observable error strategy.

const (
	// StopOnError is the default error strategy.
	// An operator will stop processing items on error.
	StopOnError OnErrorStrategy = iota
	// ContinueOnError means an operator will continue processing items after an error.
	ContinueOnError
)
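The two strategies can be contrasted with a small slice-based sketch, using the standard library only. It is not RxGo's implementation; the lowercase type and constants, `mapWithStrategy`, and `halveEven` are hypothetical local names that only mirror the idea of StopOnError and ContinueOnError.

```go
package main

import "fmt"

// onErrorStrategy is a local stand-in for the two documented strategies.
type onErrorStrategy int

const (
	stopOnError onErrorStrategy = iota
	continueOnError
)

// mapWithStrategy applies apply to each item. On error it either stops
// (stopOnError) or records the error and moves on (continueOnError).
func mapWithStrategy(items []int, apply func(int) (int, error), strategy onErrorStrategy) ([]int, []error) {
	var results []int
	var errs []error
	for _, it := range items {
		v, err := apply(it)
		if err != nil {
			errs = append(errs, err)
			if strategy == stopOnError {
				break
			}
			continue
		}
		results = append(results, v)
	}
	return results, errs
}

// halveEven fails on odd input.
func halveEven(v int) (int, error) {
	if v%2 != 0 {
		return 0, fmt.Errorf("%d is odd", v)
	}
	return v / 2, nil
}

func main() {
	items := []int{2, 3, 4, 5}
	results, errs := mapWithStrategy(items, halveEven, stopOnError)
	fmt.Println(results, len(errs)) // prints [1] 1
	results, errs = mapWithStrategy(items, halveEven, continueOnError)
	fmt.Println(results, len(errs)) // prints [1 2] 2
}
```

With the continue strategy more than one error can accumulate, which is the situation a method returning a list of errors is suited to.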

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option handles configurable options.

func Serialize

func Serialize(identifier func(interface{}) int) Option

Serialize forces an Observable to make serialized calls and to be well-behaved.

func WithBackPressureStrategy

func WithBackPressureStrategy(strategy BackpressureStrategy) Option

WithBackPressureStrategy sets the back pressure strategy: drop or block.
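In plain Go terms, "drop or block" corresponds to the difference between a non-blocking send and an ordinary channel send. The sketch below uses the standard library only and is not RxGo's implementation; the lowercase type, constants, and the `send` helper are hypothetical local names.

```go
package main

import "fmt"

// backpressureStrategy is a local stand-in for the two documented choices.
type backpressureStrategy int

const (
	block backpressureStrategy = iota
	drop
)

// send delivers v on ch. With drop, the value is discarded when the channel
// cannot accept it right away; with block, the call waits for room.
// It reports whether the value was delivered.
func send(ch chan<- int, v int, strategy backpressureStrategy) bool {
	if strategy == drop {
		select {
		case ch <- v:
			return true
		default:
			return false // receiver too slow: discard the value
		}
	}
	ch <- v // wait until the receiver (or buffer) has room
	return true
}

func main() {
	ch := make(chan int, 2) // room for two values, nobody is receiving
	delivered := 0
	for i := 0; i < 5; i++ {
		if send(ch, i, drop) {
			delivered++
		}
	}
	fmt.Println(delivered) // prints 2
}
```

Dropping keeps a fast producer from stalling at the cost of losing items, while blocking preserves every item but slows the producer to the consumer's pace.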

func WithBufferedChannel

func WithBufferedChannel(capacity int) Option

WithBufferedChannel configures the capacity of a buffered channel.

func WithCPUPool

func WithCPUPool() Option

WithCPUPool specifies an execution pool based on the number of logical CPUs.

func WithContext

func WithContext(ctx context.Context) Option

WithContext passes a context.

func WithErrorStrategy

func WithErrorStrategy(strategy OnErrorStrategy) Option

WithErrorStrategy defines how an Observable should deal with errors. This strategy is propagated to the parent Observable.

func WithObservationStrategy

func WithObservationStrategy(strategy ObservationStrategy) Option

WithObservationStrategy sets the observation strategy. With the Eager strategy, items are consumed even without a subscription.

func WithPool

func WithPool(pool int) Option

WithPool specifies an execution pool.
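The README describes a parallel operator as several goroutines connected to a common channel. A minimal standard-library sketch of that idea follows. It is not RxGo's implementation; `mapWithPool` is a hypothetical helper and its `pool` parameter stands for the pool size.

```go
package main

import (
	"fmt"
	"sync"
)

// mapWithPool starts pool goroutines that all read from the same source
// channel, apply the function, and write to a shared output channel.
// The order of results is therefore not guaranteed.
func mapWithPool(src <-chan int, pool int, apply func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < pool; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range src {
				out <- apply(v)
			}
		}()
	}
	// Close the output once every worker has drained the source.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	src := make(chan int, 10)
	for i := 1; i <= 10; i++ {
		src <- i
	}
	close(src)
	sum := 0
	for v := range mapWithPool(src, 4, func(v int) int { return v * v }) {
		sum += v
	}
	fmt.Println(sum) // prints 385
}
```

The loss of ordering shown here is consistent with the many operators above that are marked "Cannot be run in parallel".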

func WithPublishStrategy

func WithPublishStrategy() Option

WithPublishStrategy converts an ordinary Observable into a connectable Observable.

type OptionalSingle

type OptionalSingle interface {
	Iterable
	Get(opts ...Option) (Item, error)
	Map(apply Func, opts ...Option) OptionalSingle
	Run(opts ...Option) Disposed
}

OptionalSingle is an optional single.

type OptionalSingleImpl

type OptionalSingleImpl struct {
	// contains filtered or unexported fields
}

OptionalSingleImpl implements OptionalSingle.

func (*OptionalSingleImpl) Get

func (o *OptionalSingleImpl) Get(opts ...Option) (Item, error)

Get returns the item or rxgo.OptionalEmpty. An error is returned if the context has been cancelled. This method is blocking.

func (*OptionalSingleImpl) Map

func (o *OptionalSingleImpl) Map(apply Func, opts ...Option) OptionalSingle

Map transforms the items emitted by an OptionalSingle by applying a function to each item.

func (*OptionalSingleImpl) Observe

func (o *OptionalSingleImpl) Observe(opts ...Option) <-chan Item

Observe observes an OptionalSingle by returning its channel.

func (*OptionalSingleImpl) Run

func (o *OptionalSingleImpl) Run(opts ...Option) Disposed

Run creates an observer without consuming the emitted items.

type Predicate

type Predicate func(interface{}) bool

Predicate defines a func that returns a bool from an input value.

type Producer

type Producer func(ctx context.Context, next chan<- Item)

Producer defines a producer implementation.
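A function of this shape receives a context and a send-only channel, so a well-behaved producer would typically stop when the context is done. The sketch below uses the standard library only. The `Producer` type is copied from the signature above, while the `Item` struct is a local stand-in with just the fields this sketch needs, and `countTo` and `collectAll` are hypothetical helpers rather than library functions.

```go
package main

import (
	"context"
	"fmt"
)

// Item is a local stand-in for the library's item type (an assumption).
type Item struct {
	V interface{}
	E error
}

// Producer mirrors the documented signature.
type Producer func(ctx context.Context, next chan<- Item)

// countTo returns a Producer that sends 1..n and stops early if the
// context is cancelled while it is waiting to send.
func countTo(n int) Producer {
	return func(ctx context.Context, next chan<- Item) {
		for i := 1; i <= n; i++ {
			select {
			case <-ctx.Done():
				return
			case next <- Item{V: i}:
			}
		}
	}
}

// collectAll is a hypothetical driver: it runs p in a goroutine, closes
// the channel when p returns, and gathers everything that was sent.
func collectAll(p Producer) []Item {
	next := make(chan Item)
	go func() {
		defer close(next)
		p(context.Background(), next)
	}()
	var items []Item
	for it := range next {
		items = append(items, it)
	}
	return items
}

func main() {
	for _, it := range collectAll(countTo(3)) {
		fmt.Println(it.V) // prints 1, 2, 3 on separate lines
	}
}
```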

type RxAssert

type RxAssert interface {
	// contains filtered or unexported methods
}

RxAssert lists the Observable assertions.

func CustomPredicate

func CustomPredicate(predicate AssertPredicate) RxAssert

CustomPredicate checks a custom predicate.

func HasAnError

func HasAnError() RxAssert

HasAnError checks that the observable has produced an error.

func HasError

func HasError(err error) RxAssert

HasError checks that the observable has produced a specific error.

func HasErrors

func HasErrors(errs ...error) RxAssert

HasErrors checks that the observable has produced a set of errors.

func HasItem

func HasItem(i interface{}) RxAssert

HasItem checks if a single or optional single has a specific item.

func HasItems

func HasItems(items ...interface{}) RxAssert

HasItems checks that the observable produces the corresponding items.

func HasItemsNoOrder

func HasItemsNoOrder(items ...interface{}) RxAssert

HasItemsNoOrder checks that an observable produces the corresponding items regardless of the order.

func HasNoError

func HasNoError() RxAssert

HasNoError checks that the observable has not raised any error.

func IsEmpty

func IsEmpty() RxAssert

IsEmpty checks that the observable has not produced any items.

func IsNotEmpty

func IsNotEmpty() RxAssert

IsNotEmpty checks that the observable produces some items.

type Single

type Single interface {
	Iterable
	Filter(apply Predicate, opts ...Option) OptionalSingle
	Get(opts ...Option) (Item, error)
	Map(apply Func, opts ...Option) Single
	Run(opts ...Option) Disposed
}

Single is an Observable with a single element.

func JustItem

func JustItem(item interface{}, opts ...Option) Single

JustItem creates a single from one item.

type SingleImpl

type SingleImpl struct {
	// contains filtered or unexported fields
}

SingleImpl implements Single.

func (*SingleImpl) Filter

func (s *SingleImpl) Filter(apply Predicate, opts ...Option) OptionalSingle

Filter emits the item from a Single only if it passes a predicate test.

func (*SingleImpl) Get

func (s *SingleImpl) Get(opts ...Option) (Item, error)

Get returns the item. An error is returned if the context has been cancelled. This method is blocking.

func (*SingleImpl) Map

func (s *SingleImpl) Map(apply Func, opts ...Option) Single

Map transforms the items emitted by a Single by applying a function to each item.

func (*SingleImpl) Observe

func (s *SingleImpl) Observe(opts ...Option) <-chan Item

Observe observes a Single by returning its channel.

func (*SingleImpl) Run

func (s *SingleImpl) Run(opts ...Option) Disposed

Run creates an observer without consuming the emitted items.

type Supplier

type Supplier func(ctx context.Context) Item

Supplier defines a function that supplies a result from nothing.

type TimestampItem

type TimestampItem struct {
	Timestamp time.Time
	V         interface{}
}

TimestampItem attaches a timestamp to an item.

type Unmarshaller

type Unmarshaller func([]byte, interface{}) error

Unmarshaller defines an unmarshaller type ([]byte to interface).
