Documentation
¶
Overview ¶
Package rx provides Reactive Extensions, a powerful API for asynchronous programming in Go, built around observables and operators to process streams of data seamlessly.
Example (All) ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
func main() {
source := rx.From("ZERO", "ONE", "TWO")
for k, v := range source.All() {
fmt.Println(k, v)
}
fmt.Println("OK")
}
Output: 0 ZERO 1 ONE 2 TWO OK
Example (BufferCount) ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
func main() {
source := rx.From(0, 1, 2, 3)
fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 1)")
rx.BufferCount(source, 2, 1).Println().Wait()
fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 2)")
rx.BufferCount(source, 2, 2).Println().Wait()
fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 3)")
rx.BufferCount(source, 2, 3).Println().Wait()
fmt.Println("BufferCount(From(0, 1, 2, 3), 3, 2)")
rx.BufferCount(source, 3, 2).Println().Wait()
fmt.Println("BufferCount(From(0, 1, 2, 3), 6, 6)")
rx.BufferCount(source, 6, 6).Println().Wait()
fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 0)")
rx.BufferCount(source, 2, 0).Println().Wait()
}
Output: BufferCount(From(0, 1, 2, 3), 2, 1) [0 1] [1 2] [2 3] [3] BufferCount(From(0, 1, 2, 3), 2, 2) [0 1] [2 3] BufferCount(From(0, 1, 2, 3), 2, 3) [0 1] [3] BufferCount(From(0, 1, 2, 3), 3, 2) [0 1 2] [2 3] BufferCount(From(0, 1, 2, 3), 6, 6) [0 1 2 3] BufferCount(From(0, 1, 2, 3), 2, 0) [0 1]
Example (ConcatAll) ¶
package main
import (
"fmt"
"time"
"github.com/reactivego/rx"
)
func main() {
source := rx.Empty[rx.Observable[string]]()
rx.ConcatAll(source).Wait()
source = rx.Of(rx.Empty[string]())
rx.ConcatAll(source).Wait()
req := func(request string, duration time.Duration) rx.Observable[string] {
req := rx.From(request + " response")
if duration == 0 {
return req
}
return req.Delay(duration)
}
const ms = time.Millisecond
req1 := req("first", 10*ms)
req2 := req("second", 20*ms)
req3 := req("third", 0*ms)
req4 := req("fourth", 60*ms)
source = rx.From(req1).ConcatWith(rx.From(req2, req3, req4).Delay(100 * ms))
rx.ConcatAll(source).Println().Wait()
fmt.Println("OK")
}
Output: first response second response third response fourth response OK
Example (Count) ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
func main() {
source := rx.From(1, 2, 3, 4, 5)
count := source.Count()
count.Println().Wait()
emptySource := rx.Empty[int]()
emptyCount := emptySource.Count()
emptyCount.Println().Wait()
fmt.Println("OK")
}
Output: 5 0 OK
Example (ElementAt) ¶
package main
import (
"github.com/reactivego/rx"
)
func main() {
rx.From(0, 1, 2, 3, 4).ElementAt(2).Println().Wait()
}
Output: 2
Example (ExhaustAll) ¶
package main
import (
"fmt"
"strconv"
"time"
"github.com/reactivego/rx"
)
func main() {
const ms = time.Millisecond
stream := func(name string, duration time.Duration, count int) rx.Observable[string] {
return rx.Map(rx.Timer[int](0*ms, duration), func(next int) string {
return name + "-" + strconv.Itoa(next)
}).Take(count)
}
streams := []rx.Observable[string]{
stream("a", 20*ms, 3),
stream("b", 20*ms, 3),
stream("c", 20*ms, 3),
rx.Empty[string](),
}
streamofstreams := rx.Map(rx.Timer[int](20*ms, 30*ms, 250*ms, 100*ms).Take(4), func(next int) rx.Observable[string] {
return streams[next]
})
err := rx.ExhaustAll(streamofstreams).Println().Wait()
if err == nil {
fmt.Println("success")
}
}
Output: a-0 a-1 a-2 c-0 c-1 c-2 success
Example (Marshal) ¶
package main
import (
"encoding/json"
"github.com/reactivego/rx"
)
func main() {
type R struct {
A string `json:"a"`
B string `json:"b"`
}
b2s := func(data []byte) string { return string(data) }
rx.Map(rx.Of(R{"Hello", "World"}).Marshal(json.Marshal), b2s).Println().Wait()
}
Output: {"a":"Hello","b":"World"}
Example (Multicast) ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
func main() {
serial := rx.NewScheduler()
in, out := rx.Multicast[int](1)
// Ignore everything before any subscriptions, including the last!
in.Next(-2)
in.Next(-1)
in.Next(0)
in.Next(1)
// Schedule the subsequent emits in a loop. This will be the first task to
// run on the serial scheduler after the subscriptions have been added.
serial.ScheduleLoop(2, func(index int, again func(next int)) {
if index < 4 {
in.Next(index)
again(index + 1)
} else {
in.Done(rx.Error("foo"))
}
})
// Add a couple of subscriptions
sub1 := out.Println().Go(serial)
sub2 := out.Println().Go(serial)
// Let the scheduler run and wait for all of its scheduled tasks to finish.
serial.Wait()
fmt.Println(sub1.Wait())
fmt.Println(sub2.Wait())
}
Output: 2 2 3 3 foo foo
Example (MulticastDrop) ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
func main() {
serial := rx.NewScheduler()
const onBackpressureDrop = -1
// multicast with backpressure handling set to dropping incoming
// items that don't fit in the buffer once it has filled up.
in, out := rx.Multicast[int](1 * onBackpressureDrop)
// ignore everything before any subscriptions, including the last!
in.Next(-2)
in.Next(-1)
in.Next(0)
in.Next(1)
// add a couple of subscriptions
sub1 := out.Println().Go(serial)
sub2 := out.Println().Go(serial)
in.Next(2) // accepted: buffer not full
in.Next(3) // dropped: buffer full
in.Done(rx.Error("foo")) // dropped: buffer full
serial.Wait()
fmt.Println(sub1.Wait())
fmt.Println(sub2.Wait())
}
Output: 2 2 <nil> <nil>
Example (Race) ¶
package main
import (
"fmt"
"time"
"github.com/reactivego/rx"
)
func main() {
const ms = time.Millisecond
req := func(request string, duration time.Duration) rx.Observable[string] {
return rx.From(request + " response").Delay(duration)
}
req1 := req("first", 50*ms)
req2 := req("second", 10*ms)
req3 := req("third", 60*ms)
rx.Race(req1, req2, req3).Println().Wait()
err := func(text string, duration time.Duration) rx.Observable[int] {
return rx.Throw[int](rx.Error(text + " error")).Delay(duration)
}
err1 := err("first", 10*ms)
err2 := err("second", 20*ms)
err3 := err("third", 30*ms)
fmt.Println(rx.Race(err1, err2, err3).Wait(rx.Goroutine))
}
Output: second response first error
Example (Retry) ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
func main() {
var first error = rx.Error("error")
a := rx.Create(func(index int) (next int, err error, done bool) {
if index < 3 {
return index, nil, false
}
err, first = first, nil
return 0, err, true
})
err := a.Retry().Println().Wait()
fmt.Println(first == nil)
fmt.Println(err)
}
Output: 0 1 2 0 1 2 true <nil>
Example (Skip) ¶
package main
import (
"github.com/reactivego/rx"
)
func main() {
rx.From(1, 2, 3, 4, 5).Skip(2).Println().Wait()
}
Output: 3 4 5
Example (Subject) ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
func main() {
serial := rx.NewScheduler()
// subject collects emits when there are no subscriptions active.
in, out := rx.Subject[int](0, 1)
// ignore everything before any subscriptions, except the last because buffer size is 1
in.Next(-2)
in.Next(-1)
in.Next(0)
in.Next(1)
// add a couple of subscriptions
sub1 := out.Println().Go(serial)
sub2 := out.Println().Go(serial)
// schedule the subsequent emits on the serial scheduler otherwise these calls
// will block because the buffer is full.
// subject will detect usage of scheduler on observable side and use it on the
// observer side to keep the data flow through the subject going.
serial.Schedule(func() {
in.Next(2)
in.Next(3)
in.Done(rx.Error("foo"))
})
serial.Wait()
fmt.Println(sub1.Wait())
fmt.Println(sub2.Wait())
}
Output: 1 1 2 2 3 3 foo foo
Example (SwitchAll) ¶
package main
import (
"fmt"
"time"
"github.com/reactivego/rx"
)
func main() {
const ms = time.Millisecond
interval42x4 := rx.Interval[int](42 * ms).Take(4)
interval16x4 := rx.Interval[int](16 * ms).Take(4)
err := rx.SwitchAll(rx.Map(interval42x4, func(next int) rx.Observable[int] { return interval16x4 })).Println().Wait(rx.Goroutine)
if err == nil {
fmt.Println("success")
}
}
Output: 0 1 0 1 0 1 0 1 2 3 success
Example (SwitchMap) ¶
package main
import (
"fmt"
"time"
"github.com/reactivego/rx"
)
func main() {
const ms = time.Millisecond
webreq := func(request string, duration time.Duration) rx.Observable[string] {
return rx.From(request + " result").Delay(duration)
}
first := webreq("first", 50*ms)
second := webreq("second", 10*ms)
latest := webreq("latest", 50*ms)
switchmap := rx.SwitchMap(rx.Interval[int](20*ms).Take(3), func(i int) rx.Observable[string] {
switch i {
case 0:
return first
case 1:
return second
case 2:
return latest
default:
return rx.Empty[string]()
}
})
err := switchmap.Println().Wait()
if err == nil {
fmt.Println("success")
}
}
Output: second result latest result success
Example (Values) ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
"github.com/reactivego/scheduler"
)
func main() {
source := rx.From(1, 3, 5)
// Why choose the Goroutine concurrent scheduler?
// An observable can actually be at the root of a tree
// of separately running observables that have their
// responses merged. The Goroutine scheduler allows
// these observables to run concurrently.
// run the observable on 1 or more goroutines
for i := range source.Values(scheduler.Goroutine) {
// This is called from a newly created goroutine
fmt.Println(i)
}
// run the observable on the current goroutine
for i := range source.Values(scheduler.New()) {
fmt.Println(i)
}
fmt.Println("OK")
}
Output: 1 3 5 1 3 5 OK
Index ¶
- Constants
- Variables
- func All2[T, U any](observable Observable[Tuple2[T, U]], scheduler ...Scheduler) iter.Seq2[T, U]
- func Equal[T comparable]() func(T, T) bool
- func Multicast[T any](size int) (Observer[T], Observable[T])
- func Must[T any](t T, err error) T
- func Subject[T any](age time.Duration, capacity ...int) (Observer[T], Observable[T])
- type ConcurrentScheduler
- type Connectable
- type Connector
- type Creator
- type Error
- type Float
- type Integer
- type MaxBufferSizeOption
- type Observable
- func AsObservable[T any](observable Observable[any]) Observable[T]
- func BufferCount[T any](observable Observable[T], bufferSize, startBufferEvery int) Observable[[]T]
- func CombineAll[T any](observable Observable[Observable[T]]) Observable[[]T]
- func CombineLatest[T any](observables ...Observable[T]) Observable[[]T]
- func CombineLatest2[T, U any](first Observable[T], second Observable[U]) Observable[Tuple2[T, U]]
- func CombineLatest3[T, U, V any](first Observable[T], second Observable[U], third Observable[V]) Observable[Tuple3[T, U, V]]
- func CombineLatest4[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], ...) Observable[Tuple4[T, U, V, W]]
- func CombineLatest5[T, U, V, W, X any](first Observable[T], second Observable[U], third Observable[V], ...) Observable[Tuple5[T, U, V, W, X]]
- func Concat[T any](observables ...Observable[T]) Observable[T]
- func ConcatAll[T any](observable Observable[Observable[T]]) Observable[T]
- func ConcatMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]
- func Create[T any](create Creator[T]) Observable[T]
- func Defer[T any](factory func() Observable[T]) Observable[T]
- func Empty[T any]() Observable[T]
- func ExhaustAll[T any](observable Observable[Observable[T]]) Observable[T]
- func ExhaustMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]
- func From[T any](slice ...T) Observable[T]
- func Interval[T Integer | Float](interval time.Duration) Observable[T]
- func Map[T, U any](observable Observable[T], project func(T) U) Observable[U]
- func MapE[T, U any](observable Observable[T], project func(T) (U, error)) Observable[U]
- func Merge[T any](observables ...Observable[T]) Observable[T]
- func MergeAll[T any](observable Observable[Observable[T]]) Observable[T]
- func MergeMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]
- func Never[T any]() Observable[T]
- func Of[T any](value T) Observable[T]
- func Pull[T any](seq iter.Seq[T]) Observable[T]
- func Pull2[T, U any](seq iter.Seq2[T, U]) Observable[Tuple2[T, U]]
- func Race[T any](observables ...Observable[T]) Observable[T]
- func Recv[T any](ch <-chan T) Observable[T]
- func Reduce[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) U) Observable[U]
- func ReduceE[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) (U, error)) Observable[U]
- func Scan[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) U) Observable[U]
- func ScanE[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) (U, error)) Observable[U]
- func SwitchAll[T any](observable Observable[Observable[T]]) Observable[T]
- func SwitchMap[T, U any](o Observable[T], project func(T) Observable[U]) Observable[U]
- func Throw[T any](err error) Observable[T]
- func Ticker(initialDelay time.Duration, intervals ...time.Duration) Observable[time.Time]
- func Timer[T Integer | Float](initialDelay time.Duration, intervals ...time.Duration) Observable[T]
- func WithLatestFrom[T any](observables ...Observable[T]) Observable[[]T]
- func WithLatestFrom2[T, U any](first Observable[T], second Observable[U]) Observable[Tuple2[T, U]]
- func WithLatestFrom3[T, U, V any](first Observable[T], second Observable[U], third Observable[V]) Observable[Tuple3[T, U, V]]
- func WithLatestFrom4[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], ...) Observable[Tuple4[T, U, V, W]]
- func WithLatestFrom5[T, U, V, W, X any](first Observable[T], second Observable[U], third Observable[V], ...) Observable[Tuple5[T, U, V, W, X]]
- func WithLatestFromAll[T any](observable Observable[Observable[T]]) Observable[[]T]
- func Zip[T any](observables ...Observable[T]) Observable[[]T]
- func Zip2[T, U any](first Observable[T], second Observable[U], options ...MaxBufferSizeOption) Observable[Tuple2[T, U]]
- func Zip3[T, U, V any](first Observable[T], second Observable[U], third Observable[V], ...) Observable[Tuple3[T, U, V]]
- func Zip4[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], ...) Observable[Tuple4[T, U, V, W]]
- func Zip5[T, U, V, W, X any](first Observable[T], second Observable[U], third Observable[V], ...) Observable[Tuple5[T, U, V, W, X]]
- func ZipAll[T any](observable Observable[Observable[T]], options ...MaxBufferSizeOption) Observable[[]T]
- func (observable Observable[T]) All(scheduler ...Scheduler) iter.Seq2[int, T]
- func (observable Observable[T]) Append(slice *[]T) Observable[T]
- func (observable Observable[T]) AsObservable() Observable[any]
- func (observable Observable[T]) Assign(value *T) Observable[T]
- func (observable Observable[T]) AutoUnsubscribe() Observable[T]
- func (observable Observable[T]) Catch(other Observable[T]) Observable[T]
- func (observable Observable[T]) CatchError(selector func(err error, caught Observable[T]) Observable[T]) Observable[T]
- func (observable Observable[T]) ConcatWith(others ...Observable[T]) Observable[T]
- func (observable Observable[T]) Count() Observable[int]
- func (observable Observable[T]) Delay(duration time.Duration) Observable[T]
- func (observable Observable[T]) DistinctUntilChanged(equal func(T, T) bool) Observable[T]
- func (observable Observable[T]) Do(f func(T)) Observable[T]
- func (observable Observable[T]) ElementAt(n int) Observable[T]
- func (observable Observable[T]) EndWith(values ...T) Observable[T]
- func (observable Observable[T]) Filter(predicate func(T) bool) Observable[T]
- func (observable Observable[T]) First(schedulers ...Scheduler) (value T, err error)
- func (observable Observable[T]) Fprint(out io.Writer) Observable[T]
- func (observable Observable[T]) Fprintf(out io.Writer, format string) Observable[T]
- func (observable Observable[T]) Fprintln(out io.Writer) Observable[T]
- func (observable Observable[T]) Go(schedulers ...Scheduler) Subscription
- func (observable Observable[T]) Last(schedulers ...Scheduler) (value T, err error)
- func (observable Observable[T]) Map(project func(T) any) Observable[any]
- func (observable Observable[T]) MapE(project func(T) (any, error)) Observable[any]
- func (observable Observable[T]) Marshal(marshal func(any) ([]byte, error)) Observable[[]byte]
- func (observable Observable[T]) MergeWith(others ...Observable[T]) Observable[T]
- func (observable Observable[T]) OnComplete(f func()) Observable[T]
- func (observable Observable[T]) OnDone(f func(error)) Observable[T]
- func (observable Observable[T]) OnError(f func(error)) Observable[T]
- func (observable Observable[T]) OnNext(f func(T)) Observable[T]
- func (observable Observable[T]) Passthrough() Observable[T]
- func (observable Observable[T]) Pipe(segments ...Pipe[T]) Observable[T]
- func (observable Observable[T]) Print() Observable[T]
- func (observable Observable[T]) Printf(format string) Observable[T]
- func (observable Observable[T]) Println() Observable[T]
- func (observable Observable[T]) Publish() Connectable[T]
- func (observable Observable[T]) RaceWith(others ...Observable[T]) Observable[T]
- func (observable Observable[T]) Repeat(count ...int) Observable[T]
- func (observable Observable[T]) Retry(limit ...int) Observable[T]
- func (observable Observable[T]) RetryTime(backoff func(int) time.Duration, limit ...int) Observable[T]
- func (observable Observable[T]) SampleTime(window time.Duration) Observable[T]
- func (observable Observable[T]) Send(ch chan<- T) Observable[T]
- func (observable Observable[T]) Share() Observable[T]
- func (observable Observable[T]) Skip(n int) Observable[T]
- func (observable Observable[T]) Slice(schedulers ...Scheduler) (slice []T, err error)
- func (observable Observable[T]) StartWith(values ...T) Observable[T]
- func (observable Observable[T]) Subscribe(observe Observer[T], scheduler Scheduler) Subscription
- func (observable Observable[T]) SubscribeOn(scheduler ConcurrentScheduler) Observable[T]
- func (observable Observable[T]) Take(n int) Observable[T]
- func (observable Observable[T]) TakeWhile(condition func(T) bool) Observable[T]
- func (observable Observable[T]) Tap(tap Observer[T]) Observable[T]
- func (observable Observable[T]) Values(scheduler ...Scheduler) iter.Seq[T]
- func (observable Observable[T]) Wait(schedulers ...Scheduler) error
- type Observer
- type Pipe
- func Append[T any](slice *[]T) Pipe[T]
- func Assign[T any](value *T) Pipe[T]
- func AutoUnsubscribe[T any]() Pipe[T]
- func Catch[T any](other Observable[T]) Pipe[T]
- func CatchError[T any](selector func(err error, caught Observable[T]) Observable[T]) Pipe[T]
- func ConcatWith[T any](others ...Observable[T]) Pipe[T]
- func Delay[T any](duration time.Duration) Pipe[T]
- func DistinctUntilChanged[T any](equal func(T, T) bool) Pipe[T]
- func Do[T any](do func(T)) Pipe[T]
- func ElementAt[T any](n int) Pipe[T]
- func EndWith[T any](values ...T) Pipe[T]
- func Filter[T any](predicate func(T) bool) Pipe[T]
- func Fprint[T any](out io.Writer) Pipe[T]
- func Fprintf[T any](out io.Writer, format string) Pipe[T]
- func Fprintln[T any](out io.Writer) Pipe[T]
- func MergeWith[T any](others ...Observable[T]) Pipe[T]
- func OnComplete[T any](onComplete func()) Pipe[T]
- func OnDone[T any](onDone func(error)) Pipe[T]
- func OnError[T any](onError func(error)) Pipe[T]
- func OnNext[T any](onNext func(T)) Pipe[T]
- func Passthrough[T any]() Pipe[T]
- func Print[T any]() Pipe[T]
- func Printf[T any](format string) Pipe[T]
- func Println[T any]() Pipe[T]
- func RaceWith[T any](others ...Observable[T]) Pipe[T]
- func Repeat[T any](count ...int) Pipe[T]
- func Retry[T any](limit ...int) Pipe[T]
- func Send[T any](ch chan<- T) Pipe[T]
- func Skip[T any](n int) Pipe[T]
- func StartWith[T any](values ...T) Pipe[T]
- func Take[T any](n int) Pipe[T]
- func TakeWhile[T any](condition func(T) bool) Pipe[T]
- func Tap[T any](tap Observer[T]) Pipe[T]
- type Scheduler
- type Signed
- type Subscriber
- type Subscription
- type Tuple2
- type Tuple3
- type Tuple4
- type Tuple5
- type Unsigned
Examples ¶
- Package (All)
- Package (BufferCount)
- Package (ConcatAll)
- Package (Count)
- Package (ElementAt)
- Package (ExhaustAll)
- Package (Marshal)
- Package (Multicast)
- Package (MulticastDrop)
- Package (Race)
- Package (Retry)
- Package (Share)
- Package (Skip)
- Package (Subject)
- Package (SwitchAll)
- Package (SwitchMap)
- Package (Values)
Constants ¶
const InvalidCount = Error("invalid count")
const OutOfSubjectSubscriptions = Error("out of subject subscriptions")
const RepeatCountInvalid = Error("repeat count invalid")
const SubscriptionActive = Error("subscription active")
SubscriptionActive is the error returned by Err() when the subscription is still active and has not yet completed or been canceled.
const SubscriptionCanceled = Error("subscription canceled")
SubscriptionCanceled is the error returned by Wait() and Err() when the subscription was canceled by calling Unsubscribe() on the Subscription. This indicates the subscription was terminated by the subscriber rather than by the observable completing normally or with an error.
const TypecastFailed = Error("typecast failed")
const ZipBufferOverflow = Error("zip buffer overflow")
Variables ¶
var Goroutine = scheduler.Goroutine
var NewScheduler = scheduler.New
Functions ¶
func Equal ¶ added in v0.2.0
func Equal[T comparable]() func(T, T) bool
func Multicast ¶ added in v0.2.0
func Multicast[T any](size int) (Observer[T], Observable[T])
Multicast returns both an Observer and an Observable. The returned Observer is used to send items into the Multicast. The returned Observable is used to subscribe to the Multicast. The Multicast multicasts items sent through the Observer to every Subscriber of the Observable.
size: size of the item buffer, i.e. the number of items kept to replay to a new Subscriber.
Backpressure handling depends on the sign of the size argument. For positive size the multicast will block when one of the subscribers lets the buffer fill up. For negative size the multicast will drop items on the blocking subscriber, allowing the others to keep on receiving values. For hot observables dropping is preferred.
func Subject ¶
Subject returns both an Observer and an Observable. The returned Observer is used to send items into the Subject. The returned Observable is used to subscribe to the Subject. The Subject multicasts items sent through the Observer to every Subscriber of the Observable.
age: max age to keep items in order to replay them to a new Subscriber (0 = no max age).
[size]: size of the item buffer, number of items kept to replay to a new Subscriber.
[cap]: capacity of the item buffer, number of items that can be observed before blocking.
[scap]: capacity of the subscription list, max number of simultaneous subscribers.
Types ¶
type ConcurrentScheduler ¶ added in v0.2.0
type ConcurrentScheduler = scheduler.ConcurrentScheduler
type Connectable ¶
type Connectable[T any] struct { Observable[T] Connector }
Connectable[T] is an Observable[T] with a Connect method, so it has both a Subscribe method and a Connect method. The Connect method is used to instruct the Connectable[T] to subscribe to its source. The Subscribe method is used to subscribe to the Connectable[T] itself.
func (Connectable[T]) AutoConnect ¶ added in v0.2.0
func (connectable Connectable[T]) AutoConnect(count int) Observable[T]
func (Connectable[T]) RefCount ¶ added in v0.2.0
func (connectable Connectable[T]) RefCount() Observable[T]
RefCount makes a Connectable[T] behave like an ordinary Observable[T]. On first Subscribe it will call Connect on its Connectable[T] and when its last subscriber is Unsubscribed it will cancel the source connection by calling Unsubscribe on the subscription returned by the call to Connect.
type Connector ¶ added in v0.2.3
type Connector func(Scheduler, Subscriber)
Connector provides the Connect method for a Connectable[T].
func (Connector) Connect ¶ added in v0.2.3
func (connect Connector) Connect(schedulers ...Scheduler) Subscription
Connect instructs a Connectable[T] to subscribe to its source and begin emitting items to its subscribers. Connect accepts an optional scheduler argument.
type Float ¶ added in v0.2.2
Float is a constraint that permits any floating-point type. If future releases of Go add new predeclared floating-point types, this constraint will be modified to include them.
type Integer ¶ added in v0.2.2
Integer is a constraint that permits any integer type. If future releases of Go add new predeclared integer types, this constraint will be modified to include them.
type MaxBufferSizeOption ¶ added in v0.2.1
type MaxBufferSizeOption = func(*int)
MaxBufferSizeOption is a function type used for configuring the maximum buffer size of an observable stream.
func WithMaxBufferSize ¶ added in v0.2.1
func WithMaxBufferSize(n int) MaxBufferSizeOption
WithMaxBufferSize creates a MaxBufferSizeOption that sets the maximum buffer size to n. This option is typically used when creating new observables to control memory usage.
type Observable ¶
type Observable[T any] func(Observer[T], Scheduler, Subscriber)
func AsObservable ¶ added in v0.2.0
func AsObservable[T any](observable Observable[any]) Observable[T]
func BufferCount ¶ added in v0.2.0
func BufferCount[T any](observable Observable[T], bufferSize, startBufferEvery int) Observable[[]T]
func CombineAll ¶ added in v0.2.0
func CombineAll[T any](observable Observable[Observable[T]]) Observable[[]T]
func CombineLatest ¶
func CombineLatest[T any](observables ...Observable[T]) Observable[[]T]
func CombineLatest2 ¶ added in v0.2.1
func CombineLatest2[T, U any](first Observable[T], second Observable[U]) Observable[Tuple2[T, U]]
func CombineLatest3 ¶ added in v0.2.1
func CombineLatest3[T, U, V any](first Observable[T], second Observable[U], third Observable[V]) Observable[Tuple3[T, U, V]]
func CombineLatest4 ¶ added in v0.2.1
func CombineLatest4[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W]) Observable[Tuple4[T, U, V, W]]
func CombineLatest5 ¶ added in v0.2.1
func CombineLatest5[T, U, V, W, X any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W], fifth Observable[X]) Observable[Tuple5[T, U, V, W, X]]
func Concat ¶
func Concat[T any](observables ...Observable[T]) Observable[T]
func ConcatAll ¶ added in v0.2.0
func ConcatAll[T any](observable Observable[Observable[T]]) Observable[T]
func ConcatMap ¶ added in v0.2.0
func ConcatMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]
func Create ¶
func Create[T any](create Creator[T]) Observable[T]
func Defer ¶
func Defer[T any](factory func() Observable[T]) Observable[T]
func Empty ¶
func Empty[T any]() Observable[T]
func ExhaustAll ¶ added in v0.2.0
func ExhaustAll[T any](observable Observable[Observable[T]]) Observable[T]
func ExhaustMap ¶ added in v0.2.0
func ExhaustMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]
func From ¶
func From[T any](slice ...T) Observable[T]
func Map ¶ added in v0.2.0
func Map[T, U any](observable Observable[T], project func(T) U) Observable[U]
func MapE ¶ added in v0.2.0
func MapE[T, U any](observable Observable[T], project func(T) (U, error)) Observable[U]
func Merge ¶
func Merge[T any](observables ...Observable[T]) Observable[T]
func MergeAll ¶ added in v0.2.0
func MergeAll[T any](observable Observable[Observable[T]]) Observable[T]
func MergeMap ¶ added in v0.2.0
func MergeMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]
func Never ¶
func Never[T any]() Observable[T]
func Of ¶
func Of[T any](value T) Observable[T]
func Race ¶ added in v0.2.0
func Race[T any](observables ...Observable[T]) Observable[T]
func Recv ¶ added in v0.2.0
func Recv[T any](ch <-chan T) Observable[T]
func Reduce ¶ added in v0.2.0
func Reduce[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) U) Observable[U]
func ReduceE ¶ added in v0.2.1
func ReduceE[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) (U, error)) Observable[U]
func Scan ¶ added in v0.2.0
func Scan[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) U) Observable[U]
func ScanE ¶ added in v0.2.0
func ScanE[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) (U, error)) Observable[U]
func SwitchAll ¶ added in v0.2.0
func SwitchAll[T any](observable Observable[Observable[T]]) Observable[T]
func SwitchMap ¶ added in v0.2.0
func SwitchMap[T, U any](o Observable[T], project func(T) Observable[U]) Observable[U]
func Throw ¶
func Throw[T any](err error) Observable[T]
func Ticker ¶
Ticker creates an Observable[time.Time] that emits a sequence of timestamps after an initialDelay has passed. Subsequent timestamps are emitted using a schedule of intervals passed in. If only the initialDelay is given, Ticker will emit only once.
func WithLatestFrom ¶ added in v0.2.0
func WithLatestFrom[T any](observables ...Observable[T]) Observable[[]T]
func WithLatestFrom2 ¶ added in v0.2.1
func WithLatestFrom2[T, U any](first Observable[T], second Observable[U]) Observable[Tuple2[T, U]]
func WithLatestFrom3 ¶ added in v0.2.1
func WithLatestFrom3[T, U, V any](first Observable[T], second Observable[U], third Observable[V]) Observable[Tuple3[T, U, V]]
func WithLatestFrom4 ¶ added in v0.2.1
func WithLatestFrom4[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W]) Observable[Tuple4[T, U, V, W]]
func WithLatestFrom5 ¶ added in v0.2.1
func WithLatestFrom5[T, U, V, W, X any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W], fifth Observable[X]) Observable[Tuple5[T, U, V, W, X]]
func WithLatestFromAll ¶ added in v0.2.0
func WithLatestFromAll[T any](observable Observable[Observable[T]]) Observable[[]T]
func Zip ¶ added in v0.2.1
func Zip[T any](observables ...Observable[T]) Observable[[]T]
func Zip2 ¶ added in v0.2.1
func Zip2[T, U any](first Observable[T], second Observable[U], options ...MaxBufferSizeOption) Observable[Tuple2[T, U]]
func Zip3 ¶ added in v0.2.1
func Zip3[T, U, V any](first Observable[T], second Observable[U], third Observable[V], options ...MaxBufferSizeOption) Observable[Tuple3[T, U, V]]
func Zip4 ¶ added in v0.2.1
func Zip4[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W], options ...MaxBufferSizeOption) Observable[Tuple4[T, U, V, W]]
func Zip5 ¶ added in v0.2.1
func Zip5[T, U, V, W, X any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W], fifth Observable[X], options ...MaxBufferSizeOption) Observable[Tuple5[T, U, V, W, X]]
func ZipAll ¶ added in v0.2.1
func ZipAll[T any](observable Observable[Observable[T]], options ...MaxBufferSizeOption) Observable[[]T]
func (Observable[T]) All ¶
func (observable Observable[T]) All(scheduler ...Scheduler) iter.Seq2[int, T]
func (Observable[T]) Append ¶ added in v0.2.1
func (observable Observable[T]) Append(slice *[]T) Observable[T]
func (Observable[T]) AsObservable ¶
func (observable Observable[T]) AsObservable() Observable[any]
func (Observable[T]) Assign ¶ added in v0.2.0
func (observable Observable[T]) Assign(value *T) Observable[T]
func (Observable[T]) AutoUnsubscribe ¶
func (observable Observable[T]) AutoUnsubscribe() Observable[T]
func (Observable[T]) Catch ¶
func (observable Observable[T]) Catch(other Observable[T]) Observable[T]
func (Observable[T]) CatchError ¶
func (observable Observable[T]) CatchError(selector func(err error, caught Observable[T]) Observable[T]) Observable[T]
func (Observable[T]) ConcatWith ¶
func (observable Observable[T]) ConcatWith(others ...Observable[T]) Observable[T]
func (Observable[T]) Count ¶
func (observable Observable[T]) Count() Observable[int]
func (Observable[T]) Delay ¶
func (observable Observable[T]) Delay(duration time.Duration) Observable[T]
func (Observable[T]) DistinctUntilChanged ¶
func (observable Observable[T]) DistinctUntilChanged(equal func(T, T) bool) Observable[T]
func (Observable[T]) Do ¶
func (observable Observable[T]) Do(f func(T)) Observable[T]
func (Observable[T]) ElementAt ¶
func (observable Observable[T]) ElementAt(n int) Observable[T]
func (Observable[T]) EndWith ¶ added in v0.2.2
func (observable Observable[T]) EndWith(values ...T) Observable[T]
func (Observable[T]) Filter ¶
func (observable Observable[T]) Filter(predicate func(T) bool) Observable[T]
func (Observable[T]) First ¶
func (observable Observable[T]) First(schedulers ...Scheduler) (value T, err error)
func (Observable[T]) Fprint ¶ added in v0.2.0
func (observable Observable[T]) Fprint(out io.Writer) Observable[T]
func (Observable[T]) Fprintf ¶ added in v0.2.0
func (observable Observable[T]) Fprintf(out io.Writer, format string) Observable[T]
func (Observable[T]) Fprintln ¶ added in v0.2.0
func (observable Observable[T]) Fprintln(out io.Writer) Observable[T]
func (Observable[T]) Go ¶ added in v0.2.0
func (observable Observable[T]) Go(schedulers ...Scheduler) Subscription
Go subscribes to the observable and starts execution on a separate goroutine. It ignores all emissions from the observable sequence, making it useful when you only care about side effects and not the actual values. By default, it uses the Goroutine scheduler, but an optional scheduler can be provided. Returns a Subscription that can be used to cancel the subscription when no longer needed.
func (Observable[T]) Last ¶
func (observable Observable[T]) Last(schedulers ...Scheduler) (value T, err error)
func (Observable[T]) Map ¶
func (observable Observable[T]) Map(project func(T) any) Observable[any]
func (Observable[T]) MapE ¶ added in v0.2.1
func (observable Observable[T]) MapE(project func(T) (any, error)) Observable[any]
func (Observable[T]) Marshal ¶ added in v0.2.0
func (observable Observable[T]) Marshal(marshal func(any) ([]byte, error)) Observable[[]byte]
func (Observable[T]) MergeWith ¶
func (observable Observable[T]) MergeWith(others ...Observable[T]) Observable[T]
func (Observable[T]) OnComplete ¶ added in v0.2.2
func (observable Observable[T]) OnComplete(f func()) Observable[T]
func (Observable[T]) OnDone ¶ added in v0.2.2
func (observable Observable[T]) OnDone(f func(error)) Observable[T]
func (Observable[T]) OnError ¶ added in v0.2.2
func (observable Observable[T]) OnError(f func(error)) Observable[T]
func (Observable[T]) OnNext ¶ added in v0.2.2
func (observable Observable[T]) OnNext(f func(T)) Observable[T]
func (Observable[T]) Passthrough ¶ added in v0.2.0
func (observable Observable[T]) Passthrough() Observable[T]
func (Observable[T]) Pipe ¶ added in v0.2.0
func (observable Observable[T]) Pipe(segments ...Pipe[T]) Observable[T]
func (Observable[T]) Print ¶ added in v0.2.0
func (observable Observable[T]) Print() Observable[T]
func (Observable[T]) Printf ¶ added in v0.2.0
func (observable Observable[T]) Printf(format string) Observable[T]
func (Observable[T]) Println ¶
func (observable Observable[T]) Println() Observable[T]
func (Observable[T]) Publish ¶
func (observable Observable[T]) Publish() Connectable[T]
func (Observable[T]) RaceWith ¶ added in v0.2.0
func (observable Observable[T]) RaceWith(others ...Observable[T]) Observable[T]
func (Observable[T]) Repeat ¶
func (observable Observable[T]) Repeat(count ...int) Observable[T]
Repeat emits the items emitted by the source Observable repeatedly.
Parameters:
  - count: Optional. The number of repetitions:
      - If omitted: The source Observable is repeated indefinitely
      - If 0: Returns an empty Observable
      - If negative: Returns an Observable that emits an error
      - If multiple count values: Returns an Observable that emits an error
The resulting Observable will subscribe to the source Observable repeatedly each time the source completes, up to the specified count.
func (Observable[T]) Retry ¶
func (observable Observable[T]) Retry(limit ...int) Observable[T]
func (Observable[T]) RetryTime ¶ added in v0.2.0
func (observable Observable[T]) RetryTime(backoff func(int) time.Duration, limit ...int) Observable[T]
func (Observable[T]) SampleTime ¶
func (observable Observable[T]) SampleTime(window time.Duration) Observable[T]
SampleTime emits the most recent item emitted by an Observable within periodic time intervals.
func (Observable[T]) Send ¶ added in v0.2.0
func (observable Observable[T]) Send(ch chan<- T) Observable[T]
func (Observable[T]) Share ¶ added in v0.2.0
func (observable Observable[T]) Share() Observable[T]
Share returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one Subscriber, this Observable will be subscribed and emitting data. When all subscribers have unsubscribed, it will unsubscribe from the source Observable. Because the Observable is multicasting, it makes the stream hot.
This method is useful when you have an Observable that is expensive to create or has side-effects, but you want to share the results of that Observable with multiple subscribers. By using `Share`, you can avoid creating multiple instances of the Observable and ensure that all subscribers receive the same data.
func (Observable[T]) Skip ¶
func (observable Observable[T]) Skip(n int) Observable[T]
func (Observable[T]) Slice ¶ added in v0.2.0
func (observable Observable[T]) Slice(schedulers ...Scheduler) (slice []T, err error)
func (Observable[T]) StartWith ¶
func (observable Observable[T]) StartWith(values ...T) Observable[T]
func (Observable[T]) Subscribe ¶
func (observable Observable[T]) Subscribe(observe Observer[T], scheduler Scheduler) Subscription
func (Observable[T]) SubscribeOn ¶
func (observable Observable[T]) SubscribeOn(scheduler ConcurrentScheduler) Observable[T]
func (Observable[T]) Take ¶
func (observable Observable[T]) Take(n int) Observable[T]
Take returns an Observable that emits only the first n values emitted by the source Observable. If the source emits fewer than n values, then all of its values are emitted. After that, it completes, regardless of whether the source completes.
func (Observable[T]) TakeWhile ¶
func (observable Observable[T]) TakeWhile(condition func(T) bool) Observable[T]
func (Observable[T]) Tap ¶ added in v0.2.0
func (observable Observable[T]) Tap(tap Observer[T]) Observable[T]
func (Observable[T]) Values ¶ added in v0.2.0
func (observable Observable[T]) Values(scheduler ...Scheduler) iter.Seq[T]
func (Observable[T]) Wait ¶
func (observable Observable[T]) Wait(schedulers ...Scheduler) error
type Observer ¶
func AsObserver ¶ added in v0.2.0
AsObserver converts a generic Observer[any] to an Observer[T] of a specific type T. This allows adapting a generic Observer to a more specific type context.
func Ignore ¶ added in v0.2.0
Ignore creates an Observer that simply discards any emissions from an Observable. It is useful when you need to create an Observer but don't care about its values.
func (Observer[T]) AsObserver ¶
AsObserver converts a typed Observer[T] to a generic Observer[any]. It handles type conversion from 'any' back to T, and sends a TypecastFailed error when conversion fails.
type Pipe ¶ added in v0.2.0
// Pipe is a function type that takes an Observable[T] and returns an
// Observable[T]. Values of this type are accepted as the segments of
// Observable[T].Pipe.
type Pipe[T any] func(Observable[T]) Observable[T]
func AutoUnsubscribe ¶ added in v0.2.2
func Catch ¶ added in v0.2.0
func Catch[T any](other Observable[T]) Pipe[T]
func CatchError ¶ added in v0.2.0
func CatchError[T any](selector func(err error, caught Observable[T]) Observable[T]) Pipe[T]
func ConcatWith ¶ added in v0.2.1
func ConcatWith[T any](others ...Observable[T]) Pipe[T]
func DistinctUntilChanged ¶ added in v0.2.0
func MergeWith ¶ added in v0.2.1
func MergeWith[T any](others ...Observable[T]) Pipe[T]
func OnComplete ¶ added in v0.2.1
func Passthrough ¶ added in v0.2.0
func RaceWith ¶ added in v0.2.1
func RaceWith[T any](others ...Observable[T]) Pipe[T]
func Repeat ¶ added in v0.2.2
Repeat creates an Observable that emits the entire source sequence multiple times.
Parameters:
  - count: Optional. The number of repetitions:
      - If omitted: The source Observable is repeated indefinitely
      - If 0: Returns an empty Observable
      - If negative: Returns an Observable that emits an error
      - If multiple count values: Returns an Observable that emits an error
