Documentation
¶
Overview ¶
Package rx provides Reactive Extensions for Go, an API for asynchronous programming with observable streams.
Index ¶
- Constants
- func MakeObserverObservable(age time.Duration, length int, capacity ...int) (Observer, Observable)
- type BoolObserver
- type Canceled
- type Complete
- type Connectable
- type Error
- type IntObserver
- type Multicaster
- type Next
- type Observable
- func Concat(observables ...Observable) Observable
- func Create(create func(Next, Error, Complete, Canceled)) Observable
- func CreateFutureRecursive(timeout time.Duration, create func(Next, Error, Complete) time.Duration) Observable
- func CreateRecursive(create func(Next, Error, Complete)) Observable
- func Defer(factory func() Observable) Observable
- func Empty() Observable
- func From(slice ...interface{}) Observable
- func FromChan(ch <-chan interface{}) Observable
- func Interval(interval time.Duration) Observable
- func Just(element interface{}) Observable
- func Merge(observables ...Observable) Observable
- func MergeDelayError(observables ...Observable) Observable
- func Never() Observable
- func Of(slice ...interface{}) Observable
- func Range(start, count int) Observable
- func Start(f func() (interface{}, error)) Observable
- func Throw(err error) Observable
- func Timer(initialDelay time.Duration, intervals ...time.Duration) Observable
- func (o Observable) All(predicate func(next interface{}) bool) ObservableBool
- func (o Observable) AsObservable() Observable
- func (o Observable) AsObservableBool() ObservableBool
- func (o Observable) AsObservableInt() ObservableInt
- func (o Observable) AsObservableSlice() ObservableSlice
- func (o Observable) AuditTime(duration time.Duration) Observable
- func (o Observable) AutoUnsubscribe() Observable
- func (o Observable) Buffer(closingNotifier Observable) ObservableSlice
- func (o Observable) BufferTime(period time.Duration) ObservableSlice
- func (o Observable) Catch(catch Observable) Observable
- func (o Observable) CatchError(selector func(err error, caught Observable) Observable) Observable
- func (o Observable) CombineLatestMap(project func(interface{}) Observable) ObservableSlice
- func (o Observable) CombineLatestMapTo(inner Observable) ObservableSlice
- func (o Observable) CombineLatestWith(other ...Observable) ObservableSlice
- func (o Observable) ConcatMap(project func(interface{}) Observable) Observable
- func (o Observable) ConcatWith(other ...Observable) Observable
- func (o Observable) Count() ObservableInt
- func (o Observable) DebounceTime(duration time.Duration) Observable
- func (o Observable) Delay(duration time.Duration) Observable
- func (o Observable) Distinct() Observable
- func (o Observable) DistinctUntilChanged(equal ...func(interface{}, interface{}) bool) Observable
- func (o Observable) Do(f func(next interface{})) Observable
- func (o Observable) DoOnComplete(f func()) Observable
- func (o Observable) DoOnError(f func(err error)) Observable
- func (o Observable) ElementAt(n int) Observable
- func (o Observable) Filter(predicate func(next interface{}) bool) Observable
- func (o Observable) Finally(f func()) Observable
- func (o Observable) First() Observable
- func (o Observable) IgnoreCompletion() Observable
- func (o Observable) IgnoreElements() Observable
- func (o Observable) Last() Observable
- func (o Observable) Map(project func(interface{}) interface{}) Observable
- func (o Observable) MapObservable(project func(interface{}) Observable) ObservableObservable
- func (o Observable) MapTo(value interface{}) Observable
- func (o Observable) MergeDelayErrorWith(other ...Observable) Observable
- func (o Observable) MergeMap(project func(interface{}) Observable) Observable
- func (o Observable) MergeWith(other ...Observable) Observable
- func (o Observable) Multicast(factory func() Subject) Multicaster
- func (o Observable) ObserveOn(dispatch func(task func())) Observable
- func (o Observable) OnlyBool() ObservableBool
- func (o Observable) OnlyInt() ObservableInt
- func (o Observable) Println(a ...interface{}) error
- func (o Observable) Publish() Multicaster
- func (o Observable) PublishReplay(bufferCapacity int, windowDuration time.Duration) Multicaster
- func (o Observable) Reduce(reducer func(interface{}, interface{}) interface{}, seed interface{}) Observable
- func (o Observable) Repeat(count int) Observable
- func (o Observable) Retry(count ...int) Observable
- func (o Observable) SampleTime(window time.Duration) Observable
- func (o Observable) Scan(accumulator func(interface{}, interface{}) interface{}, seed interface{}) Observable
- func (o Observable) Serialize() Observable
- func (o Observable) Single() Observable
- func (o Observable) Skip(n int) Observable
- func (o Observable) SkipLast(n int) Observable
- func (o Observable) StartWith(values ...interface{}) Observable
- func (o Observable) Subscribe(observe Observer, schedulers ...Scheduler) Subscription
- func (o Observable) SubscribeOn(scheduler Scheduler) Observable
- func (o Observable) SwitchMap(project func(interface{}) Observable) Observable
- func (o Observable) Take(n int) Observable
- func (o Observable) TakeLast(n int) Observable
- func (o Observable) TakeUntil(other Observable) Observable
- func (o Observable) TakeWhile(condition func(next interface{}) bool) Observable
- func (o Observable) ThrottleTime(duration time.Duration) Observable
- func (o Observable) TimeInterval() ObservableTimeInterval
- func (o Observable) Timeout(due time.Duration) Observable
- func (o Observable) Timestamp() ObservableTimestamp
- func (o Observable) ToChan(subscribers ...Subscriber) <-chan interface{}
- func (o Observable) ToSingle() (entry interface{}, err error)
- func (o Observable) ToSlice() (slice []interface{}, err error)
- func (o Observable) Wait() error
- func (o Observable) WithLatestFrom(other ...Observable) ObservableSlice
- type ObservableBool
- func (o ObservableBool) AsObservable() Observable
- func (o ObservableBool) Println(a ...interface{}) error
- func (o ObservableBool) Single() ObservableBool
- func (o ObservableBool) Subscribe(observe BoolObserver, schedulers ...Scheduler) Subscription
- func (o ObservableBool) ToChan(subscribers ...Subscriber) <-chan bool
- func (o ObservableBool) ToSingle() (entry bool, err error)
- func (o ObservableBool) ToSlice() (slice []bool, err error)
- func (o ObservableBool) Wait() error
- type ObservableInt
- func (o ObservableInt) AsObservable() Observable
- func (o ObservableInt) Average() ObservableInt
- func (o ObservableInt) MapObservable(project func(int) Observable) ObservableObservable
- func (o ObservableInt) Max() ObservableInt
- func (o ObservableInt) Min() ObservableInt
- func (o ObservableInt) Println(a ...interface{}) error
- func (o ObservableInt) Single() ObservableInt
- func (o ObservableInt) Subscribe(observe IntObserver, schedulers ...Scheduler) Subscription
- func (o ObservableInt) Sum() ObservableInt
- func (o ObservableInt) Take(n int) ObservableInt
- func (o ObservableInt) ToChan(subscribers ...Subscriber) <-chan int
- func (o ObservableInt) ToSingle() (entry int, err error)
- func (o ObservableInt) ToSlice() (slice []int, err error)
- func (o ObservableInt) Wait() error
- type ObservableObservable
- func (o ObservableObservable) AutoUnsubscribe() ObservableObservable
- func (o ObservableObservable) CombineLatestAll() ObservableSlice
- func (o ObservableObservable) ConcatAll() Observable
- func (o ObservableObservable) MergeAll() Observable
- func (o ObservableObservable) SwitchAll() Observable
- func (o ObservableObservable) WithLatestFromAll() ObservableSlice
- type ObservableObserver
- type ObservableSlice
- func (o ObservableSlice) AsObservable() Observable
- func (o ObservableSlice) Println(a ...interface{}) error
- func (o ObservableSlice) Single() ObservableSlice
- func (o ObservableSlice) Subscribe(observe SliceObserver, schedulers ...Scheduler) Subscription
- func (o ObservableSlice) ToChan(subscribers ...Subscriber) <-chan Slice
- func (o ObservableSlice) ToSingle() (entry Slice, err error)
- func (o ObservableSlice) ToSlice() (slice []Slice, err error)
- func (o ObservableSlice) Wait() error
- type ObservableTime
- type ObservableTimeInterval
- type ObservableTimestamp
- type Observer
- type RxError
- type Scheduler
- type Slice
- type SliceObserver
- type Subject
- type Subscriber
- type Subscription
- type Time
- type TimeInterval
- type TimeIntervalObserver
- type TimeObserver
- type Timestamp
- type TimestampObserver
Examples ¶
- Defer
- From
- From (Slice)
- FromChan
- Merge
- Observable (MergeDelayError)
- Observable.All
- Observable.Buffer
- Observable.BufferTime
- Observable.Catch
- Observable.CatchError
- Observable.ConcatWith
- Observable.Distinct
- Observable.DistinctUntilChanged
- Observable.Do
- Observable.Filter
- Observable.Map
- Observable.MergeDelayErrorWith
- Observable.MergeMap
- Observable.MergeWith
- Observable.Scan
- Observable.StartWith
- Observable.SubscribeOn (Goroutine)
- Observable.SubscribeOn (Serial)
- Observable.WithLatestFrom
- ObservableObservable.SwitchAll
- ObservableObservable.WithLatestFromAll
Constants ¶
const ( AlreadyDone = RxError("already done") AlreadySubscribed = RxError("already subscribed") AlreadyWaiting = RxError("already waiting") RecursionNotAllowed = RxError("recursion not allowed") StateTransitionFailed = RxError("state transition faled") )
const DefaultReplayCapacity = 16383
DefaultReplayCapacity is the default capacity of a replay buffer when a bufferCapacity of 0 is passed to the NewReplaySubject function.
const DidNotEmitValue = RxError("expected one value, got none")
const EmittedMultipleValues = RxError("expected one value, got multiple")
const ErrUnsubscribed = RxError("subscriber unsubscribed")
ErrUnsubscribed is the error returned by Wait when the Unsubscribe method is called on the subscription.
const InvalidCount = RxError("invalid count")
const OutOfSubscriptions = RxError("out of subscriptions")
const TimeoutOccured = RxError("timeout occured")
TimeoutOccured is delivered to an observer if the stream times out.
const TypecastFailed = RxError("typecast failed")
TypecastFailed is delivered to an observer if the generic value cannot be typecast to a specific type.
Variables ¶
This section is empty.
Functions ¶
func MakeObserverObservable ¶
MakeObserverObservable turns an observer into a multicasting and buffering observable. Both the observer and the observable are returned. These are then used as the core of any Subject implementation. The Observer side is used to pass items into the buffering multicaster. This then multicasts the items to every Observer that subscribes to the returned Observable.
age: the age below which items are kept to replay to a new subscriber. length: the length of the item buffer, i.e. the number of items kept to replay to a new subscriber. [cap]: the capacity of the item buffer, i.e. the number of items that can be observed before blocking. [scap]: the capacity of the subscription list, i.e. the maximum number of simultaneous subscribers.
Types ¶
type BoolObserver ¶
BoolObserver is a function that gets called whenever the Observable has something to report. The next argument is the item value that is only valid when the done argument is false. When done is true and the err argument is not nil, then the Observable has terminated with an error. When done is true and the err argument is nil, then the Observable has completed normally.
type Canceled ¶
type Canceled func() bool
Canceled returns true when the observer has unsubscribed.
type Connectable ¶
type Connectable func(Scheduler, Subscriber)
Connectable provides the Connect method for a Multicaster.
func (Connectable) Connect ¶
func (c Connectable) Connect(schedulers ...Scheduler) Subscription
Connect instructs a multicaster to subscribe to its source and begin multicasting items to its subscribers. Connect accepts an optional scheduler argument.
type IntObserver ¶
IntObserver is a function that gets called whenever the Observable has something to report. The next argument is the item value that is only valid when the done argument is false. When done is true and the err argument is not nil, then the Observable has terminated with an error. When done is true and the err argument is nil, then the Observable has completed normally.
type Multicaster ¶
type Multicaster struct {
Observable
Connectable
}
Multicaster is a multicasting connectable observable. One or more Observers can subscribe to it simultaneously. It will subscribe to the source Observable when Connect is called. After that, every emission from the source is multicast to all subscribed Observers.
func (Multicaster) AutoConnect ¶
func (o Multicaster) AutoConnect(count int) Observable
AutoConnect makes a Multicaster behave like an ordinary Observable that automatically connects the multicaster to its source when the specified number of observers have subscribed to it. If the count is less than 1 it will return a Throw(InvalidCount). After connecting, when the number of subscribed observers eventually drops to 0, AutoConnect will cancel the source connection if it hasn't terminated yet. When subsequently the next observer subscribes, AutoConnect will connect to the source only when it was previously canceled or because the source terminated with an error. So it will not reconnect when the source completed successfully. This specific behavior allows for implementing a caching observable that can be retried until it succeeds. Another thing to notice is that AutoConnect will disconnect an active connection when the number of observers drops to zero. The reason for this is that not doing so would leak a task and leave it hanging in the scheduler.
func (Multicaster) RefCount ¶
func (o Multicaster) RefCount() Observable
RefCount makes a Multicaster behave like an ordinary Observable. On first Subscribe it will call Connect on its Multicaster and when its last subscriber is Unsubscribed it will cancel the source connection by calling Unsubscribe on the subscription returned by the call to Connect.
type Next ¶
type Next func(interface{})
Next can be called to emit the next value to the Observer.
type Observable ¶
type Observable func(Observer, Scheduler, Subscriber)
Observable is a function taking an Observer, Scheduler and Subscriber. Calling it will subscribe the Observer to events from the Observable.
Example (MergeDelayError) ¶
package main
import (
"fmt"
"time"
"github.com/reactivego/rx"
)
func main() {
type any = interface{}
const ms = time.Millisecond
AddMul := func(add, mul int) func(any) any {
return func(i any) any {
return mul * (i.(int) + add)
}
}
To := func(to int) func(any) any {
return func(any) any {
return to
}
}
a := rx.Interval(20 * ms).Map(AddMul(1, 20)).Take(4).ConcatWith(rx.Throw(rx.RxError("boom")))
b := rx.Timer(70*ms, 20*ms).Map(To(1)).Take(2)
err := rx.MergeDelayError(a, b).Println()
fmt.Println(err)
}
Output: 20 40 60 1 80 1 boom
func Concat ¶
func Concat(observables ...Observable) Observable
Concat emits the emissions from two or more Observables without interleaving them.
func Create ¶
func Create(create func(Next, Error, Complete, Canceled)) Observable
Create provides a way of creating an Observable from scratch by calling observer methods programmatically.
The create function provided to Create will be called once to implement the observable. It is provided with a Next, Error, Complete and Canceled function that can be called by the code that implements the Observable.
func CreateFutureRecursive ¶
func CreateFutureRecursive(timeout time.Duration, create func(Next, Error, Complete) time.Duration) Observable
CreateFutureRecursive provides a way of creating an Observable from scratch by calling observer methods programmatically.
The create function provided to CreateFutureRecursive will be called repeatedly to implement the observable. It is provided with a Next, Error and Complete function that can be called by the code that implements the Observable.
The timeout passed in determines the time before calling the create function. The time.Duration returned by the create function determines how long CreateFutureRecursive has to wait before calling the create function again.
func CreateRecursive ¶
func CreateRecursive(create func(Next, Error, Complete)) Observable
CreateRecursive provides a way of creating an Observable from scratch by calling observer methods programmatically.
The create function provided to CreateRecursive will be called repeatedly to implement the observable. It is provided with a Next, Error and Complete function that can be called by the code that implements the Observable.
func Defer ¶
func Defer(factory func() Observable) Observable
Defer does not create the Observable until the observer subscribes. It creates a fresh Observable for each subscribing observer. Use it to create observables that maintain separate state per subscription.
Example ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
func main() {
count := 0
source := rx.Defer(func() rx.Observable {
return rx.From(count)
})
mapped := source.Map(func(next interface{}) interface{} {
return fmt.Sprintf("observable %d", next)
})
mapped.Println()
count = 123
mapped.Println()
count = 456
mapped.Println()
}
Output: observable 0 observable 123 observable 456
func Empty ¶
func Empty() Observable
Empty creates an Observable that emits no items but terminates normally.
func From ¶
func From(slice ...interface{}) Observable
From creates an Observable from multiple interface{} values passed in.
Example ¶
package main
import (
"github.com/reactivego/rx"
)
func main() {
rx.From(1, 2, 3, 4, 5).Println()
}
Output: 1 2 3 4 5
Example (Slice) ¶
package main
import (
"github.com/reactivego/rx"
)
func main() {
rx.From([]interface{}{1, 2, 3, 4, 5}...).Println()
}
Output: 1 2 3 4 5
func FromChan ¶
func FromChan(ch <-chan interface{}) Observable
FromChan creates an Observable from a Go channel of interface{} values. This allows the code feeding into the channel to send either an error or the next value. The feeding code can send zero or more items; closing the channel is then seen as completion. When the feeding code sends an error into the channel, it should close the channel immediately to indicate termination with error.
Example ¶
package main
import (
"github.com/reactivego/rx"
)
func main() {
ch := make(chan interface{}, 6)
for i := 0; i < 5; i++ {
ch <- i + 1
}
close(ch)
rx.FromChan(ch).Println()
}
Output: 1 2 3 4 5
func Interval ¶
func Interval(interval time.Duration) Observable
Interval creates an Observable that emits a sequence of integers spaced by a particular time interval. First integer is not emitted immediately, but only after the first time interval has passed. The generated code will do a type conversion from int to interface{}.
func Just ¶
func Just(element interface{}) Observable
Just creates an Observable that emits a particular item.
func Merge ¶
func Merge(observables ...Observable) Observable
Merge combines multiple Observables into one by merging their emissions. An error from any of the observables will terminate the merged observables.
Example ¶
package main
import (
"github.com/reactivego/rx"
)
func main() {
a := rx.From(0, 2, 4)
b := rx.From(1, 3, 5)
rx.Merge(a, b).Println()
}
Output: 0 1 2 3 4 5
func MergeDelayError ¶
func MergeDelayError(observables ...Observable) Observable
MergeDelayError combines multiple Observables into one by merging their emissions. Any error will be deferred until all observables terminate.
func Never ¶
func Never() Observable
Never creates an Observable that emits no items and doesn't terminate.
func Of ¶
func Of(slice ...interface{}) Observable
Of emits a variable amount of values in a sequence and then emits a complete notification.
func Range ¶
func Range(start, count int) Observable
Range creates an Observable that emits a range of sequential int values. The generated code will do a type conversion from int to interface{}.
func Start ¶
func Start(f func() (interface{}, error)) Observable
Start creates an Observable that emits the return value of a function. It is designed to be used with a function that returns an (interface{}, error) tuple. If the error is non-nil the returned Observable will be an Observable that emits an error, otherwise it will be a single-value Observable of the value.
func Throw ¶
func Throw(err error) Observable
Throw creates an Observable that emits no items and terminates with an error.
func Timer ¶
func Timer(initialDelay time.Duration, intervals ...time.Duration) Observable
Timer creates an Observable that emits a sequence of integers (starting at zero) after an initialDelay has passed. Subsequent values are emitted using a schedule of intervals passed in. If only the initialDelay is given, Timer will emit only once.
func (Observable) All ¶
func (o Observable) All(predicate func(next interface{}) bool) ObservableBool
All determines whether all items emitted by an Observable meet some criteria.
Pass a predicate function to the All operator that accepts an item emitted by the source Observable and returns a boolean value based on an evaluation of that item. All returns an ObservableBool that emits a single boolean value: true if and only if the source Observable terminates normally and every item emitted by the source Observable evaluated as true according to this predicate; false if any item emitted by the source Observable evaluates as false according to this predicate.
Example ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
func main() {
// Setup All to produce true only when all source values are less than 5
lessthan5 := func(i interface{}) bool {
return i.(int) < 5
}
result, err := rx.From(1, 2, 5, 2, 1).All(lessthan5).ToSingle()
fmt.Println("All values less than 5?", result, err)
result, err = rx.From(4, 1, 0, -1, 2, 3, 4).All(lessthan5).ToSingle()
fmt.Println("All values less than 5?", result, err)
}
Output: All values less than 5? false <nil> All values less than 5? true <nil>
func (Observable) AsObservable ¶
func (o Observable) AsObservable() Observable
AsObservable returns the source Observable unchanged. This is a special case needed for internal plumbing.
func (Observable) AsObservableBool ¶
func (o Observable) AsObservableBool() ObservableBool
AsObservableBool turns an Observable of interface{} into an ObservableBool. If during observing a typecast to bool fails, the error TypecastFailed will be emitted.
func (Observable) AsObservableInt ¶
func (o Observable) AsObservableInt() ObservableInt
AsObservableInt turns an Observable of interface{} into an ObservableInt. If during observing a typecast to int fails, the error TypecastFailed will be emitted.
func (Observable) AsObservableSlice ¶
func (o Observable) AsObservableSlice() ObservableSlice
AsObservableSlice turns an Observable of interface{} into an ObservableSlice. If during observing a typecast to Slice fails, the error TypecastFailed will be emitted.
func (Observable) AuditTime ¶
func (o Observable) AuditTime(duration time.Duration) Observable
AuditTime waits until the source emits and then starts a timer. When the timer expires, AuditTime will emit the last value received from the source during the time period when the timer was active.
func (Observable) AutoUnsubscribe ¶
func (o Observable) AutoUnsubscribe() Observable
AutoUnsubscribe will automatically unsubscribe from the source when it signals it is done. This Operator subscribes to the source Observable using a separate subscriber. When the source observable subsequently signals it is done, the separate subscriber will be Unsubscribed.
func (Observable) Buffer ¶
func (o Observable) Buffer(closingNotifier Observable) ObservableSlice
Buffer buffers the source Observable values until closingNotifier emits.
Example ¶
package main
import (
"time"
"github.com/reactivego/rx"
)
func main() {
const ms = time.Millisecond
source := rx.Timer(0*ms, 100*ms).Take(4).ConcatMap(func(i interface{}) rx.Observable {
switch i.(int) {
case 0:
return rx.From("a", "b")
case 1:
return rx.From("c", "d", "e")
case 3:
return rx.From("f", "g")
}
return rx.Empty()
})
closingNotifier := rx.Interval(100 * ms)
source.Buffer(closingNotifier).Println()
}
Output: [a b] [c d e] [] [f g]
func (Observable) BufferTime ¶
func (o Observable) BufferTime(period time.Duration) ObservableSlice
BufferTime buffers the source Observable values for a specific time period and emits those as a slice periodically in time.
Example ¶
package main
import (
"time"
"github.com/reactivego/rx"
)
func main() {
const ms = time.Millisecond
source := rx.Timer(0*ms, 100*ms).Take(4).ConcatMap(func(i interface{}) rx.Observable {
switch i.(int) {
case 0:
return rx.From("a", "b")
case 1:
return rx.From("c", "d", "e")
case 3:
return rx.From("f", "g")
}
return rx.Empty()
})
source.BufferTime(100 * ms).Println()
}
Output: [a b] [c d e] [] [f g]
func (Observable) Catch ¶
func (o Observable) Catch(catch Observable) Observable
Catch recovers from an error notification by continuing the sequence without emitting the error but by switching to the catch Observable to provide items.
Example ¶
package main
import (
"github.com/reactivego/rx"
)
func main() {
const problem = rx.RxError("problem")
rx.From(1, 2, 3).ConcatWith(rx.Throw(problem)).Catch(rx.From(4, 5)).Println()
}
Output: 1 2 3 4 5
func (Observable) CatchError ¶
func (o Observable) CatchError(selector func(err error, caught Observable) Observable) Observable
CatchError catches errors on the Observable to be handled by returning a new Observable or throwing an error. It is passed a selector function that takes as arguments err, which is the error, and caught, which is the source observable, in case you'd like to "retry" that observable by returning it again. Whatever observable is returned by the selector will be used to continue the observable chain.
Example ¶
package main
import (
"github.com/reactivego/rx"
)
func main() {
const problem = rx.RxError("problem")
catcher := func(err error, caught rx.Observable) rx.Observable {
if err == problem {
return rx.From(4, 5)
} else {
return caught
}
}
rx.From(1, 2, 3).ConcatWith(rx.Throw(problem)).CatchError(catcher).Println()
}
Output: 1 2 3 4 5
func (Observable) CombineLatestMap ¶
func (o Observable) CombineLatestMap(project func(interface{}) Observable) ObservableSlice
CombineLatestMap maps every entry emitted by the Observable into an Observable, and then subscribes to it, until the source observable completes. It will then wait for all of the Observables to emit before emitting the first slice. Whenever any of the subscribed observables emits, a new slice will be emitted containing all the latest values.
func (Observable) CombineLatestMapTo ¶
func (o Observable) CombineLatestMapTo(inner Observable) ObservableSlice
CombineLatestMapTo maps every entry emitted by the Observable into a single Observable, and then subscribes to it, until the source observable completes. It will then wait for all of the Observables to emit before emitting the first slice. Whenever any of the subscribed observables emits, a new slice will be emitted containing all the latest values.
func (Observable) CombineLatestWith ¶
func (o Observable) CombineLatestWith(other ...Observable) ObservableSlice
CombineLatestWith will subscribe to its Observable and all other Observables passed in. It will then wait for all of the Observables to emit before emitting the first slice. Whenever any of the subscribed observables emits, a new slice will be emitted containing all the latest values.
func (Observable) ConcatMap ¶
func (o Observable) ConcatMap(project func(interface{}) Observable) Observable
ConcatMap transforms the items emitted by an Observable by applying a function to each item and returning an Observable. The stream of Observable items is then flattened by concatenating the emissions from the observables without interleaving.
func (Observable) ConcatWith ¶
func (o Observable) ConcatWith(other ...Observable) Observable
ConcatWith emits the emissions from two or more Observables without interleaving them.
Example ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
func main() {
oa := rx.From(0, 1, 2, 3)
ob := rx.From(4, 5)
oc := rx.From(6)
od := rx.From(7, 8, 9)
oa.ConcatWith(ob, oc).ConcatWith(od).Subscribe(func(next interface{}, err error, done bool) {
switch {
case !done:
fmt.Printf("%d,", next.(int))
case err != nil:
fmt.Print("err", err)
default:
fmt.Printf("complete")
}
}).Wait()
}
Output: 0,1,2,3,4,5,6,7,8,9,complete
func (Observable) Count ¶
func (o Observable) Count() ObservableInt
Count counts the number of items emitted by the source Observable and emits only this value.
func (Observable) DebounceTime ¶
func (o Observable) DebounceTime(duration time.Duration) Observable
DebounceTime only emits the last item of a burst from an Observable if a particular timespan has passed without it emitting another item.
func (Observable) Delay ¶
func (o Observable) Delay(duration time.Duration) Observable
Delay shifts an emission from an Observable forward in time by a particular amount of time. The relative time intervals between emissions are preserved.
func (Observable) Distinct ¶
func (o Observable) Distinct() Observable
Distinct suppresses duplicate items emitted by an Observable.
Example ¶
package main
import (
"github.com/reactivego/rx"
)
func main() {
rx.From(1, 2, 2, 1, 3).Distinct().Println()
}
Output: 1 2 3
func (Observable) DistinctUntilChanged ¶
func (o Observable) DistinctUntilChanged(equal ...func(interface{}, interface{}) bool) Observable
DistinctUntilChanged only emits when the current value is different from the last.
Example ¶
package main
import (
"github.com/reactivego/rx"
)
func main() {
rx.From(1, 2, 2, 1, 3).DistinctUntilChanged().Println()
}
Output: 1 2 1 3
func (Observable) Do ¶
func (o Observable) Do(f func(next interface{})) Observable
Do calls a function for each next value passing through the observable.
Example ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
func main() {
rx.From(1, 2, 3).Do(func(v interface{}) {
fmt.Println(v.(int))
}).Wait()
}
Output: 1 2 3
func (Observable) DoOnComplete ¶
func (o Observable) DoOnComplete(f func()) Observable
DoOnComplete calls a function when the stream completes.
func (Observable) DoOnError ¶
func (o Observable) DoOnError(f func(err error)) Observable
DoOnError calls a function for any error on the stream.
func (Observable) ElementAt ¶
func (o Observable) ElementAt(n int) Observable
ElementAt emits only item n emitted by an Observable.
func (Observable) Filter ¶
func (o Observable) Filter(predicate func(next interface{}) bool) Observable
Filter emits only those items from an Observable that pass a predicate test.
Example ¶
package main
import (
"github.com/reactivego/rx"
)
func main() {
even := func(i interface{}) bool {
return i.(int)%2 == 0
}
rx.From(1, 2, 3, 4, 5, 6, 7, 8).Filter(even).Println()
}
Output: 2 4 6 8
func (Observable) Finally ¶
func (o Observable) Finally(f func()) Observable
Finally applies a function for any error or completion on the stream. This doesn't expose whether this was an error or a completion.
func (Observable) First ¶
func (o Observable) First() Observable
First emits only the first item, or the first item that meets a condition, from an Observable.
func (Observable) IgnoreCompletion ¶
func (o Observable) IgnoreCompletion() Observable
IgnoreCompletion only emits items and never completes, neither with Error nor with Complete.
func (Observable) IgnoreElements ¶
func (o Observable) IgnoreElements() Observable
IgnoreElements does not emit any items from an Observable but mirrors its termination notification.
func (Observable) Last ¶
func (o Observable) Last() Observable
Last emits only the last item emitted by an Observable.
func (Observable) Map ¶
func (o Observable) Map(project func(interface{}) interface{}) Observable
Map transforms the items emitted by an Observable by applying a function to each item.
Example ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
// main maps every int of the sequence to a string with a trailing "!" and
// prints the mapped values.
func main() {
	// exclaim formats the emitted int followed by an exclamation mark.
	exclaim := func(next interface{}) interface{} {
		return fmt.Sprintf("%d!", next.(int))
	}
	rx.From(1, 2, 3, 4).Map(exclaim).Println()
}
Output: 1! 2! 3! 4!
func (Observable) MapObservable ¶
func (o Observable) MapObservable(project func(interface{}) Observable) ObservableObservable
MapObservable transforms the items emitted by an Observable by applying a function to each item.
func (Observable) MapTo ¶
func (o Observable) MapTo(value interface{}) Observable
MapTo transforms the items emitted by an Observable. Emitted values are mapped to the same value every time.
func (Observable) MergeDelayErrorWith ¶
func (o Observable) MergeDelayErrorWith(other ...Observable) Observable
MergeDelayErrorWith combines multiple Observables into one by merging their emissions. Any error will be deferred until all observables terminate.
Example ¶
package main
import (
"fmt"
"time"
"github.com/reactivego/rx"
)
// main merges a timed stream that ends in an error with a second timed stream
// and prints the merged values followed by the error returned by Println.
func main() {
type any = interface{}
const ms = time.Millisecond
// AddMul returns a mapper computing mul * (value + add) for an int value.
AddMul := func(add, mul int) func(any) any {
return func(i any) any {
return mul * (i.(int) + add)
}
}
// To returns a mapper that ignores its input and always yields the constant to.
To := func(to int) func(any) any {
return func(any) any {
return to
}
}
// a: the first 4 interval ticks mapped through AddMul(1, 20), followed by a "boom" error.
a := rx.Interval(20 * ms).Map(AddMul(1, 20)).Take(4).ConcatWith(rx.Throw(rx.RxError("boom")))
// b: a timer with a 70ms initial delay and 20ms interval, mapped to the constant 1, limited to 2 items.
b := rx.Timer(70*ms, 20*ms).Map(To(1)).Take(2)
// The inner Println prints the merged items; the outer fmt.Println prints the error it returns.
// NOTE(review): the expected interleaving of a and b depends on timer scheduling.
fmt.Println(a.MergeDelayErrorWith(b).Println())
}
Output: 20 40 60 1 80 1 boom
func (Observable) MergeMap ¶
func (o Observable) MergeMap(project func(interface{}) Observable) Observable
MergeMap transforms the items emitted by an Observable by applying a function to each item and returning an Observable. The stream of Observable items is then merged into a single stream of items using the MergeAll operator.
Example ¶
package main
import (
"github.com/reactivego/rx"
)
// main expands each source item n into Range(n, 2), merges the resulting
// observables, prints the items and panics if printing reports an error.
func main() {
	// expand maps an emitted int n onto the observable rx.Range(n, 2).
	expand := func(next interface{}) rx.Observable {
		return rx.Range(next.(int), 2)
	}
	err := rx.From(1, 2).MergeMap(expand).Println()
	if err != nil {
		panic(err)
	}
}
Output: 1 2 2 3
func (Observable) MergeWith ¶
func (o Observable) MergeWith(other ...Observable) Observable
MergeWith combines multiple Observables into one by merging their emissions. An error from any of the observables will terminate the merged observables.
Example ¶
package main
import (
"github.com/reactivego/rx"
)
// main merges a stream of even numbers with a stream of odd numbers and
// prints the merged result.
func main() {
	evens, odds := rx.From(0, 2, 4), rx.From(1, 3, 5)
	evens.MergeWith(odds).Println()
}
Output: 0 1 2 3 4 5
func (Observable) Multicast ¶
func (o Observable) Multicast(factory func() Subject) Multicaster
Multicast converts an ordinary observable into a multicasting connectable observable or multicaster for short. A multicaster will only start emitting values after its Connect method has been called. The factory method passed in should return a new Subject that implements the actual multicasting behavior.
func (Observable) ObserveOn ¶
func (o Observable) ObserveOn(dispatch func(task func())) Observable
ObserveOn specifies a dispatch function to use for delivering values to the observer.
func (Observable) OnlyBool ¶
func (o Observable) OnlyBool() ObservableBool
OnlyBool filters the value stream of an Observable of interface{} and outputs only the bool typed values.
func (Observable) OnlyInt ¶
func (o Observable) OnlyInt() ObservableInt
OnlyInt filters the value stream of an Observable of interface{} and outputs only the int typed values.
func (Observable) Println ¶
func (o Observable) Println(a ...interface{}) error
Println subscribes to the Observable and prints every item to os.Stdout while it waits for completion or error. Returns either the error or nil when the Observable completed normally. Println uses a serial scheduler created with NewScheduler().
func (Observable) Publish ¶
func (o Observable) Publish() Multicaster
Publish returns a Multicaster for a Subject to an underlying Observable and turns the subject into a connectable observable. A Subject emits to an observer only those items that are emitted by the underlying Observable subsequent to the time the observer subscribes. When the underlying Observable terminates with an error, then subscribed observers will receive that error. After all observers have unsubscribed due to an error, the Multicaster does an internal reset just before the next observer subscribes. So this Publish operator is re-connectable, unlike the RxJS 5 behavior that isn't. To simulate the RxJS 5 behavior use Publish().AutoConnect(1); this will connect on the first subscription but will never re-connect.
func (Observable) PublishReplay ¶
func (o Observable) PublishReplay(bufferCapacity int, windowDuration time.Duration) Multicaster
PublishReplay returns a Multicaster for a ReplaySubject to an underlying Observable and turns the subject into a connectable observable. A ReplaySubject emits to any observer all of the items that were emitted by the source observable, regardless of when the observer subscribes. When the underlying Observable terminates with an error, then subscribed observers will receive that error. After all observers have unsubscribed due to an error, the Multicaster does an internal reset just before the next observer subscribes.
func (Observable) Reduce ¶
func (o Observable) Reduce(reducer func(interface{}, interface{}) interface{}, seed interface{}) Observable
Reduce applies a reducer function to each item emitted by an Observable and the previous reducer result. The operator accepts a seed argument that is passed to the reducer for the first item emitted by the Observable. Reduce emits only the final value.
func (Observable) Repeat ¶
func (o Observable) Repeat(count int) Observable
Repeat creates an Observable that emits a sequence of items repeatedly.
func (Observable) Retry ¶
func (o Observable) Retry(count ...int) Observable
Retry if a source Observable sends an error notification, resubscribe to it in the hopes that it will complete without error. If count is zero or negative, the retry count will be effectively infinite. The scheduler passed when subscribing is used by Retry to schedule any retry attempt. The time between retries is 1 millisecond, so retry frequency is 1 kHz. Any SubscribeOn operators should be called after Retry to prevent lockups caused by mixing different schedulers in the same subscription for retrying and subscribing.
func (Observable) SampleTime ¶
func (o Observable) SampleTime(window time.Duration) Observable
SampleTime emits the most recent item emitted by an Observable within periodic time intervals.
func (Observable) Scan ¶
func (o Observable) Scan(accumulator func(interface{}, interface{}) interface{}, seed interface{}) Observable
Scan applies an accumulator function to each item emitted by an Observable and the previous accumulator result. The operator accepts a seed argument that is passed to the accumulator for the first item emitted by the Observable. Scan emits every value, both intermediate and final.
Example ¶
package main
import (
"github.com/reactivego/rx"
)
// main prints the running sum of the sequence 1 through 5, seeded with 0.
func main() {
	// sum adds the emitted int to the accumulated int.
	sum := func(acc, value interface{}) interface{} {
		total := acc.(int) + value.(int)
		return total
	}
	rx.From(1, 2, 3, 4, 5).Scan(sum, 0).Println()
}
Output: 1 3 6 10 15
func (Observable) Serialize ¶
func (o Observable) Serialize() Observable
Serialize forces an Observable to make serialized calls and to be well-behaved.
func (Observable) Single ¶
func (o Observable) Single() Observable
Single enforces that the observable sends exactly one data item and then completes. If the observable sends no data before completing or sends more than 1 item before completing, this is reported as an error to the observer.
func (Observable) Skip ¶
func (o Observable) Skip(n int) Observable
Skip suppresses the first n items emitted by an Observable.
func (Observable) SkipLast ¶
func (o Observable) SkipLast(n int) Observable
SkipLast suppresses the last n items emitted by an Observable.
func (Observable) StartWith ¶
func (o Observable) StartWith(values ...interface{}) Observable
StartWith returns an observable that, at the moment of subscription, will synchronously emit all values provided to this operator, then subscribe to the source and mirror all of its emissions to subscribers.
Example ¶
package main
import (
"github.com/reactivego/rx"
)
// main prepends the value 1 to the stream 2, 3 and prints the result.
func main() {
	tail := rx.From(2, 3)
	tail.StartWith(1).Println()
}
Output: 1 2 3
func (Observable) Subscribe ¶
func (o Observable) Subscribe(observe Observer, schedulers ...Scheduler) Subscription
Subscribe operates upon the emissions and notifications from an Observable. This method returns a Subscription. Subscribe uses a serial scheduler created with NewScheduler().
func (Observable) SubscribeOn ¶
func (o Observable) SubscribeOn(scheduler Scheduler) Observable
SubscribeOn specifies the scheduler an Observable should use when it is subscribed to.
Example (Goroutine) ¶
package main
import (
"fmt"
"time"
"github.com/reactivego/rx"
)
// main subscribes to a delayed stream on the goroutine scheduler and prints
// the scheduler's Count() next to every notification the observer receives.
func main() {
const ms = time.Millisecond
goroutine := rx.GoroutineScheduler()
// observer prints the item while the stream is active, the error when the
// stream terminated with one, and "complete" when it terminated normally.
observer := func(next interface{}, err error, done bool) {
switch {
case !done:
fmt.Println(goroutine.Count(), "print", next)
case err != nil:
fmt.Println(goroutine.Count(), "print", err)
default:
fmt.Println(goroutine.Count(), "print", "complete")
}
}
// NOTE(review): Count() presumably reports the number of active tasks on the scheduler — confirm.
fmt.Println(goroutine.Count(), "SUBSCRIBING...")
subscription := rx.From(1, 2, 3).Delay(10 * ms).SubscribeOn(goroutine).Subscribe(observer)
// Note that without a Delay the next Println lands at a random spot in the output.
fmt.Println("WAITING...")
// Wait is called before the final "DONE" line is printed.
subscription.Wait()
fmt.Println(goroutine.Count(), "DONE")
}
Output: 0 SUBSCRIBING... WAITING... 1 print 1 1 print 2 1 print 3 1 print complete 0 DONE
Example (Serial) ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
// main subscribes to a stream on a serial scheduler and prints the
// scheduler's Count() next to every notification the observer receives.
func main() {
	serial := rx.NewScheduler()

	// observer reports an item while the stream is active; once done it
	// reports either the error or normal completion.
	observer := func(next interface{}, err error, done bool) {
		if !done {
			fmt.Println(serial.Count(), "print", next)
		} else if err != nil {
			fmt.Println(serial.Count(), "print", err)
		} else {
			fmt.Println(serial.Count(), "print", "complete")
		}
	}

	fmt.Println(serial.Count(), "SUBSCRIBING...")
	source := rx.From(1, 2, 3).SubscribeOn(serial)
	subscription := source.Subscribe(observer)
	fmt.Println(serial.Count(), "WAITING...")
	subscription.Wait()
	fmt.Println(serial.Count(), "DONE")
}
Output: 0 SUBSCRIBING... 1 WAITING... 1 print 1 1 print 2 1 print 3 1 print complete 0 DONE
func (Observable) SwitchMap ¶
func (o Observable) SwitchMap(project func(interface{}) Observable) Observable
SwitchMap transforms the items emitted by an Observable by applying a function to each item and returning an Observable. In doing so, it behaves much like what used to be called FlatMap, except that whenever a new Observable is emitted SwitchMap will unsubscribe from the previous Observable and begin emitting items from the newly emitted one.
func (Observable) Take ¶
func (o Observable) Take(n int) Observable
Take emits only the first n items emitted by an Observable.
func (Observable) TakeLast ¶
func (o Observable) TakeLast(n int) Observable
TakeLast emits only the last n items emitted by an Observable.
func (Observable) TakeUntil ¶
func (o Observable) TakeUntil(other Observable) Observable
TakeUntil emits items emitted by an Observable until another Observable emits an item.
func (Observable) TakeWhile ¶
func (o Observable) TakeWhile(condition func(next interface{}) bool) Observable
TakeWhile mirrors items emitted by an Observable until a specified condition becomes false.
TakeWhile mirrors the source Observable until such time as some condition you specify becomes false, at which point TakeWhile stops mirroring the source Observable and terminates its own Observable.
func (Observable) ThrottleTime ¶
func (o Observable) ThrottleTime(duration time.Duration) Observable
ThrottleTime emits when the source emits and then starts a timer during which all emissions from the source are ignored. After the timer expires, ThrottleTime will again emit the next item the source emits, and so on.
func (Observable) TimeInterval ¶
func (o Observable) TimeInterval() ObservableTimeInterval
TimeInterval intercepts the items from the source Observable and emits in their place a struct that indicates the amount of time that elapsed between pairs of emissions.
func (Observable) Timeout ¶
func (o Observable) Timeout(due time.Duration) Observable
Timeout mirrors the source Observable, but issues an error notification if a particular period of time elapses without any emitted items. Timeout schedules a task on the scheduler passed to it during subscription.
func (Observable) Timestamp ¶
func (o Observable) Timestamp() ObservableTimestamp
Timestamp attaches a timestamp to each item emitted by an observable indicating when it was emitted.
func (Observable) ToChan ¶
func (o Observable) ToChan(subscribers ...Subscriber) <-chan interface{}
ToChan returns a channel that emits interface{} values. If the source observable does not emit values but emits an error or complete, then the returned channel will emit any error and then close without emitting any values.
ToChan uses the public scheduler.Goroutine variable for scheduling, because it needs the concurrency so the returned channel can be used by the calling code directly. To be able to cancel ToChan, you will need to create a subscriber yourself and pass it to ToChan as an argument.
func (Observable) ToSingle ¶
func (o Observable) ToSingle() (entry interface{}, err error)
ToSingle blocks until the Observable emits exactly one value or an error. The value and any error are returned. ToSingle uses a serial scheduler created with NewScheduler().
func (Observable) ToSlice ¶
func (o Observable) ToSlice() (slice []interface{}, err error)
ToSlice collects all values from the Observable into a slice. The complete slice and any error are returned. ToSlice uses a serial scheduler created with NewScheduler().
func (Observable) Wait ¶
func (o Observable) Wait() error
Wait subscribes to the Observable and waits for completion or error. Returns either the error or nil when the Observable completed normally. Wait uses a serial scheduler created with NewScheduler().
func (Observable) WithLatestFrom ¶
func (o Observable) WithLatestFrom(other ...Observable) ObservableSlice
WithLatestFrom will subscribe to all Observables and wait for all of them to emit before emitting the first slice. The source observable determines the rate at which the values are emitted. The idea is that observables that are faster than the source, don't determine the rate at which the resulting observable emits. The observables that are combined with the source will be allowed to continue emitting but only will have their last emitted value emitted whenever the source emits.
Example ¶
package main
import (
"github.com/reactivego/rx"
)
// main combines a stream of numbers with the latest value of a stream of
// letters and prints the resulting slices.
func main() {
	numbers := rx.From(1, 2, 3, 4, 5)
	letters := rx.From("A", "B", "C", "D", "E")
	numbers.WithLatestFrom(letters).Println()
}
Output: [2 A] [3 B] [4 C] [5 D]
type ObservableBool ¶
type ObservableBool func(BoolObserver, Scheduler, Subscriber)
ObservableBool is a function taking an Observer, Scheduler and Subscriber. Calling it will subscribe the Observer to events from the Observable.
func (ObservableBool) AsObservable ¶
func (o ObservableBool) AsObservable() Observable
AsObservable turns a typed ObservableBool into an Observable of interface{}.
func (ObservableBool) Println ¶
func (o ObservableBool) Println(a ...interface{}) error
Println subscribes to the Observable and prints every item to os.Stdout while it waits for completion or error. Returns either the error or nil when the Observable completed normally. Println uses a serial scheduler created with NewScheduler().
func (ObservableBool) Single ¶
func (o ObservableBool) Single() ObservableBool
Single enforces that the ObservableBool sends exactly one data item and then completes. If the observable sends no data before completing or sends more than 1 item before completing, this is reported as an error to the observer.
func (ObservableBool) Subscribe ¶
func (o ObservableBool) Subscribe(observe BoolObserver, schedulers ...Scheduler) Subscription
Subscribe operates upon the emissions and notifications from an Observable. This method returns a Subscription. Subscribe uses a serial scheduler created with NewScheduler().
func (ObservableBool) ToChan ¶
func (o ObservableBool) ToChan(subscribers ...Subscriber) <-chan bool
ToChan returns a channel that emits bool values. If the source observable does not emit values but emits an error or complete, then the returned channel will close without emitting any values.
ToChan uses the public scheduler.Goroutine variable for scheduling, because it needs the concurrency so the returned channel can be used by the calling code directly. To be able to cancel ToChan, you will need to create a subscriber yourself and pass it to ToChan as an argument.
func (ObservableBool) ToSingle ¶
func (o ObservableBool) ToSingle() (entry bool, err error)
ToSingle blocks until the ObservableBool emits exactly one value or an error. The value and any error are returned. ToSingle uses a serial scheduler created with NewScheduler().
func (ObservableBool) ToSlice ¶
func (o ObservableBool) ToSlice() (slice []bool, err error)
ToSlice collects all values from the ObservableBool into a slice. The complete slice and any error are returned. ToSlice uses a serial scheduler created with NewScheduler().
func (ObservableBool) Wait ¶
func (o ObservableBool) Wait() error
Wait subscribes to the Observable and waits for completion or error. Returns either the error or nil when the Observable completed normally. Wait uses a serial scheduler created with NewScheduler().
type ObservableInt ¶
type ObservableInt func(IntObserver, Scheduler, Subscriber)
ObservableInt is a function taking an Observer, Scheduler and Subscriber. Calling it will subscribe the Observer to events from the Observable.
func (ObservableInt) AsObservable ¶
func (o ObservableInt) AsObservable() Observable
AsObservable turns a typed ObservableInt into an Observable of interface{}.
func (ObservableInt) Average ¶
func (o ObservableInt) Average() ObservableInt
Average calculates the average of numbers emitted by an ObservableInt and emits this average.
func (ObservableInt) MapObservable ¶
func (o ObservableInt) MapObservable(project func(int) Observable) ObservableObservable
MapObservable transforms the items emitted by an ObservableInt by applying a function to each item.
func (ObservableInt) Max ¶
func (o ObservableInt) Max() ObservableInt
Max determines, and emits, the maximum-valued item emitted by an ObservableInt.
func (ObservableInt) Min ¶
func (o ObservableInt) Min() ObservableInt
Min determines, and emits, the minimum-valued item emitted by an ObservableInt.
func (ObservableInt) Println ¶
func (o ObservableInt) Println(a ...interface{}) error
Println subscribes to the Observable and prints every item to os.Stdout while it waits for completion or error. Returns either the error or nil when the Observable completed normally. Println uses a serial scheduler created with NewScheduler().
func (ObservableInt) Single ¶
func (o ObservableInt) Single() ObservableInt
Single enforces that the ObservableInt sends exactly one data item and then completes. If the observable sends no data before completing or sends more than 1 item before completing, this is reported as an error to the observer.
func (ObservableInt) Subscribe ¶
func (o ObservableInt) Subscribe(observe IntObserver, schedulers ...Scheduler) Subscription
Subscribe operates upon the emissions and notifications from an Observable. This method returns a Subscription. Subscribe uses a serial scheduler created with NewScheduler().
func (ObservableInt) Sum ¶
func (o ObservableInt) Sum() ObservableInt
Sum calculates the sum of numbers emitted by an ObservableInt and emits this sum.
func (ObservableInt) Take ¶
func (o ObservableInt) Take(n int) ObservableInt
Take emits only the first n items emitted by an ObservableInt.
func (ObservableInt) ToChan ¶
func (o ObservableInt) ToChan(subscribers ...Subscriber) <-chan int
ToChan returns a channel that emits int values. If the source observable does not emit values but emits an error or complete, then the returned channel will close without emitting any values.
ToChan uses the public scheduler.Goroutine variable for scheduling, because it needs the concurrency so the returned channel can be used by the calling code directly. To be able to cancel ToChan, you will need to create a subscriber yourself and pass it to ToChan as an argument.
func (ObservableInt) ToSingle ¶
func (o ObservableInt) ToSingle() (entry int, err error)
ToSingle blocks until the ObservableInt emits exactly one value or an error. The value and any error are returned. ToSingle uses a serial scheduler created with NewScheduler().
func (ObservableInt) ToSlice ¶
func (o ObservableInt) ToSlice() (slice []int, err error)
ToSlice collects all values from the ObservableInt into a slice. The complete slice and any error are returned. ToSlice uses a serial scheduler created with NewScheduler().
func (ObservableInt) Wait ¶
func (o ObservableInt) Wait() error
Wait subscribes to the Observable and waits for completion or error. Returns either the error or nil when the Observable completed normally. Wait uses a serial scheduler created with NewScheduler().
type ObservableObservable ¶
type ObservableObservable func(ObservableObserver, Scheduler, Subscriber)
ObservableObservable is a function taking an Observer, Scheduler and Subscriber. Calling it will subscribe the Observer to events from the Observable.
func FromObservable ¶
func FromObservable(slice ...Observable) ObservableObservable
FromObservable creates an ObservableObservable from multiple Observable values passed in.
func (ObservableObservable) AutoUnsubscribe ¶
func (o ObservableObservable) AutoUnsubscribe() ObservableObservable
AutoUnsubscribe will automatically unsubscribe from the source when it signals it is done. This Operator subscribes to the source Observable using a separate subscriber. When the source observable subsequently signals it is done, the separate subscriber will be Unsubscribed.
func (ObservableObservable) CombineLatestAll ¶
func (o ObservableObservable) CombineLatestAll() ObservableSlice
CombineLatestAll flattens a higher order observable (e.g. ObservableObservable) by subscribing to all emitted observables (i.e. Observable entries) until the source completes. It will then wait for all of the subscribed Observables to emit before emitting the first slice. Whenever any of the subscribed observables emits, a new slice will be emitted containing all the latest values.
func (ObservableObservable) ConcatAll ¶
func (o ObservableObservable) ConcatAll() Observable
ConcatAll flattens a higher order observable by concatenating the observables it emits.
func (ObservableObservable) MergeAll ¶
func (o ObservableObservable) MergeAll() Observable
MergeAll flattens a higher order observable by merging the observables it emits.
func (ObservableObservable) SwitchAll ¶
func (o ObservableObservable) SwitchAll() Observable
SwitchAll converts an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables.
Example ¶
package main
import (
"time"
"github.com/reactivego/rx"
)
// main maps each of three slow outer ticks onto a fast inner interval and
// prints the items of whichever inner observable was emitted most recently.
func main() {
	const ms = time.Millisecond

	// inner ignores the outer item and returns a new observable that emits an
	// integer starting after 20ms and then repeated every 20ms, in the range
	// starting at 0 and incrementing by 1. Only the first 10 values are taken.
	inner := func(interface{}) rx.Observable {
		return rx.Interval(20 * ms).Take(10)
	}

	outer := rx.Interval(100 * ms).Take(3)
	outer.MapObservable(inner).SwitchAll().Println()
}
Output: 0 1 2 3 0 1 2 3 0 1 2 3 4 5 6 7 8 9
func (ObservableObservable) WithLatestFromAll ¶
func (o ObservableObservable) WithLatestFromAll() ObservableSlice
WithLatestFromAll flattens a higher order observable (e.g. ObservableObservable) by subscribing to all emitted observables (ie. Observable entries) until the source completes. It will then wait for all of the subscribed Observables to emit before emitting the first slice. The first observable that was emitted by the source will be used as the trigger observable. Whenever the trigger observable emits, a new slice will be emitted containing all the latest values.
Example ¶
package main
import (
"github.com/reactivego/rx"
)
// main wraps a number stream and a letter stream in an ObservableObservable
// and prints the slices produced by WithLatestFromAll.
func main() {
	sources := rx.FromObservable(
		rx.From(1, 2, 3, 4, 5),
		rx.From("A", "B", "C", "D", "E"),
	)
	sources.WithLatestFromAll().Println()
}
Output: [2 A] [3 B] [4 C] [5 D]
type ObservableObserver ¶
type ObservableObserver func(next Observable, err error, done bool)
ObservableObserver is a function that gets called whenever the Observable has something to report. The next argument is the item value that is only valid when the done argument is false. When done is true and the err argument is not nil, then the Observable has terminated with an error. When done is true and the err argument is nil, then the Observable has completed normally.
type ObservableSlice ¶
type ObservableSlice func(SliceObserver, Scheduler, Subscriber)
ObservableSlice is a function taking an Observer, Scheduler and Subscriber. Calling it will subscribe the Observer to events from the Observable.
func CombineLatest ¶
func CombineLatest(observables ...Observable) ObservableSlice
CombineLatest will subscribe to all Observables. It will then wait for all of them to emit before emitting the first slice. Whenever any of the subscribed observables emits, a new slice will be emitted containing all the latest values.
func (ObservableSlice) AsObservable ¶
func (o ObservableSlice) AsObservable() Observable
AsObservable turns a typed ObservableSlice into an Observable of interface{}.
func (ObservableSlice) Println ¶
func (o ObservableSlice) Println(a ...interface{}) error
Println subscribes to the Observable and prints every item to os.Stdout while it waits for completion or error. Returns either the error or nil when the Observable completed normally. Println uses a serial scheduler created with NewScheduler().
func (ObservableSlice) Single ¶
func (o ObservableSlice) Single() ObservableSlice
Single enforces that the ObservableSlice sends exactly one data item and then completes. If the observable sends no data before completing or sends more than 1 item before completing, this is reported as an error to the observer.
func (ObservableSlice) Subscribe ¶
func (o ObservableSlice) Subscribe(observe SliceObserver, schedulers ...Scheduler) Subscription
Subscribe operates upon the emissions and notifications from an Observable. This method returns a Subscription. Subscribe uses a serial scheduler created with NewScheduler().
func (ObservableSlice) ToChan ¶
func (o ObservableSlice) ToChan(subscribers ...Subscriber) <-chan Slice
ToChan returns a channel that emits Slice values. If the source observable does not emit values but emits an error or complete, then the returned channel will close without emitting any values.
ToChan uses the public scheduler.Goroutine variable for scheduling, because it needs the concurrency so the returned channel can be used by the calling code directly. To be able to cancel ToChan, you will need to create a subscriber yourself and pass it to ToChan as an argument.
func (ObservableSlice) ToSingle ¶
func (o ObservableSlice) ToSingle() (entry Slice, err error)
ToSingle blocks until the ObservableSlice emits exactly one value or an error. The value and any error are returned. ToSingle uses a serial scheduler created with NewScheduler().
func (ObservableSlice) ToSlice ¶
func (o ObservableSlice) ToSlice() (slice []Slice, err error)
ToSlice collects all values from the ObservableSlice into a slice. The complete slice and any error are returned. ToSlice uses a serial scheduler created with NewScheduler().
func (ObservableSlice) Wait ¶
func (o ObservableSlice) Wait() error
Wait subscribes to the Observable and waits for completion or error. Returns either the error or nil when the Observable completed normally. Wait uses a serial scheduler created with NewScheduler().
type ObservableTime ¶
type ObservableTime func(TimeObserver, Scheduler, Subscriber)
ObservableTime is a function taking an Observer, Scheduler and Subscriber. Calling it will subscribe the Observer to events from the Observable.
func Ticker ¶
func Ticker(initialDelay time.Duration, intervals ...time.Duration) ObservableTime
Ticker creates an ObservableTime that emits a sequence of timestamps after an initialDelay has passed. Subsequent timestamps are emitted using a schedule of intervals passed in. If only the initialDelay is given, Ticker will emit only once.
type ObservableTimeInterval ¶
type ObservableTimeInterval func(TimeIntervalObserver, Scheduler, Subscriber)
ObservableTimeInterval is a function taking an Observer, Scheduler and Subscriber. Calling it will subscribe the Observer to events from the Observable.
type ObservableTimestamp ¶
type ObservableTimestamp func(TimestampObserver, Scheduler, Subscriber)
ObservableTimestamp is a function taking an Observer, Scheduler and Subscriber. Calling it will subscribe the Observer to events from the Observable.
type Observer ¶
Observer is a function that gets called whenever the Observable has something to report. The next argument is the item value that is only valid when the done argument is false. When done is true and the err argument is not nil, then the Observable has terminated with an error. When done is true and the err argument is nil, then the Observable has completed normally.
func (Observer) AsObserver ¶
AsObserver converts an observer of interface{} items to an observer of interface{} items.
func (Observer) Complete ¶
func (o Observer) Complete()
Complete is called by an Observable to signal that no more data is forthcoming to the Observer.
type Scheduler ¶
Scheduler is used to schedule tasks to support subscribing and observing.
func GoroutineScheduler ¶
func GoroutineScheduler() Scheduler
func NewScheduler ¶ added in v0.0.1
func NewScheduler() Scheduler
type SliceObserver ¶
SliceObserver is a function that gets called whenever the Observable has something to report. The next argument is the item value that is only valid when the done argument is false. When done is true and the err argument is not nil, then the Observable has terminated with an error. When done is true and the err argument is nil, then the Observable has completed normally.
type Subject ¶
// Subject combines an Observer and an Observable by embedding both, so the
// methods of each embedded type are available on a Subject value.
type Subject struct {
// Observer is the embedded input side of the subject.
Observer
// Observable is the embedded output side of the subject.
Observable
}
Subject is a combination of an Observer and Observable. Subjects are special because they are the only reactive constructs that support multicasting. The items sent to it through its observer side are multicasted to multiple clients subscribed to its observable side.
The Subject exposes all methods from the embedded Observer and Observable. Use the Observer Next, Error and Complete methods to feed data to it. Use the Observable methods to subscribe to it.
After a subject has been terminated by calling either Error or Complete, it goes into terminated state. All subsequent calls to its observer side will be silently ignored. All subsequent subscriptions to the observable side will be handled according to the specific behavior of the subject. There are different types of subjects, see the different NewXxxSubject functions for more info.
func NewReplaySubject ¶
NewReplaySubject creates a new ReplaySubject. ReplaySubject ensures that all observers see the same sequence of emitted items, even if they subscribe after. When bufferCapacity argument is 0, then DefaultReplayCapacity is used (currently 16380). When windowDuration argument is 0, then entries added to the buffer will remain fresh forever.
func NewSubject ¶
func NewSubject() Subject
NewSubject creates a new Subject. After the subject is terminated, all subsequent subscriptions to the observable side will be terminated immediately with either an Error or Complete notification sent to the subscribing client.
Note that this implementation is blocking. When there are subscribers, the observable goroutine is blocked until all subscribers have processed the next, error or complete notification.
type Subscriber ¶
type Subscriber interface {
// A Subscriber is also a Subscription.
Subscription
// Add will create and return a new child Subscriber setup in such a way that
// calling Unsubscribe on the parent will also call Unsubscribe on the child.
// Calling the Unsubscribe method on the child will NOT propagate to the
// parent!
Add() Subscriber
// OnUnsubscribe will add the given callback function to the subscriber.
// The callback will be called when either the Unsubscribe of the parent
// or of the subscriber itself is called. If the subscription was already
// canceled, then the callback function will just be called immediately.
OnUnsubscribe(callback func())
// OnWait will register a callback to call when subscription Wait is called.
OnWait(callback func())
// Done will set the error internally and then cancel the subscription by
// calling the Unsubscribe method. A nil value for error indicates success.
Done(err error)
// Error returns the error set by calling the Done(err) method. As long as
// the subscriber is still subscribed Error will return nil.
Error() error
}
Subscriber is a Subscription with management functionality.
type Subscription ¶
type Subscription interface {
// Subscribed returns true when the subscription is currently active.
Subscribed() bool
// Unsubscribe will do nothing if the subscription is not active. If the
// state is still active however, it will be changed to canceled.
// Subsequently, it will call Unsubscribe on all child subscriptions added
// through Add, along with all methods added through OnUnsubscribe. When the
// subscription is canceled by calling Unsubscribe a call to the Wait method
// will return the error ErrUnsubscribed.
Unsubscribe()
// Canceled returns true when the subscription state is canceled.
Canceled() bool
// Wait will by default block the calling goroutine and wait for the
// Unsubscribe method to be called on this subscription.
// However, when OnWait was called with a callback wait function it will
// call that instead. Calling Wait on a subscription that has already been
// canceled will return immediately. If the subscriber was canceled by
// calling Unsubscribe, then the error returned is ErrUnsubscribed.
// If the subscriber was terminated by calling Done, then the error
// returned here is the one passed to Done.
Wait() error
}
Subscription is an interface that allows code to monitor and control a subscription it received.
type TimeInterval ¶
type TimeIntervalObserver ¶
type TimeIntervalObserver func(next TimeInterval, err error, done bool)
TimeIntervalObserver is a function that gets called whenever the Observable has something to report. The next argument is the item value that is only valid when the done argument is false. When done is true and the err argument is not nil, then the Observable has terminated with an error. When done is true and the err argument is nil, then the Observable has completed normally.
type TimeObserver ¶
TimeObserver is a function that gets called whenever the Observable has something to report. The next argument is the item value that is only valid when the done argument is false. When done is true and the err argument is not nil, then the Observable has terminated with an error. When done is true and the err argument is nil, then the Observable has completed normally.
type TimestampObserver ¶
TimestampObserver is a function that gets called whenever the Observable has something to report. The next argument is the item value that is only valid when the done argument is false. When done is true and the err argument is not nil, then the Observable has terminated with an error. When done is true and the err argument is nil, then the Observable has completed normally.
Directories
¶
| Path | Synopsis |
|---|---|
|
Package rx/generic provides Reactive Extensions for Go, a generics library for asynchronous programming with observable streams.
|
Package rx/generic provides Reactive Extensions for Go, a generics library for asynchronous programming with observable streams. |
|
Package test provides tests for the generic rx package.
|
Package test provides tests for the generic rx package. |
|
All
All determines whether all items emitted by an Observable meet some criteria.
|
All determines whether all items emitted by an Observable meet some criteria. |
|
AsObservable
AsObservableBar when called on an Observable source will type assert the interface{} items of the source to bar items.
|
AsObservableBar when called on an Observable source will type assert the interface{} items of the source to bar items. |
|
AsyncSubject
AsyncSubject emits the last value (and only the last value) emitted by the Observable part, and only after that Observable part completes.
|
AsyncSubject emits the last value (and only the last value) emitted by the Observable part, and only after that Observable part completes. |
|
AuditTime
AuditTime waits until the source emits and then starts a timer.
|
AuditTime waits until the source emits and then starts a timer. |
|
AutoConnect
AutoConnect makes a Multicaster behave like an ordinary Observable that automatically connects the multicaster to its source when the specified number of observers have subscribed to it.
|
AutoConnect makes a Multicaster behave like an ordinary Observable that automatically connects the multicaster to its source when the specified number of observers have subscribed to it. |
|
Average
Average calculates the average of numbers emitted by an Observable and emits this average.
|
Average calculates the average of numbers emitted by an Observable and emits this average. |
|
BehaviorSubject
NewBehaviorSubject returns a new BehaviorSubject.
|
NewBehaviorSubject returns a new BehaviorSubject. |
|
Buffer
Buffer buffers the source Observable values until closingNotifier emits.
|
Buffer buffers the source Observable values until closingNotifier emits. |
|
BufferTime
BufferTime buffers the source Observable values for a specific time period and emits those as a slice periodically in time.
|
BufferTime buffers the source Observable values for a specific time period and emits those as a slice periodically in time. |
|
Catch
Catch recovers from an error notification by continuing the sequence without emitting the error but by switching to the catch ObservableInt to provide items.
|
Catch recovers from an error notification by continuing the sequence without emitting the error but by switching to the catch ObservableInt to provide items. |
|
CatchError
CatchError catches errors on the Observable to be handled by returning a new Observable or throwing an error.
|
CatchError catches errors on the Observable to be handled by returning a new Observable or throwing an error. |
|
CombineLatest
CombineLatest will subscribe to all Observables.
|
CombineLatest will subscribe to all Observables. |
|
CombineLatestAll
CombineLatestAll flattens a higher order observable (e.g.
|
CombineLatestAll flattens a higher order observable (e.g. |
|
CombineLatestMap
CombineLatestMap maps every entry emitted by the Observable into an Observable, and then subscribes to it, until the source observable completes.
|
CombineLatestMap maps every entry emitted by the Observable into an Observable, and then subscribes to it, until the source observable completes. |
|
CombineLatestMapTo
CombineLatestMapTo maps every entry emitted by the Observable into a single Observable, and then subscribes to it, until the source observable completes.
|
CombineLatestMapTo maps every entry emitted by the Observable into a single Observable, and then subscribes to it, until the source observable completes. |
|
CombineLatestWith
CombineLatestWith will subscribe to its Observable and all other Observables passed in.
|
CombineLatestWith will subscribe to its Observable and all other Observables passed in. |
|
Concat
Concat emits the emissions from two or more observables without interleaving them.
|
Concat emits the emissions from two or more observables without interleaving them. |
|
ConcatAll
ConcatAll flattens a higher order observable by concatenating the observables it emits.
|
ConcatAll flattens a higher order observable by concatenating the observables it emits. |
|
ConcatMap
ConcatMap transforms the items emitted by an Observable by applying a function to each item and returning an Observable.
|
ConcatMap transforms the items emitted by an Observable by applying a function to each item and returning an Observable. |
|
ConcatMapTo
ConcatMapTo maps every entry emitted by the Observable into a single Observable.
|
ConcatMapTo maps every entry emitted by the Observable into a single Observable. |
|
ConcatWith
ConcatWith emits the emissions from two or more observables without interleaving them.
|
ConcatWith emits the emissions from two or more observables without interleaving them. |
|
Connect
Connect instructs a connectable Observable to begin emitting items to its subscribers.
|
Connect instructs a connectable Observable to begin emitting items to its subscribers. |
|
Count
Count counts the number of items emitted by the source ObservableInt and emits only this value.
|
Count counts the number of items emitted by the source ObservableInt and emits only this value. |
|
Create
Create provides a way of creating an Observable from scratch by calling observer methods programmatically.
|
Create provides a way of creating an Observable from scratch by calling observer methods programmatically. |
|
CreateFutureRecursive
CreateFutureRecursive provides a way of creating an Observable from scratch by calling observer methods programmatically.
|
CreateFutureRecursive provides a way of creating an Observable from scratch by calling observer methods programmatically. |
|
CreateRecursive
CreateRecursive provides a way of creating an Observable from scratch by calling observer methods programmatically.
|
CreateRecursive provides a way of creating an Observable from scratch by calling observer methods programmatically. |
|
DebounceTime
DebounceTime only emits the last item of a burst from an Observable if a particular timespan has passed without it emitting another item.
|
DebounceTime only emits the last item of a burst from an Observable if a particular timespan has passed without it emitting another item. |
|
Defer
Defer does not create the Observable until the observer subscribes.
|
Defer does not create the Observable until the observer subscribes. |
|
Delay
Delay shifts the emission from an Observable forward in time by a particular amount of time.
|
Delay shifts the emission from an Observable forward in time by a particular amount of time. |
|
Distinct
Distinct suppresses duplicate items emitted by an Observable.
|
Distinct suppresses duplicate items emitted by an Observable. |
|
DistinctUntilChanged
DistinctUntilChanged only emits when the current value is different from the last.
|
DistinctUntilChanged only emits when the current value is different from the last. |
|
Do
Do calls a function for each next value passing through the observable.
|
Do calls a function for each next value passing through the observable. |
|
DoOnComplete
DoOnComplete calls a function when the stream completes.
|
DoOnComplete calls a function when the stream completes. |
|
DoOnError
DoOnError calls a function for any error on the stream.
|
DoOnError calls a function for any error on the stream. |
|
ElementAt
ElementAt emits only item n emitted by an Observable.
|
ElementAt emits only item n emitted by an Observable. |
|
Empty
Empty creates an Observable that emits no items but terminates normally.
|
Empty creates an Observable that emits no items but terminates normally. |
|
Filter
Filter emits only those items from an observable that pass a predicate test.
|
Filter emits only those items from an observable that pass a predicate test. |
|
Finally
Finally applies a function for any error or completion on the stream.
|
Finally applies a function for any error or completion on the stream. |
|
First
First emits only the first item, or the first item that meets a condition, from an Observable.
|
First emits only the first item, or the first item that meets a condition, from an Observable. |
|
From
From creates an observable from multiple values passed in.
|
From creates an observable from multiple values passed in. |
|
FromChan
FromChan creates an Observable from a Go channel.
|
FromChan creates an Observable from a Go channel. |
|
IgnoreCompletion
IgnoreCompletion only emits items and never completes, neither with Error nor with Complete.
|
IgnoreCompletion only emits items and never completes, neither with Error nor with Complete. |
|
IgnoreElements
IgnoreElements does not emit any items from an Observable but mirrors its termination notification.
|
IgnoreElements does not emit any items from an Observable but mirrors its termination notification. |
|
Interval
Interval creates an ObservableInt that emits a sequence of integers spaced by a particular time interval.
|
Interval creates an ObservableInt that emits a sequence of integers spaced by a particular time interval. |
|
Just
Just creates an observable that emits a particular item.
|
Just creates an observable that emits a particular item. |
|
Last
Last emits only the last item emitted by an Observable.
|
Last emits only the last item emitted by an Observable. |
|
Map
Map transforms the items emitted by an Observable by applying a function to each item.
|
Map transforms the items emitted by an Observable by applying a function to each item. |
|
MapTo
MapTo transforms the items emitted by an Observable.
|
MapTo transforms the items emitted by an Observable. |
|
Max
Max determines, and emits, the maximum-valued item emitted by an Observable.
|
Max determines, and emits, the maximum-valued item emitted by an Observable. |
|
Merge
Merge combines multiple Observables into one by merging their emissions.
|
Merge combines multiple Observables into one by merging their emissions. |
|
MergeAll
MergeAll flattens a higher order observable by merging the observables it emits.
|
MergeAll flattens a higher order observable by merging the observables it emits. |
|
MergeDelayError
MergeDelayError combines multiple Observables into one by merging their emissions.
|
MergeDelayError combines multiple Observables into one by merging their emissions. |
|
MergeDelayErrorWith
MergeDelayErrorWith combines multiple Observables into one by merging their emissions.
|
MergeDelayErrorWith combines multiple Observables into one by merging their emissions. |
|
MergeMap
MergeMap transforms the items emitted by an Observable by applying a function to each item and returning an Observable.
|
MergeMap transforms the items emitted by an Observable by applying a function to each item and returning an Observable. |
|
MergeMapTo
MergeMapTo maps every entry emitted by the Observable into a single Observable.
|
MergeMapTo maps every entry emitted by the Observable into a single Observable. |
|
MergeWith
MergeWith combines multiple Observables into one by merging their emissions.
|
MergeWith combines multiple Observables into one by merging their emissions. |
|
Min
Min determines, and emits, the minimum-valued item emitted by an Observable.
|
Min determines, and emits, the minimum-valued item emitted by an Observable. |
|
Never
Never creates an Observable that emits no items and doesn't terminate.
|
Never creates an Observable that emits no items and doesn't terminate. |
|
Observable
Observable is essentially a subscribe function taking an observe function, scheduler and a subscriber.
|
Observable is essentially a subscribe function taking an observe function, scheduler and a subscriber. |
|
ObserveOn
ObserveOn specifies a schedule function to use for delivering values to the observer.
|
ObserveOn specifies a schedule function to use for delivering values to the observer. |
|
ObserverObservable
ObserverObservable actually is an observer that is made observable.
|
ObserverObservable actually is an observer that is made observable. |
|
Of
Of emits a variable amount of values in a sequence and then emits a complete notification.
|
Of emits a variable amount of values in a sequence and then emits a complete notification. |
|
Only
Only filters the value stream of an observable and lets only the values of a specific type pass.
|
Only filters the value stream of an observable and lets only the values of a specific type pass. |
|
Passthrough
Passthrough just passes through all output from the Observable.
|
Passthrough just passes through all output from the Observable. |
|
Println
Println subscribes to the Observable and prints every item to os.Stdout while it waits for completion or error.
|
Println subscribes to the Observable and prints every item to os.Stdout while it waits for completion or error. |
|
Publish
Publish returns a Multicaster for a Subject to an underlying Observable and turns the subject into a connectable observable.
|
Publish returns a Multicaster for a Subject to an underlying Observable and turns the subject into a connectable observable. |
|
PublishBehavior
PublishBehavior returns a Multicaster that shares a single subscription to the underlying Observable returning an initial value or the last value emitted by the underlying Observable.
|
PublishBehavior returns a Multicaster that shares a single subscription to the underlying Observable returning an initial value or the last value emitted by the underlying Observable. |
|
PublishLast
PublishLast returns a Multicaster that shares a single subscription to the underlying Observable containing only the last value emitted before it completes.
|
PublishLast returns a Multicaster that shares a single subscription to the underlying Observable containing only the last value emitted before it completes. |
|
PublishReplay
PublishReplay returns a Multicaster for a ReplaySubject to an underlying Observable and turns the subject into a connectable observable.
|
PublishReplay returns a Multicaster for a ReplaySubject to an underlying Observable and turns the subject into a connectable observable. |
|
Range
Range creates an Observable that emits a range of sequential int values.
|
Range creates an Observable that emits a range of sequential int values. |
|
Reduce
Reduce applies a reducer function to each item emitted by an Observable and the previous reducer result.
|
Reduce applies a reducer function to each item emitted by an Observable and the previous reducer result. |
|
RefCount
RefCount makes a Connectable behave like an ordinary Observable.
|
RefCount makes a Connectable behave like an ordinary Observable. |
|
Repeat
Repeat creates an observable that emits a sequence of items repeatedly.
|
Repeat creates an observable that emits a sequence of items repeatedly. |
|
ReplaySubject
ReplaySubject ensures that all observers see the same sequence of emitted items, even if they subscribe after.
|
ReplaySubject ensures that all observers see the same sequence of emitted items, even if they subscribe after. |
|
Retry
Retry resubscribes to a source Observable that sends an error notification, in the hope that it will complete without error.
|
Retry resubscribes to a source Observable that sends an error notification, in the hope that it will complete without error. |
|
SampleTime
SampleTime emits the most recent item emitted by an Observable within periodic time intervals.
|
SampleTime emits the most recent item emitted by an Observable within periodic time intervals. |
|
Scan
Scan applies an accumulator function to each item emitted by an Observable and the previous accumulator result.
|
Scan applies an accumulator function to each item emitted by an Observable and the previous accumulator result. |
|
Serialize
Serialize forces an observable to make serialized calls and to be well-behaved.
|
Serialize forces an observable to make serialized calls and to be well-behaved. |
|
Single
Single enforces that the observable sends exactly one data item and then completes.
|
Single enforces that the observable sends exactly one data item and then completes. |
|
Skip
Skip suppresses the first n items emitted by an Observable.
|
Skip suppresses the first n items emitted by an Observable. |
|
SkipLast
SkipLast suppresses the last n items emitted by an Observable.
|
SkipLast suppresses the last n items emitted by an Observable. |
|
Start
Start creates an Observable that emits the return value of a function.
|
Start creates an Observable that emits the return value of a function. |
|
StartWith
StartWith returns an observable that, at the moment of subscription, will synchronously emit all values provided to this operator, then subscribe to the source and mirror all of its emissions to subscribers.
|
StartWith returns an observable that, at the moment of subscription, will synchronously emit all values provided to this operator, then subscribe to the source and mirror all of its emissions to subscribers. |
|
Subject
Subject is a combination of an observer and observable.
|
Subject is a combination of an observer and observable. |
|
Subscribe
Subscribe operates upon the emissions and notifications from an Observable.
|
Subscribe operates upon the emissions and notifications from an Observable. |
|
SubscribeOn
SubscribeOn specifies the scheduler an Observable should use when it is subscribed to.
|
SubscribeOn specifies the scheduler an Observable should use when it is subscribed to. |
|
Sum
Sum calculates the sum of numbers emitted by an Observable and emits this sum.
|
Sum calculates the sum of numbers emitted by an Observable and emits this sum. |
|
SwitchAll
SwitchAll converts an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables.
|
SwitchAll converts an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables. |
|
SwitchMap
SwitchMap transforms the items emitted by an Observable by applying a function to each item and returning an Observable.
|
SwitchMap transforms the items emitted by an Observable by applying a function to each item and returning an Observable. |
|
Take
Take emits only the first n items emitted by an Observable.
|
Take emits only the first n items emitted by an Observable. |
|
TakeLast
TakeLast emits only the last n items emitted by an Observable.
|
TakeLast emits only the last n items emitted by an Observable. |
|
TakeUntil
TakeUntil emits items emitted by an Observable until another Observable emits an item.
|
TakeUntil emits items emitted by an Observable until another Observable emits an item. |
|
TakeWhile
TakeWhile mirrors items emitted by an Observable until a specified condition becomes false.
|
TakeWhile mirrors items emitted by an Observable until a specified condition becomes false. |
|
ThrottleTime
ThrottleTime emits when the source emits and then starts a timer during which all emissions from the source are ignored.
|
ThrottleTime emits when the source emits and then starts a timer during which all emissions from the source are ignored. |
|
Throw
Throw creates an observable that emits no items and terminates with an error.
|
Throw creates an observable that emits no items and terminates with an error. |
|
Ticker
Ticker creates an ObservableTime that emits a sequence of timestamps after an initialDelay has passed.
|
Ticker creates an ObservableTime that emits a sequence of timestamps after an initialDelay has passed. |
|
TimeInterval
TimeInterval intercepts the items from the source Observable and emits in their place a struct that indicates the amount of time that elapsed between pairs of emissions.
|
TimeInterval intercepts the items from the source Observable and emits in their place a struct that indicates the amount of time that elapsed between pairs of emissions. |
|
Timeout
Timeout mirrors the source Observable, but issues an error notification if a particular period of time elapses without any emitted items.
|
Timeout mirrors the source Observable, but issues an error notification if a particular period of time elapses without any emitted items. |
|
Timer
Timer creates an Observable that emits a sequence of integers (starting at zero) after an initialDelay has passed.
|
Timer creates an Observable that emits a sequence of integers (starting at zero) after an initialDelay has passed. |
|
Timestamp
Timestamp attaches a timestamp to each item emitted by an observable indicating when it was emitted.
|
Timestamp attaches a timestamp to each item emitted by an observable indicating when it was emitted. |
|
ToChan
ToChan returns a channel that emits interface{} values.
|
ToChan returns a channel that emits interface{} values. |
|
ToSingle
ToSingle blocks until the Observable emits exactly one value or an error.
|
ToSingle blocks until the Observable emits exactly one value or an error. |
|
ToSlice
ToSlice collects all values from the Observable into a slice.
|
ToSlice collects all values from the Observable into a slice. |
|
Wait
Wait subscribes to the Observable and waits for completion or error.
|
Wait subscribes to the Observable and waits for completion or error. |
|
WithLatestFrom
WithLatestFrom will subscribe to all Observables and wait for all of them to emit before emitting the first slice.
|
WithLatestFrom will subscribe to all Observables and wait for all of them to emit before emitting the first slice. |
|
WithLatestFromAll
WithLatestFromAll flattens a higher order observable (e.g.
|
WithLatestFromAll flattens a higher order observable (e.g. |