Documentation
¶
Overview ¶
Example (All) ¶
package main import ( "fmt" "github.com/reactivego/rx" ) func main() { source := rx.From("ZERO", "ONE", "TWO") for k, v := range source.All() { fmt.Println(k, v) } fmt.Println("OK") }
Output: 0 ZERO 1 ONE 2 TWO OK
Example (BufferCount) ¶
package main import ( "fmt" "github.com/reactivego/rx" ) func main() { source := rx.From(0, 1, 2, 3) fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 1)") rx.BufferCount(source, 2, 1).Println().Wait() fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 2)") rx.BufferCount(source, 2, 2).Println().Wait() fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 3)") rx.BufferCount(source, 2, 3).Println().Wait() fmt.Println("BufferCount(From(0, 1, 2, 3), 3, 2)") rx.BufferCount(source, 3, 2).Println().Wait() fmt.Println("BufferCount(From(0, 1, 2, 3), 6, 6)") rx.BufferCount(source, 6, 6).Println().Wait() fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 0)") rx.BufferCount(source, 2, 0).Println().Wait() }
Output: BufferCount(From(0, 1, 2, 3), 2, 1) [0 1] [1 2] [2 3] [3] BufferCount(From(0, 1, 2, 3), 2, 2) [0 1] [2 3] BufferCount(From(0, 1, 2, 3), 2, 3) [0 1] [3] BufferCount(From(0, 1, 2, 3), 3, 2) [0 1 2] [2 3] BufferCount(From(0, 1, 2, 3), 6, 6) [0 1 2 3] BufferCount(From(0, 1, 2, 3), 2, 0) [0 1]
Example (ConcatAll) ¶
package main import ( "fmt" "time" "github.com/reactivego/rx" ) func main() { source := rx.Empty[rx.Observable[string]]() rx.ConcatAll(source).Wait() source = rx.Of(rx.Empty[string]()) rx.ConcatAll(source).Wait() req := func(request string, duration time.Duration) rx.Observable[string] { req := rx.From(request + " response") if duration == 0 { return req } return req.Delay(duration) } const ms = time.Millisecond req1 := req("first", 10*ms) req2 := req("second", 20*ms) req3 := req("third", 0*ms) req4 := req("fourth", 60*ms) source = rx.From(req1).ConcatWith(rx.From(req2, req3, req4).Delay(100 * ms)) rx.ConcatAll(source).Println().Wait() fmt.Println("OK") }
Output: first response second response third response fourth response OK
Example (Count) ¶
package main import ( "fmt" "github.com/reactivego/rx" ) func main() { source := rx.From(1, 2, 3, 4, 5) count := source.Count() count.Println().Wait() emptySource := rx.Empty[int]() emptyCount := emptySource.Count() emptyCount.Println().Wait() fmt.Println("OK") }
Output: 5 0 OK
Example (ElementAt) ¶
package main import ( "github.com/reactivego/rx" ) func main() { rx.From(0, 1, 2, 3, 4).ElementAt(2).Println().Wait() }
Output: 2
Example (ExhaustAll) ¶
package main import ( "fmt" "strconv" "time" "github.com/reactivego/rx" ) func main() { const ms = time.Millisecond stream := func(name string, duration time.Duration, count int) rx.Observable[string] { return rx.Map(rx.Timer[int](0*ms, duration), func(next int) string { return name + "-" + strconv.Itoa(next) }).Take(count) } streams := []rx.Observable[string]{ stream("a", 20*ms, 3), stream("b", 20*ms, 3), stream("c", 20*ms, 3), rx.Empty[string](), } streamofstreams := rx.Map(rx.Timer[int](20*ms, 30*ms, 250*ms, 100*ms).Take(4), func(next int) rx.Observable[string] { return streams[next] }) err := rx.ExhaustAll(streamofstreams).Println().Wait() if err == nil { fmt.Println("success") } }
Output: a-0 a-1 a-2 c-0 c-1 c-2 success
Example (Marshal) ¶
package main import ( "encoding/json" "github.com/reactivego/rx" ) func main() { type R struct { A string `json:"a"` B string `json:"b"` } b2s := func(data []byte) string { return string(data) } rx.Map(rx.Of(R{"Hello", "World"}).Marshal(json.Marshal), b2s).Println().Wait() }
Output: {"a":"Hello","b":"World"}
Example (Multicast) ¶
package main import ( "fmt" "github.com/reactivego/rx" ) func main() { serial := rx.NewScheduler() in, out := rx.Multicast[int](1) // Ignore everything before any subscriptions, including the last! in.Next(-2) in.Next(-1) in.Next(0) in.Next(1) // Schedule the subsequent emits in a loop. This will be the first task to // run on the serial scheduler after the subscriptions have been added. serial.ScheduleLoop(2, func(index int, again func(next int)) { if index < 4 { in.Next(index) again(index + 1) } else { in.Error(rx.Error("foo")) } }) // Add a couple of subscriptions sub1 := out.Println().Go(serial) sub2 := out.Println().Go(serial) // Let the scheduler run and wait for all of its scheduled tasks to finish. serial.Wait() fmt.Println(sub1.Wait()) fmt.Println(sub2.Wait()) }
Output: 2 2 3 3 foo foo
Example (MulticastDrop) ¶
package main import ( "fmt" "github.com/reactivego/rx" ) func main() { serial := rx.NewScheduler() const onBackpressureDrop = -1 // multicast with backpressure handling set to dropping incoming // items that don't fit in the buffer once it has filled up. in, out := rx.Multicast[int](1 * onBackpressureDrop) // ignore everything before any subscriptions, including the last! in.Next(-2) in.Next(-1) in.Next(0) in.Next(1) // add a couple of subscriptions sub1 := out.Println().Go(serial) sub2 := out.Println().Go(serial) in.Next(2) // accepted: buffer not full in.Next(3) // dropped: buffer full in.Error(rx.Error("foo")) // dropped: buffer full serial.Wait() fmt.Println(sub1.Wait()) fmt.Println(sub2.Wait()) }
Output: 2 2 <nil> <nil>
Example (Race) ¶
package main import ( "fmt" "time" "github.com/reactivego/rx" ) func main() { const ms = time.Millisecond req := func(request string, duration time.Duration) rx.Observable[string] { return rx.From(request + " response").Delay(duration) } req1 := req("first", 50*ms) req2 := req("second", 10*ms) req3 := req("third", 60*ms) rx.Race(req1, req2, req3).Println().Wait() err := func(text string, duration time.Duration) rx.Observable[int] { return rx.Throw[int](rx.Error(text + " error")).Delay(duration) } err1 := err("first", 10*ms) err2 := err("second", 20*ms) err3 := err("third", 30*ms) fmt.Println(rx.Race(err1, err2, err3).Wait(rx.Goroutine)) }
Output: second response first error
Example (Retry) ¶
package main import ( "fmt" "github.com/reactivego/rx" ) func main() { var first error = rx.Error("error") a := rx.Create(func(index int) (next int, err error, done bool) { if index < 3 { return index, nil, false } err, first = first, nil return 0, err, true }) err := a.Retry().Println().Wait() fmt.Println(first == nil) fmt.Println(err) }
Output: 0 1 2 0 1 2 true <nil>
Example (Skip) ¶
package main import ( "github.com/reactivego/rx" ) func main() { rx.From(1, 2, 3, 4, 5).Skip(2).Println().Wait() }
Output: 3 4 5
Example (Subject) ¶
package main import ( "fmt" "github.com/reactivego/rx" ) func main() { serial := rx.NewScheduler() // subject collects emits when there are no subscriptions active. in, out := rx.Subject[int](0, 1) // ignore everything before any subscriptions, except the last because buffer size is 1 in.Next(-2) in.Next(-1) in.Next(0) in.Next(1) // add a couple of subscriptions sub1 := out.Println().Go(serial) sub2 := out.Println().Go(serial) // schedule the subsequent emits on the serial scheduler otherwise these calls // will block because the buffer is full. // subject will detect usage of scheduler on observable side and use it on the // observer side to keep the data flow through the subject going. serial.Schedule(func() { in.Next(2) in.Next(3) in.Error(rx.Error("foo")) }) serial.Wait() fmt.Println(sub1.Wait()) fmt.Println(sub2.Wait()) }
Output: 1 1 2 2 3 3 foo foo
Example (SwitchAll) ¶
package main import ( "fmt" "time" "github.com/reactivego/rx" ) func main() { const ms = time.Millisecond interval42x4 := rx.Interval[int](42 * ms).Take(4) interval16x4 := rx.Interval[int](16 * ms).Take(4) err := rx.SwitchAll(rx.Map(interval42x4, func(next int) rx.Observable[int] { return interval16x4 })).Println().Wait(rx.Goroutine) if err == nil { fmt.Println("success") } }
Output: 0 1 0 1 0 1 0 1 2 3 success
Example (SwitchMap) ¶
package main import ( "fmt" "time" "github.com/reactivego/rx" ) func main() { const ms = time.Millisecond webreq := func(request string, duration time.Duration) rx.Observable[string] { return rx.From(request + " result").Delay(duration) } first := webreq("first", 50*ms) second := webreq("second", 10*ms) latest := webreq("latest", 50*ms) switchmap := rx.SwitchMap(rx.Interval[int](20*ms).Take(3), func(i int) rx.Observable[string] { switch i { case 0: return first case 1: return second case 2: return latest default: return rx.Empty[string]() } }) err := switchmap.Println().Wait() if err == nil { fmt.Println("success") } }
Output: second result latest result success
Example (Values) ¶
package main import ( "fmt" "github.com/reactivego/rx" "github.com/reactivego/scheduler" ) func main() { source := rx.From(1, 3, 5) // Why choose the Goroutine concurrent scheduler? // An observable can actually be at the root of a tree // of separately running observables that have their // responses merged. The Goroutine scheduler allows // these observables to run concurrently. // run the observable on 1 or more goroutines for i := range source.Values(scheduler.Goroutine) { // This is called from a newly created goroutine fmt.Println(i) } // run the observable on the current goroutine for i := range source.Values(scheduler.New()) { fmt.Println(i) } fmt.Println("OK") }
Output: 1 3 5 1 3 5 OK
Index ¶
- Constants
- Variables
- func Equal[T comparable]() func(T, T) bool
- func Max[T cmp.Ordered](a, b T) T
- func Min[T cmp.Ordered](a, b T) T
- func Multicast[T any](size int) (Observer[T], Observable[T])
- func Must[T any](t T, err error) T
- func Subject[T any](age time.Duration, capacity ...int) (Observer[T], Observable[T])
- type ConcurrentScheduler
- type Connect
- type Connectable
- type Creator
- type Error
- type Observable
- func AsObservable[T any](observable Observable[any]) Observable[T]
- func BufferCount[T any](observable Observable[T], bufferSize, startBufferEvery int) Observable[[]T]
- func Combine[T any](observables ...Observable[T]) Observable[[]T]
- func CombineAll[T any](observable Observable[Observable[T]]) Observable[[]T]
- func CombinePair[T, U any](first Observable[T], second Observable[U]) Observable[Pair[T, U]]
- func CombineQuadruple[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], ...) Observable[Quadruple[T, U, V, W]]
- func CombineTriple[T, U, V any](first Observable[T], second Observable[U], third Observable[V]) Observable[Triple[T, U, V]]
- func Concat[T any](observables ...Observable[T]) Observable[T]
- func ConcatAll[T any](observable Observable[Observable[T]]) Observable[T]
- func ConcatMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]
- func Create[T any](create Creator[T]) Observable[T]
- func Defer[T any](factory func() Observable[T]) Observable[T]
- func DistinctUntilChanged[T comparable](observable Observable[T]) Observable[T]
- func Empty[T any]() Observable[T]
- func ExhaustAll[T any](observable Observable[Observable[T]]) Observable[T]
- func ExhaustMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]
- func From[T any](slice ...T) Observable[T]
- func Interval[T constraints.Integer | constraints.Float](interval time.Duration) Observable[T]
- func Map[T, U any](observable Observable[T], project func(T) U) Observable[U]
- func MapE[T, U any](observable Observable[T], project func(T) (U, error)) Observable[U]
- func Merge[T any](observables ...Observable[T]) Observable[T]
- func MergeAll[T any](observable Observable[Observable[T]]) Observable[T]
- func MergeMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]
- func Never[T any]() Observable[T]
- func Of[T any](value T) Observable[T]
- func Race[T any](observables ...Observable[T]) Observable[T]
- func Recv[T any](ch <-chan T) Observable[T]
- func Reduce[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) U) Observable[U]
- func Scan[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) U) Observable[U]
- func ScanE[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) (U, error)) Observable[U]
- func SwitchAll[T any](observable Observable[Observable[T]]) Observable[T]
- func SwitchMap[T, U any](o Observable[T], project func(T) Observable[U]) Observable[U]
- func Throw[T any](err error) Observable[T]
- func Ticker(initialDelay time.Duration, intervals ...time.Duration) Observable[time.Time]
- func Timer[T constraints.Integer | constraints.Float](initialDelay time.Duration, intervals ...time.Duration) Observable[T]
- func WithLatestFrom[T any](observables ...Observable[T]) Observable[[]T]
- func WithLatestFromAll[T any](observable Observable[Observable[T]]) Observable[[]T]
- func WithLatestFromPair[T, U any](first Observable[T], second Observable[U]) Observable[Pair[T, U]]
- func WithLatestFromQuadruple[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], ...) Observable[Quadruple[T, U, V, W]]
- func WithLatestFromTriple[T, U, V any](first Observable[T], second Observable[U], third Observable[V]) Observable[Triple[T, U, V]]
- func (observable Observable[T]) All(scheduler ...Scheduler) iter.Seq2[int, T]
- func (observable Observable[T]) AsObservable() Observable[any]
- func (observable Observable[T]) Assign(value *T) Observable[T]
- func (observable Observable[T]) AutoUnsubscribe() Observable[T]
- func (observable Observable[T]) Catch(other Observable[T]) Observable[T]
- func (observable Observable[T]) CatchError(selector func(err error, caught Observable[T]) Observable[T]) Observable[T]
- func (observable Observable[T]) Collect(slice *[]T) Observable[T]
- func (observable Observable[T]) ConcatWith(others ...Observable[T]) Observable[T]
- func (observable Observable[T]) Count() Observable[int]
- func (observable Observable[T]) Delay(duration time.Duration) Observable[T]
- func (observable Observable[T]) DistinctUntilChanged(equal func(T, T) bool) Observable[T]
- func (observable Observable[T]) Do(f func(next T)) Observable[T]
- func (observable Observable[T]) ElementAt(n int) Observable[T]
- func (observable Observable[T]) Filter(predicate func(T) bool) Observable[T]
- func (observable Observable[T]) Finally(f func(error)) Observable[T]
- func (observable Observable[T]) Fprint(out io.Writer) Observable[T]
- func (observable Observable[T]) Fprintf(out io.Writer, format string) Observable[T]
- func (observable Observable[T]) Fprintln(out io.Writer) Observable[T]
- func (observable Observable[T]) Go(schedulers ...Scheduler) Subscription
- func (observable Observable[T]) Marshal(marshal func(any) ([]byte, error)) Observable[[]byte]
- func (observable Observable[T]) MergeWith(others ...Observable[T]) Observable[T]
- func (observable Observable[T]) Passthrough() Observable[T]
- func (observable Observable[T]) Pipe(segments ...Pipe[T]) Observable[T]
- func (observable Observable[T]) Print() Observable[T]
- func (observable Observable[T]) Printf(format string) Observable[T]
- func (observable Observable[T]) Println() Observable[T]
- func (observable Observable[T]) Publish() Connectable[T]
- func (observable Observable[T]) RaceWith(others ...Observable[T]) Observable[T]
- func (observable Observable[T]) Reduce(seed T, accumulator func(acc, next T) T) Observable[T]
- func (observable Observable[T]) Repeat(count ...int) Observable[T]
- func (observable Observable[T]) Retry(limit ...int) Observable[T]
- func (observable Observable[T]) RetryTime(backoff func(int) time.Duration, limit ...int) Observable[T]
- func (observable Observable[T]) SampleTime(window time.Duration) Observable[T]
- func (observable Observable[T]) Send(ch chan<- T) Observable[T]
- func (observable Observable[T]) Share() Observable[T]
- func (observable Observable[T]) Skip(n int) Observable[T]
- func (observable Observable[T]) Slice(schedulers ...Scheduler) (slice []T, err error)
- func (observable Observable[T]) StartWith(values ...T) Observable[T]
- func (observable Observable[T]) Subscribe(observe Observer[T], scheduler Scheduler) Subscription
- func (observable Observable[T]) SubscribeOn(scheduler ConcurrentScheduler) Observable[T]
- func (observable Observable[T]) Take(n int) Observable[T]
- func (observable Observable[T]) TakeWhile(condition func(T) bool) Observable[T]
- func (observable Observable[T]) Tap(tap Observer[T]) Observable[T]
- func (observable Observable[T]) Value(schedulers ...Scheduler) (value T, err error)
- func (observable Observable[T]) Values(scheduler ...Scheduler) iter.Seq[T]
- func (observable Observable[T]) Wait(schedulers ...Scheduler) error
- type Observer
- type Pair
- type Pipe
- func Assign[T any](value *T) Pipe[T]
- func Catch[T any](other Observable[T]) Pipe[T]
- func CatchError[T any](selector func(err error, caught Observable[T]) Observable[T]) Pipe[T]
- func Collect[T any](slice *[]T) Pipe[T]
- func Do[T any](f func(T)) Pipe[T]
- func Filter[T any](predicate func(T) bool) Pipe[T]
- func Finally[T any](f func(error)) Pipe[T]
- func Fprint[T any](out io.Writer) Pipe[T]
- func Fprintf[T any](out io.Writer, format string) Pipe[T]
- func Fprintln[T any](out io.Writer) Pipe[T]
- func Passthrough[T any]() Pipe[T]
- func Print[T any]() Pipe[T]
- func Printf[T any](format string) Pipe[T]
- func Println[T any]() Pipe[T]
- func Send[T any](ch chan<- T) Pipe[T]
- func Skip[T any](n int) Pipe[T]
- func Take[T any](n int) Pipe[T]
- func TakeWhile[T any](condition func(T) bool) Pipe[T]
- func Tap[T any](tap Observer[T]) Pipe[T]
- type Quadruple
- type Scheduler
- type Subscribable
- type Subscriber
- type Subscription
- type Triple
Examples ¶
- Package (All)
- Package (BufferCount)
- Package (ConcatAll)
- Package (Count)
- Package (ElementAt)
- Package (ExhaustAll)
- Package (Marshal)
- Package (Multicast)
- Package (MulticastDrop)
- Package (Race)
- Package (Retry)
- Package (Share)
- Package (Skip)
- Package (Subject)
- Package (SwitchAll)
- Package (SwitchMap)
- Package (Values)
Constants ¶
const InvalidCount = Error("invalid count")
const OutOfSubjectSubscriptions = Error("out of subject subscriptions")
const RepeatCountInvalid = Error("repeat count invalid")
const SubscriptionCanceled = Error("subscription canceled")
SubscriptionCanceled is the error returned by Wait when the Unsubscribe method is called on an active subscription.
const TypecastFailed = Error("typecast failed")
Variables ¶
var Goroutine = scheduler.Goroutine
var NewScheduler = scheduler.New
Functions ¶
func Equal ¶ added in v0.2.0
func Equal[T comparable]() func(T, T) bool
func Multicast ¶ added in v0.2.0
func Multicast[T any](size int) (Observer[T], Observable[T])
Multicast returns both an Observer and an Observable. The returned Observer is used to send items into the Multicast. The returned Observable is used to subscribe to the Multicast. The Multicast multicasts items sent through the Observer to every Subscriber of the Observable.
size: size of the item buffer, i.e. the number of items kept to replay to a new Subscriber.
Backpressure handling depends on the sign of the size argument. For positive size the multicast will block when one of the subscribers lets the buffer fill up. For negative size the multicast will drop items on the blocking subscriber, allowing the others to keep on receiving values. For hot observables dropping is preferred.
func Subject ¶
Subject returns both an Observer and an Observable. The returned Observer is used to send items into the Subject. The returned Observable is used to subscribe to the Subject. The Subject multicasts items sent through the Observer to every Subscriber of the Observable.
age: max age to keep items in order to replay them to a new Subscriber (0 = no max age). [size]: size of the item buffer, number of items kept to replay to a new Subscriber. [cap]: capacity of the item buffer, number of items that can be observed before blocking. [scap]: capacity of the subscription list, max number of simultaneous subscribers.
Types ¶
type ConcurrentScheduler ¶ added in v0.2.0
type ConcurrentScheduler = scheduler.ConcurrentScheduler
type Connect ¶ added in v0.2.0
type Connect func(Scheduler, Subscriber)
Connect provides the Connect method for a Connectable[T].
func (Connect) Connect ¶ added in v0.2.0
func (connect Connect) Connect(schedulers ...Scheduler) Subscription
Connect instructs a Connectable[T] to subscribe to its source and begin emitting items to its subscribers. Connect accepts an optional scheduler argument.
type Connectable ¶
type Connectable[T any] struct { Observable[T] Connect }
Connectable[T] is an Observable[T] with a Connect method. So it has both a Subscribe method and a Connect method. The Connect method is used to instruct the Connectable[T] to subscribe to its source. The Subscribe method is used to subscribe to the Connectable[T] itself.
func (Connectable[T]) AutoConnect ¶ added in v0.2.0
func (connectable Connectable[T]) AutoConnect(count int) Observable[T]
func (Connectable[T]) RefCount ¶ added in v0.2.0
func (connectable Connectable[T]) RefCount() Observable[T]
RefCount makes a Connectable[T] behave like an ordinary Observable[T]. On first Subscribe it will call Connect on its Connectable[T] and when its last subscriber is Unsubscribed it will cancel the source connection by calling Unsubscribe on the subscription returned by the call to Connect.
type Observable ¶
type Observable[T any] func(Observer[T], Scheduler, Subscriber)
func AsObservable ¶ added in v0.2.0
func AsObservable[T any](observable Observable[any]) Observable[T]
func BufferCount ¶ added in v0.2.0
func BufferCount[T any](observable Observable[T], bufferSize, startBufferEvery int) Observable[[]T]
func Combine ¶ added in v0.2.0
func Combine[T any](observables ...Observable[T]) Observable[[]T]
func CombineAll ¶ added in v0.2.0
func CombineAll[T any](observable Observable[Observable[T]]) Observable[[]T]
func CombinePair ¶ added in v0.2.0
func CombinePair[T, U any](first Observable[T], second Observable[U]) Observable[Pair[T, U]]
func CombineQuadruple ¶ added in v0.2.0
func CombineQuadruple[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W]) Observable[Quadruple[T, U, V, W]]
func CombineTriple ¶ added in v0.2.0
func CombineTriple[T, U, V any](first Observable[T], second Observable[U], third Observable[V]) Observable[Triple[T, U, V]]
func Concat ¶
func Concat[T any](observables ...Observable[T]) Observable[T]
func ConcatAll ¶ added in v0.2.0
func ConcatAll[T any](observable Observable[Observable[T]]) Observable[T]
func ConcatMap ¶ added in v0.2.0
func ConcatMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]
func Create ¶
func Create[T any](create Creator[T]) Observable[T]
func Defer ¶
func Defer[T any](factory func() Observable[T]) Observable[T]
func DistinctUntilChanged ¶ added in v0.2.0
func DistinctUntilChanged[T comparable](observable Observable[T]) Observable[T]
func Empty ¶
func Empty[T any]() Observable[T]
func ExhaustAll ¶ added in v0.2.0
func ExhaustAll[T any](observable Observable[Observable[T]]) Observable[T]
func ExhaustMap ¶ added in v0.2.0
func ExhaustMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]
func From ¶
func From[T any](slice ...T) Observable[T]
func Interval ¶
func Interval[T constraints.Integer | constraints.Float](interval time.Duration) Observable[T]
func Map ¶ added in v0.2.0
func Map[T, U any](observable Observable[T], project func(T) U) Observable[U]
func MapE ¶ added in v0.2.0
func MapE[T, U any](observable Observable[T], project func(T) (U, error)) Observable[U]
func Merge ¶
func Merge[T any](observables ...Observable[T]) Observable[T]
func MergeAll ¶ added in v0.2.0
func MergeAll[T any](observable Observable[Observable[T]]) Observable[T]
func MergeMap ¶ added in v0.2.0
func MergeMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]
func Never ¶
func Never[T any]() Observable[T]
func Of ¶
func Of[T any](value T) Observable[T]
func Race ¶ added in v0.2.0
func Race[T any](observables ...Observable[T]) Observable[T]
func Recv ¶ added in v0.2.0
func Recv[T any](ch <-chan T) Observable[T]
func Reduce ¶ added in v0.2.0
func Reduce[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) U) Observable[U]
func Scan ¶ added in v0.2.0
func Scan[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) U) Observable[U]
func ScanE ¶ added in v0.2.0
func ScanE[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) (U, error)) Observable[U]
func SwitchAll ¶ added in v0.2.0
func SwitchAll[T any](observable Observable[Observable[T]]) Observable[T]
func SwitchMap ¶ added in v0.2.0
func SwitchMap[T, U any](o Observable[T], project func(T) Observable[U]) Observable[U]
func Throw ¶
func Throw[T any](err error) Observable[T]
func Ticker ¶
Ticker creates an Observable[time.Time] that emits a sequence of timestamps after an initialDelay has passed. Subsequent timestamps are emitted using a schedule of intervals passed in. If only the initialDelay is given, Ticker will emit only once.
func Timer ¶
func Timer[T constraints.Integer | constraints.Float](initialDelay time.Duration, intervals ...time.Duration) Observable[T]
func WithLatestFrom ¶ added in v0.2.0
func WithLatestFrom[T any](observables ...Observable[T]) Observable[[]T]
func WithLatestFromAll ¶ added in v0.2.0
func WithLatestFromAll[T any](observable Observable[Observable[T]]) Observable[[]T]
func WithLatestFromPair ¶ added in v0.2.0
func WithLatestFromPair[T, U any](first Observable[T], second Observable[U]) Observable[Pair[T, U]]
func WithLatestFromQuadruple ¶ added in v0.2.0
func WithLatestFromQuadruple[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W]) Observable[Quadruple[T, U, V, W]]
func WithLatestFromTriple ¶ added in v0.2.0
func WithLatestFromTriple[T, U, V any](first Observable[T], second Observable[U], third Observable[V]) Observable[Triple[T, U, V]]
func (Observable[T]) All ¶
func (observable Observable[T]) All(scheduler ...Scheduler) iter.Seq2[int, T]
func (Observable[T]) AsObservable ¶
func (observable Observable[T]) AsObservable() Observable[any]
func (Observable[T]) Assign ¶ added in v0.2.0
func (observable Observable[T]) Assign(value *T) Observable[T]
func (Observable[T]) AutoUnsubscribe ¶
func (observable Observable[T]) AutoUnsubscribe() Observable[T]
func (Observable[T]) Catch ¶
func (observable Observable[T]) Catch(other Observable[T]) Observable[T]
func (Observable[T]) CatchError ¶
func (observable Observable[T]) CatchError(selector func(err error, caught Observable[T]) Observable[T]) Observable[T]
func (Observable[T]) Collect ¶ added in v0.2.0
func (observable Observable[T]) Collect(slice *[]T) Observable[T]
func (Observable[T]) ConcatWith ¶
func (observable Observable[T]) ConcatWith(others ...Observable[T]) Observable[T]
func (Observable[T]) Count ¶
func (observable Observable[T]) Count() Observable[int]
func (Observable[T]) Delay ¶
func (observable Observable[T]) Delay(duration time.Duration) Observable[T]
func (Observable[T]) DistinctUntilChanged ¶
func (observable Observable[T]) DistinctUntilChanged(equal func(T, T) bool) Observable[T]
func (Observable[T]) Do ¶
func (observable Observable[T]) Do(f func(next T)) Observable[T]
func (Observable[T]) ElementAt ¶
func (observable Observable[T]) ElementAt(n int) Observable[T]
func (Observable[T]) Filter ¶
func (observable Observable[T]) Filter(predicate func(T) bool) Observable[T]
func (Observable[T]) Finally ¶
func (observable Observable[T]) Finally(f func(error)) Observable[T]
func (Observable[T]) Fprint ¶ added in v0.2.0
func (observable Observable[T]) Fprint(out io.Writer) Observable[T]
func (Observable[T]) Fprintf ¶ added in v0.2.0
func (observable Observable[T]) Fprintf(out io.Writer, format string) Observable[T]
func (Observable[T]) Fprintln ¶ added in v0.2.0
func (observable Observable[T]) Fprintln(out io.Writer) Observable[T]
func (Observable[T]) Go ¶ added in v0.2.0
func (observable Observable[T]) Go(schedulers ...Scheduler) Subscription
func (Observable[T]) Marshal ¶ added in v0.2.0
func (observable Observable[T]) Marshal(marshal func(any) ([]byte, error)) Observable[[]byte]
func (Observable[T]) MergeWith ¶
func (observable Observable[T]) MergeWith(others ...Observable[T]) Observable[T]
func (Observable[T]) Passthrough ¶ added in v0.2.0
func (observable Observable[T]) Passthrough() Observable[T]
func (Observable[T]) Pipe ¶ added in v0.2.0
func (observable Observable[T]) Pipe(segments ...Pipe[T]) Observable[T]
func (Observable[T]) Print ¶ added in v0.2.0
func (observable Observable[T]) Print() Observable[T]
func (Observable[T]) Printf ¶ added in v0.2.0
func (observable Observable[T]) Printf(format string) Observable[T]
func (Observable[T]) Println ¶
func (observable Observable[T]) Println() Observable[T]
func (Observable[T]) Publish ¶
func (observable Observable[T]) Publish() Connectable[T]
func (Observable[T]) RaceWith ¶ added in v0.2.0
func (observable Observable[T]) RaceWith(others ...Observable[T]) Observable[T]
func (Observable[T]) Reduce ¶
func (observable Observable[T]) Reduce(seed T, accumulator func(acc, next T) T) Observable[T]
func (Observable[T]) Repeat ¶
func (observable Observable[T]) Repeat(count ...int) Observable[T]
Repeat creates an Observable that emits a sequence of items repeatedly. The count is the number of times to repeat the sequence. If count is not provided, the sequence is repeated indefinitely.
func (Observable[T]) Retry ¶
func (observable Observable[T]) Retry(limit ...int) Observable[T]
func (Observable[T]) RetryTime ¶ added in v0.2.0
func (observable Observable[T]) RetryTime(backoff func(int) time.Duration, limit ...int) Observable[T]
func (Observable[T]) SampleTime ¶
func (observable Observable[T]) SampleTime(window time.Duration) Observable[T]
SampleTime emits the most recent item emitted by an Observable within periodic time intervals.
func (Observable[T]) Send ¶ added in v0.2.0
func (observable Observable[T]) Send(ch chan<- T) Observable[T]
func (Observable[T]) Share ¶ added in v0.2.0
func (observable Observable[T]) Share() Observable[T]
Share returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will unsubscribe from the source Observable. Because the Observable is multicasting it makes the stream hot.
This method is useful when you have an Observable that is expensive to create or has side-effects, but you want to share the results of that Observable with multiple subscribers. By using `Share`, you can avoid creating multiple instances of the Observable and ensure that all subscribers receive the same data.
func (Observable[T]) Skip ¶
func (observable Observable[T]) Skip(n int) Observable[T]
func (Observable[T]) Slice ¶ added in v0.2.0
func (observable Observable[T]) Slice(schedulers ...Scheduler) (slice []T, err error)
func (Observable[T]) StartWith ¶
func (observable Observable[T]) StartWith(values ...T) Observable[T]
func (Observable[T]) Subscribe ¶
func (observable Observable[T]) Subscribe(observe Observer[T], scheduler Scheduler) Subscription
func (Observable[T]) SubscribeOn ¶
func (observable Observable[T]) SubscribeOn(scheduler ConcurrentScheduler) Observable[T]
func (Observable[T]) Take ¶
func (observable Observable[T]) Take(n int) Observable[T]
func (Observable[T]) TakeWhile ¶
func (observable Observable[T]) TakeWhile(condition func(T) bool) Observable[T]
func (Observable[T]) Tap ¶ added in v0.2.0
func (observable Observable[T]) Tap(tap Observer[T]) Observable[T]
func (Observable[T]) Value ¶ added in v0.2.0
func (observable Observable[T]) Value(schedulers ...Scheduler) (value T, err error)
func (Observable[T]) Values ¶ added in v0.2.0
func (observable Observable[T]) Values(scheduler ...Scheduler) iter.Seq[T]
func (Observable[T]) Wait ¶
func (observable Observable[T]) Wait(schedulers ...Scheduler) error
type Observer ¶
func (Observer[T]) AsObserver ¶
type Pipe ¶ added in v0.2.0
type Pipe[T any] func(Observable[T]) Observable[T]
func Catch ¶ added in v0.2.0
func Catch[T any](other Observable[T]) Pipe[T]
func CatchError ¶ added in v0.2.0
func CatchError[T any](selector func(err error, caught Observable[T]) Observable[T]) Pipe[T]
func Passthrough ¶ added in v0.2.0
type Quadruple ¶ added in v0.2.0
type Quadruple[T, U, V, W any] struct { First T Second U Third V Fourth W }
type Subscribable ¶ added in v0.2.0
type Subscribable interface { // Subscribed returns true when the Subscribable is currently active. Subscribed() bool // Unsubscribe will do nothing if it is not active. If it is still active // however, it will be changed to canceled. Subsequently, it will call // Unsubscribe down the subscriber tree on all children, along with all // methods added through OnUnsubscribe on those children. On Unsubscribe // any call to the Wait method on a Subscription will return the error // SubscriptionCanceled. Unsubscribe() // Canceled returns true when the Subscribable has been canceled. Canceled() bool }
Subscribable is the interface shared by Subscription and Subscriber.
type Subscriber ¶
type Subscriber interface { // A Subscriber is a Subscribable Subscribable // Add will create and return a new child Subscriber setup in such a way that // calling Unsubscribe on the parent will also call Unsubscribe on the child. // Calling the Unsubscribe method on the child will NOT propagate to the // parent! Add() Subscriber // OnUnsubscribe will add the given callback function to the Subscriber. // The callback will be called when either the Unsubscribe of the parent // or of the Subscriber itself is called. If the subscription was already // canceled, then the callback function will just be called immediately. OnUnsubscribe(callback func()) }
Subscriber is a Subscribable that allows construction of a Subscriber tree.
type Subscription ¶
type Subscription interface { Subscribable // Wait will by default block the calling goroutine and wait for the // Unsubscribe method to be called on this subscription. // However, when OnWait was called with a callback wait function it will // call that instead. Calling Wait on a subscription that has already been // canceled will return immediately. If the subscriber was canceled by // calling Unsubscribe, then the error returned is SubscriptionCanceled. // If the subscriber was terminated by calling Done, then the error // returned here is the one passed to Done. Wait() error }
Subscription is an interface that allows code to monitor and control a subscription it received.
Source Files
¶
- all.go
- asobservable.go
- asobserver.go
- assign.go
- autoconnect.go
- autounsubscribe.go
- buffercount.go
- catch.go
- catcherror.go
- collect.go
- combine.go
- combineall.go
- concat.go
- concatall.go
- concatmap.go
- concatwith.go
- connect.go
- connectable.go
- count.go
- create.go
- creator.go
- defer.go
- delay.go
- distinctuntilchanged.go
- do.go
- elementat.go
- empty.go
- equal.go
- error.go
- exhaustall.go
- exhaustmap.go
- filter.go
- finally.go
- fprint.go
- fprintf.go
- fprintln.go
- from.go
- go.go
- ignore.go
- interval.go
- map.go
- mape.go
- marshal.go
- max.go
- merge.go
- mergeall.go
- mergemap.go
- mergewith.go
- min.go
- multicast.go
- must.go
- never.go
- observable.go
- observer.go
- of.go
- pair.go
- passthrough.go
- pipe.go
- print.go
- printf.go
- println.go
- publish.go
- quadruple.go
- race.go
- racewith.go
- recv.go
- reduce.go
- refcount.go
- repeat.go
- retry.go
- retrytime.go
- sampletime.go
- scan.go
- scane.go
- scheduler.go
- send.go
- share.go
- skip.go
- slice.go
- startwith.go
- subject.go
- subscribable.go
- subscribe.go
- subscribeon.go
- subscriber.go
- subscription.go
- switchall.go
- switchmap.go
- take.go
- takewhile.go
- tap.go
- throw.go
- ticker.go
- timer.go
- triple.go
- value.go
- values.go
- wait.go
- withlatestfrom.go
- withlatestfromall.go