The resulting Observable will subscribe to the source Observable repeatedly each time the source completes, up to the specified count.
type Signed ¶ added in v0.2.2
Signed is a constraint that permits any signed integer type. If future releases of Go add new predeclared signed integer types, this constraint will be modified to include them.
type Subscriber ¶
// Subscriber is a node in a tree of subscribers: children are created with
// Add and are unsubscribed together with their parent.
type Subscriber interface {
	// Subscribed reports whether the subscriber is still in the subscribed
	// state. It returns false after Unsubscribe has been called.
	Subscribed() bool

	// Unsubscribe moves the subscriber to the unsubscribed state and runs
	// every registered callback function. Calling it on an already
	// unsubscribed subscriber has no effect.
	Unsubscribe()

	// Add returns a newly created child Subscriber. A child of an already
	// unsubscribed parent starts out unsubscribed; otherwise the child is
	// unsubscribed when its parent is unsubscribed.
	Add() Subscriber

	// OnUnsubscribe registers callback to be run when Unsubscribe is called.
	// On an already unsubscribed subscriber the callback runs immediately.
	// A nil callback is ignored.
	OnUnsubscribe(callback func())
}
Subscriber is a subscribable entity that allows construction of a Subscriber tree.
type Subscription ¶
// Subscription exposes the lifecycle of a subscription: its state can be
// queried, awaited, and ended through the methods below.
type Subscription interface {
	// Subscribed reports true up to the point where Unsubscribe is called.
	Subscribed() bool

	// Unsubscribe switches the subscription to the unsubscribed state.
	Unsubscribe()

	// Done returns a channel that is closed once the subscription becomes
	// unsubscribed, which makes it usable in select statements that react to
	// the subscription terminating. With a non-concurrent scheduler, a
	// goroutine is spawned to wait for the scheduler.
	Done() <-chan struct{}

	// Err reports the terminal state of the subscription:
	//   - nil when the observable completed successfully
	//   - the observable's error when it terminated with an error
	//   - SubscriptionCanceled when the subscription was manually unsubscribed
	//   - SubscriptionActive while the subscription is still active
	Err() error

	// Wait blocks until the subscription is unsubscribed and returns right
	// away when that has already happened. With a non-concurrent scheduler,
	// it waits for the scheduler to complete.
	//
	// It returns:
	//   - nil when the observable completed successfully
	//   - the observable's error when it terminated with an error
	//   - SubscriptionCanceled when the subscription was manually unsubscribed
	Wait() error
}
Subscription is an interface that allows monitoring and controlling a subscription. It provides methods for tracking the subscription's lifecycle.
type Tuple4 ¶ added in v0.2.1
// Tuple4 groups four values of independently chosen types into one struct.
type Tuple4[T, U, V, W any] struct {
	First  T
	Second U
	Third  V
	Fourth W
}
Source Files
¶
- all.go
- append.go
- asobservable.go
- assign.go
- autoconnect.go
- autounsubscribe.go
- buffercount.go
- catch.go
- catcherror.go
- combineall.go
- combinelatest.go
- concat.go
- concatall.go
- concatmap.go
- concatwith.go
- connectable.go
- connector.go
- constraints.go
- count.go
- create.go
- creator.go
- defer.go
- delay.go
- distinctuntilchanged.go
- do.go
- doc.go
- elementat.go
- empty.go
- endwith.go
- equal.go
- error.go
- exhaustall.go
- exhaustmap.go
- filter.go
- first.go
- fprint.go
- fprintf.go
- fprintln.go
- from.go
- go.go
- interval.go
- last.go
- map.go
- mape.go
- marshal.go
- maxbuffersize.go
- merge.go
- mergeall.go
- mergemap.go
- mergewith.go
- multicast.go
- must.go
- never.go
- observable.go
- observer.go
- of.go
- oncomplete.go
- ondone.go
- onerror.go
- onnext.go
- passthrough.go
- pipe.go
- print.go
- printf.go
- println.go
- publish.go
- pull.go
- race.go
- racewith.go
- recv.go
- reduce.go
- reducee.go
- refcount.go
- repeat.go
- retry.go
- retrytime.go
- sampletime.go
- scan.go
- scane.go
- scheduler.go
- send.go
- share.go
- skip.go
- slice.go
- startwith.go
- subject.go
- subscribe.go
- subscribeon.go
- subscriber.go
- subscription.go
- switchall.go
- switchmap.go
- take.go
- takewhile.go
- tap.go
- throw.go
- ticker.go
- timer.go
- tuple.go
- values.go
- wait.go
- withlatestfrom.go
- withlatestfromall.go
- zip.go
- zipall.go