observable

package
Version: v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2021 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultObservable = make(Observable)

Functions

func CheckEventHandler

func CheckEventHandler(handler rx.EventHandler) observer.Observer

CheckHandler checks the underlying type of an EventHandler.

Types

type Observable

type Observable <-chan interface{}

Observable is a basic observable channel

func Create

func Create(source func(emitter *observer.Observer, disposed bool)) Observable

Creates observable from based on source function. Keep it mind to call emitter.OnDone() to signal sequence's end. Example: - emitting none elements observable.Create(emitter *observer.Observer, disposed bool) { emitter.OnDone() }) - emitting one element

observable.Create(func(emitter *observer.Observer, disposed bool) {
		emitter.OnNext("one element")
		emitter.OnDone()
})

func Empty

func Empty() Observable

Empty creates an Observable with no item and terminate immediately.

func From

func From(it rx.Iterator) Observable

From creates a new Observable from an Iterator.

func Interval

func Interval(term chan struct{}, interval time.Duration) Observable

Interval creates an Observable emitting incremental integers infinitely between each given time interval.

func Just

func Just(item interface{}, items ...interface{}) Observable

Just creates an Observable with the provided item(s).

func New

func New(buffer uint) Observable

New creates an Observable

func Range

func Range(start, end int) Observable

Range creates an Observable that emits a particular range of sequential integers.

func Repeat

func Repeat(item interface{}, ntimes ...int) Observable

Repeat creates an Observable emitting a given item repeatedly

func Start

func Start(f fx.EmittableFunc, fs ...fx.EmittableFunc) Observable

Start creates an Observable from one or more directive-like EmittableFunc and emits the result of each operation asynchronously on a new Observable.

func (Observable) Distinct

func (o Observable) Distinct(apply fx.KeySelectorFunc) Observable

Distinct suppresses duplicate items in the original Observable and returns a new Observable.

func (Observable) DistinctUntilChanged

func (o Observable) DistinctUntilChanged(apply fx.KeySelectorFunc) Observable

DistinctUntilChanged suppresses consecutive duplicate items in the original Observable and returns a new Observable.

func (Observable) Filter

func (o Observable) Filter(apply fx.FilterableFunc) Observable

Filter filters items in the original Observable and returns a new Observable with the filtered items.

func (Observable) First

func (o Observable) First() Observable

First returns new Observable which emit only first item.

func (Observable) FlatMap

func (o Observable) FlatMap(apply func(interface{}) Observable, maxInParallel uint) Observable

transforms emitted items into observables and flattens them into single observable. maxInParallel argument controls how many transformed observables are processed in parallel For an example please take a look at flatmap_slice_test.go file in the examples directory.

func (Observable) Last

func (o Observable) Last() Observable

Last returns a new Observable which emit only last item.

func (Observable) Map

func (o Observable) Map(apply fx.MappableFunc) Observable

Map maps a MappableFunc predicate to each item in Observable and returns a new Observable with applied items.

func (Observable) Next

func (o Observable) Next() (interface{}, error)

Next returns the next item on the Observable.

func (Observable) Scan

func (o Observable) Scan(apply fx.ScannableFunc) Observable

Scan applies ScannableFunc predicate to each item in the original Observable sequentially and emits each successive value on a new Observable.

func (Observable) Skip

func (o Observable) Skip(nth uint) Observable

Skip suppresses the first n items in the original Observable and returns a new Observable with the rest items.

func (Observable) SkipLast

func (o Observable) SkipLast(nth uint) Observable

SkipLast suppresses the last n items in the original Observable and returns a new Observable with the rest items.

func (Observable) Subscribe

func (o Observable) Subscribe(handler rx.EventHandler, opts ...Option) <-chan subscription.Subscription

Subscribe subscribes an EventHandler and returns a Subscription channel.

func (Observable) Take

func (o Observable) Take(nth uint) Observable

Take takes first n items in the original Obserable and returns a new Observable with the taken items.

func (Observable) TakeLast

func (o Observable) TakeLast(nth uint) Observable

TakeLast takes last n items in the original Observable and returns a new Observable with the taken items.

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option is the configuration of an observable

func WithParallelism

func WithParallelism(parallelism int) Option

WithParallelism allows to configure the level of parallelism

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL