rx

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 21, 2025 License: MIT Imports: 11 Imported by: 0

README

rx

import "github.com/reactivego/rx"

Go Reference

Package rx delivers Reactive Extensions, a powerful API for asynchronous programming in Go, built around observables and operators to handle data streams seamlessly.

Prerequisites

You’ll need Go 1.23 or later, as it includes support for generics and iterators.

Observables

In rx, an Observables represents a stream of data that can emit items over time, while an observer subscribes to it to receive and react to those emissions. This reactive approach enables asynchronous and concurrent operations without blocking execution. Instead of waiting for values to become available, an observer passively listens and responds whenever the Observable emits data, errors, or a completion signal.

This page introduces the reactive pattern, explaining what Observables and observers are and how subscriptions work. Other sections explore the powerful set of Observable operators that allow you to transform, combine, and control data streams efficiently.

An Observable:

  • is a stream of events.
  • assumes zero to many values over time.
  • pushes values
  • can take any amount of time to complete (or may never)
  • is cancellable
  • is lazy (it doesn't do anything until you subscribe).

Example

package main

import "github.com/reactivego/x"

func main() {
    x.From[any](1,"hi",2.3).Println()
}

Note the program creates a mixed type any observable from an int, string and a float64.

Output

1
hi
2.3

Example

package main

import "github.com/reactivego/rx"

func main() {
    rx.From(1,2,3).Println()
}

Note the program uses inferred type int for the observable.

Output

1
2
3

Observables in x are somewhat similar to Go channels but have much richer semantics:

Observables can be hot or cold. A hot observable will try to emit values even when nobody is subscribed. Values emitted during that period will be lost. The position of a mouse pointer or the current time are examples of hot observables.

A cold observable will only start emitting values after somebody subscribes. The contents of a file or a database are examples of cold observables.

An observable can complete normally or with an error, it uses subscriptions that can be canceled from the subscriber side. Where a normal variable is just a place where you read and write values from, an observable captures how the value of this variable changes over time.

Concurrency flows naturally from the fact that an observable is an ever changing stream of values. Every Observable conceptually has at its core a concurrently running process that pushes out values.

Operators

Operators form a language in which programs featuring Observables can be expressed. They work on one or more Observables to transform, filter and combine them into new Observables.

Index
  • All determines whether all items emitted by an Observable meet some criteria.
  • AsObservable when called on an Observable source will type assert the 'any' items of the source to 'bar' items.
  • AsyncSubject emits the last value (and only the last value) emitted by the Observable part, and only after that Observable part completes.
  • AuditTime waits until the source emits and then starts a timer.
  • AutoConnect makes a Multicaster behave like an ordinary Observable that automatically connects the multicaster to its source when the specified number of observers have subscribed to it.
  • Average calculates the average of numbers emitted by an Observable and emits this average.
  • BehaviorSubject returns a new BehaviorSubject.
  • Buffer buffers the source Observable values until closingNotifier emits.
  • BufferTime buffers the source Observable values for a specific time period and emits those as a slice periodically in time.
  • Catch recovers from an error notification by continuing the sequence without emitting the error but by switching to the catch ObservableInt to provide items.
  • CatchError catches errors on the Observable to be handled by returning a new Observable or throwing an error.
  • CombineLatest will subscribe to all Observables.
  • CombineLatestAll flattens a higher order observable.
  • CombineLatestMap maps every entry emitted by the Observable into an Observable, and then subscribe to it, until the source observable completes.
  • CombineLatestMapTo maps every entry emitted by the Observable into a single Observable, and then subscribe to it, until the source observable completes.
  • CombineLatestWith will subscribe to its Observable and all other Observables passed in.
  • Concat emits the emissions from two or more observables without interleaving them.
  • ConcatAll flattens a higher order observable by concattenating the observables it emits.
  • ConcatMap transforms the items emitted by an Observable by applying a function to each item and returning an Observable.
  • ConcatMapTo maps every entry emitted by the Observable into a single Observable.
  • ConcatWith emits the emissions from two or more observables without interleaving them.
  • Connect instructs a connectable Observable to begin emitting items to its subscribers.
  • Count counts the number of items emitted by the source ObservableInt and emits only this value.
  • Create provides a way of creating an Observable from scratch by calling observer methods programmatically.
  • CreateFutureRecursive provides a way of creating an Observable from scratch by calling observer methods programmatically.
  • CreateRecursive provides a way of creating an Observable from scratch by calling observer methods programmatically.
  • DebounceTime only emits the last item of a burst from an Observable if a particular timespan has passed without it emitting another item.
  • Defer does not create the Observable until the observer subscribes.
  • Delay shifts the emission from an Observable forward in time by a particular amount of time.
  • Distinct suppress duplicate items emitted by an Observable.
  • DistinctUntilChanged only emits when the current value is different from the last.
  • Do calls a function for each next value passing through the observable.
  • DoOnComplete calls a function when the stream completes.
  • DoOnError calls a function for any error on the stream.
  • ElementAt emit only item n emitted by an Observable.
  • Empty creates an Observable that emits no items but terminates normally.
  • Filter emits only those items from an observable that pass a predicate test.
  • Finally applies a function for any error or completion on the stream.
  • First emits only the first item, or the first item that meets a condition, from an Observable.
  • From creates an observable from multiple values passed in.
  • FromChan creates an Observable from a Go channel.
  • IgnoreCompletion only emits items and never completes, neither with Error nor with Complete.
  • IgnoreElements does not emit any items from an Observable but mirrors its termination notification.
  • Interval creates an ObservableInt that emits a sequence of integers spaced by a particular time interval.
  • Just creates an observable that emits a particular item.
  • Last emits only the last item emitted by an Observable.
  • Map transforms the items emitted by an Observable by applying a function to each item.
  • MapTo transforms the items emitted by an Observable.
  • Max determines, and emits, the maximum-valued item emitted by an Observable.
  • Merge combines multiple Observables into one by merging their emissions.
  • MergeAll flattens a higher order observable by merging the observables it emits.
  • MergeDelayError combines multiple Observables into one by merging their emissions.
  • MergeDelayErrorWith combines multiple Observables into one by merging their emissions.
  • MergeMap transforms the items emitted by an Observable by applying a function to each item an returning an Observable.
  • MergeMapTo maps every entry emitted by the Observable into a single Observable.
  • MergeWith combines multiple Observables into one by merging their emissions.
  • Min determines, and emits, the minimum-valued item emitted by an Observable.
  • Never creates an Observable that emits no items and does't terminate.
  • ObserveOn specifies a schedule function to use for delivering values to the observer.
  • ObserverObservable actually is an observer that is made observable.
  • Of emits a variable amount of values in a sequence and then emits a complete notification.
  • Only filters the value stream of an observable and lets only the values of a specific type pass.
  • Passthrough just passes through all output from the Observable.
  • Println subscribes to the Observable and prints every item to os.Stdout while it waits for completion or error.
  • Publish returns a Multicaster for a Subject to an underlying Observable and turns the subject into a connnectable observable.
  • PublishBehavior returns a Multicaster that shares a single subscription to the underlying Observable returning an initial value or the last value emitted by the underlying Observable.
  • PublishLast returns a Multicaster that shares a single subscription to the underlying Observable containing only the last value emitted before it completes.
  • PublishReplay returns a Multicaster for a ReplaySubject to an underlying Observable and turns the subject into a connectable observable.
  • Range creates an Observable that emits a range of sequential int values.
  • Reduce applies a reducer function to each item emitted by an Observable and the previous reducer result.
  • RefCount makes a Connectable behave like an ordinary Observable.
  • Repeat creates an observable that emits a sequence of items repeatedly.
  • ReplaySubject ensures that all observers see the same sequence of emitted items, even if they subscribe after.
  • Retry if a source Observable sends an error notification, resubscribe to it in the hopes that it will complete without error.
  • SampleTime emits the most recent item emitted by an Observable within periodic time intervals.
  • Scan applies a accumulator function to each item emitted by an Observable and the previous accumulator result.
  • Serialize forces an observable to make serialized calls and to be well-behaved.
  • Single enforces that the observable sends exactly one data item and then completes.
  • Skip suppresses the first n items emitted by an Observable.
  • SkipLast suppresses the last n items emitted by an Observable.
  • Start creates an Observable that emits the return value of a function.
  • StartWith returns an observable that, at the moment of subscription, will synchronously emit all values provided to this operator, then subscribe to the source and mirror all of its emissions to subscribers.
  • Subject is a combination of an observer and observable.
  • Subscribe operates upon the emissions and notifications from an Observable.
  • SubscribeOn specifies the scheduler an Observable should use when it is subscribed to.
  • Sum calculates the sum of numbers emitted by an Observable and emits this sum.
  • SwitchAll converts an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables.
  • SwitchMap transforms the items emitted by an Observable by applying a function to each item an returning an Observable.
  • Take emits only the first n items emitted by an Observable.
  • TakeLast emits only the last n items emitted by an Observable.
  • TakeUntil emits items emitted by an Observable until another Observable emits an item.
  • TakeWhile mirrors items emitted by an Observable until a specified condition becomes false.
  • ThrottleTime emits when the source emits and then starts a timer during which all emissions from the source are ignored.
  • Throw creates an observable that emits no items and terminates with an error.
  • Ticker creates an ObservableTime that emits a sequence of timestamps after an initialDelay has passed.
  • TimeInterval intercepts the items from the source Observable and emits in their place a struct that indicates the amount of time that elapsed between pairs of emissions.
  • Timeout mirrors the source Observable, but issue an error notification if a particular period of time elapses without any emitted items.
  • Timer creates an Observable that emits a sequence of integers (starting at zero) after an initialDelay has passed.
  • Timestamp attaches a timestamp to each item emitted by an observable indicating when it was emitted.
  • ToChan returns a channel that emits 'any' values.
  • ToSingle blocks until the Observable emits exactly one value or an error.
  • ToSlice collects all values from the Observable into an slice.
  • Wait subscribes to the Observable and waits for completion or error.
  • WithLatestFrom will subscribe to all Observables and wait for all of them to emit before emitting the first slice.
  • WithLatestFromAll flattens a higher order observable.

Documentation

Overview

Example (All)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	source := rx.From("ZERO", "ONE", "TWO")

	for k, v := range source.All() {
		fmt.Println(k, v)
	}

	fmt.Println("OK")
}
Output:

0 ZERO
1 ONE
2 TWO
OK
Example (BufferCount)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	source := rx.From(0, 1, 2, 3)

	fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 1)")
	rx.BufferCount(source, 2, 1).Println().Wait()

	fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 2)")
	rx.BufferCount(source, 2, 2).Println().Wait()

	fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 3)")
	rx.BufferCount(source, 2, 3).Println().Wait()

	fmt.Println("BufferCount(From(0, 1, 2, 3), 3, 2)")
	rx.BufferCount(source, 3, 2).Println().Wait()

	fmt.Println("BufferCount(From(0, 1, 2, 3), 6, 6)")
	rx.BufferCount(source, 6, 6).Println().Wait()

	fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 0)")
	rx.BufferCount(source, 2, 0).Println().Wait()
}
Output:

BufferCount(From(0, 1, 2, 3), 2, 1)
[0 1]
[1 2]
[2 3]
[3]
BufferCount(From(0, 1, 2, 3), 2, 2)
[0 1]
[2 3]
BufferCount(From(0, 1, 2, 3), 2, 3)
[0 1]
[3]
BufferCount(From(0, 1, 2, 3), 3, 2)
[0 1 2]
[2 3]
BufferCount(From(0, 1, 2, 3), 6, 6)
[0 1 2 3]
BufferCount(From(0, 1, 2, 3), 2, 0)
[0 1]
Example (ConcatAll)
package main

import (
	"fmt"
	"time"

	"github.com/reactivego/rx"
)

func main() {
	source := rx.Empty[rx.Observable[string]]()
	rx.ConcatAll(source).Wait()

	source = rx.Of(rx.Empty[string]())
	rx.ConcatAll(source).Wait()

	req := func(request string, duration time.Duration) rx.Observable[string] {
		req := rx.From(request + " response")
		if duration == 0 {
			return req
		}
		return req.Delay(duration)
	}

	const ms = time.Millisecond

	req1 := req("first", 10*ms)
	req2 := req("second", 20*ms)
	req3 := req("third", 0*ms)
	req4 := req("fourth", 60*ms)

	source = rx.From(req1).ConcatWith(rx.From(req2, req3, req4).Delay(100 * ms))
	rx.ConcatAll(source).Println().Wait()

	fmt.Println("OK")
}
Output:

first response
second response
third response
fourth response
OK
Example (Count)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	source := rx.From(1, 2, 3, 4, 5)

	count := source.Count()
	count.Println().Wait()

	emptySource := rx.Empty[int]()
	emptyCount := emptySource.Count()
	emptyCount.Println().Wait()

	fmt.Println("OK")
}
Output:

5
0
OK
Example (ElementAt)
package main

import (
	"github.com/reactivego/rx"
)

func main() {
	rx.From(0, 1, 2, 3, 4).ElementAt(2).Println().Wait()
}
Output:

2
Example (ExhaustAll)
package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/reactivego/rx"
)

func main() {
	const ms = time.Millisecond

	stream := func(name string, duration time.Duration, count int) rx.Observable[string] {
		return rx.Map(rx.Timer[int](0*ms, duration), func(next int) string {
			return name + "-" + strconv.Itoa(next)
		}).Take(count)
	}

	streams := []rx.Observable[string]{
		stream("a", 20*ms, 3),
		stream("b", 20*ms, 3),
		stream("c", 20*ms, 3),
		rx.Empty[string](),
	}

	streamofstreams := rx.Map(rx.Timer[int](20*ms, 30*ms, 250*ms, 100*ms).Take(4), func(next int) rx.Observable[string] {
		return streams[next]
	})

	err := rx.ExhaustAll(streamofstreams).Println().Wait()

	if err == nil {
		fmt.Println("success")
	}
}
Output:

a-0
a-1
a-2
c-0
c-1
c-2
success
Example (Marshal)
package main

import (
	"encoding/json"

	"github.com/reactivego/rx"
)

func main() {
	type R struct {
		A string `json:"a"`
		B string `json:"b"`
	}

	b2s := func(data []byte) string { return string(data) }

	rx.Map(rx.Of(R{"Hello", "World"}).Marshal(json.Marshal), b2s).Println().Wait()
}
Output:

{"a":"Hello","b":"World"}
Example (Multicast)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	serial := rx.NewScheduler()

	in, out := rx.Multicast[int](1)

	// Ignore everything before any subscriptions, including the last!
	in.Next(-2)
	in.Next(-1)
	in.Next(0)
	in.Next(1)

	// Schedule the subsequent emits in a loop. This will be the first task to
	// run on the serial scheduler after the subscriptions have been added.
	serial.ScheduleLoop(2, func(index int, again func(next int)) {
		if index < 4 {
			in.Next(index)
			again(index + 1)
		} else {
			in.Error(rx.Error("foo"))
		}
	})

	// Add a couple of subscriptions
	sub1 := out.Println().Go(serial)
	sub2 := out.Println().Go(serial)

	// Let the scheduler run and wait for all of its scheduled tasks to finish.
	serial.Wait()
	fmt.Println(sub1.Wait())
	fmt.Println(sub2.Wait())
}
Output:

2
2
3
3
foo
foo
Example (MulticastDrop)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	serial := rx.NewScheduler()

	const onBackpressureDrop = -1

	// multicast with backpressure handling set to dropping incoming
	// items that don't fit in the buffer once it has filled up.
	in, out := rx.Multicast[int](1 * onBackpressureDrop)

	// ignore everything before any subscriptions, including the last!
	in.Next(-2)
	in.Next(-1)
	in.Next(0)
	in.Next(1)

	// add a couple of subscriptions
	sub1 := out.Println().Go(serial)
	sub2 := out.Println().Go(serial)

	in.Next(2)                // accepted: buffer not full
	in.Next(3)                // dropped: buffer full
	in.Error(rx.Error("foo")) // dropped: buffer full

	serial.Wait()
	fmt.Println(sub1.Wait())
	fmt.Println(sub2.Wait())
}
Output:

2
2
<nil>
<nil>
Example (Race)
package main

import (
	"fmt"
	"time"

	"github.com/reactivego/rx"
)

func main() {
	const ms = time.Millisecond

	req := func(request string, duration time.Duration) rx.Observable[string] {
		return rx.From(request + " response").Delay(duration)
	}

	req1 := req("first", 50*ms)
	req2 := req("second", 10*ms)
	req3 := req("third", 60*ms)

	rx.Race(req1, req2, req3).Println().Wait()

	err := func(text string, duration time.Duration) rx.Observable[int] {
		return rx.Throw[int](rx.Error(text + " error")).Delay(duration)
	}

	err1 := err("first", 10*ms)
	err2 := err("second", 20*ms)
	err3 := err("third", 30*ms)

	fmt.Println(rx.Race(err1, err2, err3).Wait(rx.Goroutine))
}
Output:

second response
first error
Example (Retry)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	var first error = rx.Error("error")
	a := rx.Create(func(index int) (next int, err error, done bool) {
		if index < 3 {
			return index, nil, false
		}
		err, first = first, nil
		return 0, err, true
	})
	err := a.Retry().Println().Wait()
	fmt.Println(first == nil)
	fmt.Println(err)
}
Output:

0
1
2
0
1
2
true
<nil>
Example (Share)
package main

import (
	"github.com/reactivego/rx"
)

func main() {
	serial := rx.NewScheduler()

	shared := rx.From(1, 2, 3).Share()

	shared.Println().Go(serial)
	shared.Println().Go(serial)
	shared.Println().Go(serial)

	serial.Wait()
}
Output:

1
1
1
2
2
2
3
3
3
Example (Skip)
package main

import (
	"github.com/reactivego/rx"
)

func main() {
	rx.From(1, 2, 3, 4, 5).Skip(2).Println().Wait()
}
Output:

3
4
5
Example (Subject)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	serial := rx.NewScheduler()

	// subject collects emits when there are no subscriptions active.
	in, out := rx.Subject[int](0, 1)

	// ignore everything before any subscriptions, except the last because buffer size is 1
	in.Next(-2)
	in.Next(-1)
	in.Next(0)
	in.Next(1)

	// add a couple of subscriptions
	sub1 := out.Println().Go(serial)
	sub2 := out.Println().Go(serial)

	// schedule the subsequent emits on the serial scheduler otherwise these calls
	// will block because the buffer is full.
	// subject will detect usage of scheduler on observable side and use it on the
	// observer side to keep the data flow through the subject going.
	serial.Schedule(func() {
		in.Next(2)
		in.Next(3)
		in.Error(rx.Error("foo"))
	})

	serial.Wait()
	fmt.Println(sub1.Wait())
	fmt.Println(sub2.Wait())
}
Output:

1
1
2
2
3
3
foo
foo
Example (SwitchAll)
package main

import (
	"fmt"
	"time"

	"github.com/reactivego/rx"
)

func main() {
	const ms = time.Millisecond

	interval42x4 := rx.Interval[int](42 * ms).Take(4)
	interval16x4 := rx.Interval[int](16 * ms).Take(4)

	err := rx.SwitchAll(rx.Map(interval42x4, func(next int) rx.Observable[int] { return interval16x4 })).Println().Wait(rx.Goroutine)

	if err == nil {
		fmt.Println("success")
	}
}
Output:

0
1
0
1
0
1
0
1
2
3
success
Example (SwitchMap)
package main

import (
	"fmt"
	"time"

	"github.com/reactivego/rx"
)

func main() {
	const ms = time.Millisecond

	webreq := func(request string, duration time.Duration) rx.Observable[string] {
		return rx.From(request + " result").Delay(duration)
	}

	first := webreq("first", 50*ms)
	second := webreq("second", 10*ms)
	latest := webreq("latest", 50*ms)

	switchmap := rx.SwitchMap(rx.Interval[int](20*ms).Take(3), func(i int) rx.Observable[string] {
		switch i {
		case 0:
			return first
		case 1:
			return second
		case 2:
			return latest
		default:
			return rx.Empty[string]()
		}
	})

	err := switchmap.Println().Wait()
	if err == nil {
		fmt.Println("success")
	}
}
Output:

second result
latest result
success
Example (Values)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
	"github.com/reactivego/scheduler"
)

func main() {
	source := rx.From(1, 3, 5)

	// Why choose the Goroutine concurrent scheduler?
	// An observable can actually be at the root of a tree
	// of separately running observables that have their
	// responses merged. The Goroutine scheduler allows
	// these observables to run concurrently.

	// run the observable on 1 or more goroutines
	for i := range source.Values(scheduler.Goroutine) {
		// This is called from a newly created goroutine
		fmt.Println(i)
	}

	// run the observable on the current goroutine
	for i := range source.Values(scheduler.New()) {
		fmt.Println(i)
	}

	fmt.Println("OK")
}
Output:

1
3
5
1
3
5
OK

Index

Examples

Constants

View Source
const InvalidCount = Error("invalid count")
View Source
const OutOfSubjectSubscriptions = Error("out of subject subscriptions")
View Source
const RepeatCountInvalid = Error("repeat count invalid")
View Source
const SubscriptionCanceled = Error("subscription canceled")

SubscriptionCanceled is the error returned by Wait when the Unsubscribe method is called on an active subscription.

View Source
const TypecastFailed = Error("typecast failed")

Variables

View Source
var Goroutine = scheduler.Goroutine
View Source
var NewScheduler = scheduler.New

Functions

func Equal added in v0.2.0

func Equal[T comparable]() func(T, T) bool

func Max added in v0.2.0

func Max[T cmp.Ordered](a, b T) T

func Min added in v0.2.0

func Min[T cmp.Ordered](a, b T) T

func Multicast added in v0.2.0

func Multicast[T any](size int) (Observer[T], Observable[T])

Multicast returns both an Observer and and Observable. The returned Observer is used to send items into the Multicast. The returned Observable is used to subscribe to the Multicast. The Multicast multicasts items send through the Observer to every Subscriber of the Observable.

size  size of the item buffer, number of items kept to replay to a new Subscriber.

Backpressure handling depends on the sign of the size argument. For positive size the multicast will block when one of the subscribers lets the buffer fill up. For negative size the multicast will drop items on the blocking subscriber, allowing the others to keep on receiving values. For hot observables dropping is preferred.

func Must added in v0.2.0

func Must[T any](t T, err error) T

func Subject

func Subject[T any](age time.Duration, capacity ...int) (Observer[T], Observable[T])

Subject returns both an Observer and and Observable. The returned Observer is used to send items into the Subject. The returned Observable is used to subscribe to the Subject. The Subject multicasts items send through the Observer to every Subscriber of the Observable.

age     max age to keep items in order to replay them to a new Subscriber (0 = no max age).
[size]  size of the item buffer, number of items kept to replay to a new Subscriber.
[cap]   capacity of the item buffer, number of items that can be observed before blocking.
[scap]  capacity of the subscription list, max number of simultaneous subscribers.

Types

type ConcurrentScheduler added in v0.2.0

type ConcurrentScheduler = scheduler.ConcurrentScheduler

type Connect added in v0.2.0

type Connect func(Scheduler, Subscriber)

Connect provides the Connect method for a Connectable[T].

func (Connect) Connect added in v0.2.0

func (connect Connect) Connect(schedulers ...Scheduler) Subscription

Connect instructs a Connectable[T] to subscribe to its source and begin emitting items to its subscribers. Connect accepts an optional scheduler argument.

type Connectable

type Connectable[T any] struct {
	Observable[T]
	Connect
}

Connectable[T] is an Observable[T] with a Connect method. So it has a both a Subscribe method and a Connect method. The Connect method is used to instruct the Connectable[T] to subscribe to its source. The Subscribe method is used to subscribe to the Connectable[T] itself.

func (Connectable[T]) AutoConnect added in v0.2.0

func (connectable Connectable[T]) AutoConnect(count int) Observable[T]

func (Connectable[T]) RefCount added in v0.2.0

func (connectable Connectable[T]) RefCount() Observable[T]

RefCount makes a Connectable[T] behave like an ordinary Observable[T]. On first Subscribe it will call Connect on its Connectable[T] and when its last subscriber is Unsubscribed it will cancel the source connection by calling Unsubscribe on the subscription returned by the call to Connect.

type Creator added in v0.2.0

type Creator[T any] func(index int) (Next T, Err error, Done bool)

type Error

type Error string

func (Error) Error added in v0.2.0

func (e Error) Error() string

type Observable

type Observable[T any] func(Observer[T], Scheduler, Subscriber)

func AsObservable added in v0.2.0

func AsObservable[T any](observable Observable[any]) Observable[T]

func BufferCount added in v0.2.0

func BufferCount[T any](observable Observable[T], bufferSize, startBufferEvery int) Observable[[]T]

func Combine added in v0.2.0

func Combine[T any](observables ...Observable[T]) Observable[[]T]

func CombineAll added in v0.2.0

func CombineAll[T any](observable Observable[Observable[T]]) Observable[[]T]

func CombinePair added in v0.2.0

func CombinePair[T, U any](first Observable[T], second Observable[U]) Observable[Pair[T, U]]

func CombineQuadruple added in v0.2.0

func CombineQuadruple[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W]) Observable[Quadruple[T, U, V, W]]

func CombineTriple added in v0.2.0

func CombineTriple[T, U, V any](first Observable[T], second Observable[U], third Observable[V]) Observable[Triple[T, U, V]]

func Concat

func Concat[T any](observables ...Observable[T]) Observable[T]

func ConcatAll added in v0.2.0

func ConcatAll[T any](observable Observable[Observable[T]]) Observable[T]

func ConcatMap added in v0.2.0

func ConcatMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]

func Create

func Create[T any](create Creator[T]) Observable[T]

func Defer

func Defer[T any](factory func() Observable[T]) Observable[T]

func DistinctUntilChanged added in v0.2.0

func DistinctUntilChanged[T comparable](observable Observable[T]) Observable[T]

func Empty

func Empty[T any]() Observable[T]

func ExhaustAll added in v0.2.0

func ExhaustAll[T any](observable Observable[Observable[T]]) Observable[T]

func ExhaustMap added in v0.2.0

func ExhaustMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]

func From

func From[T any](slice ...T) Observable[T]

func Interval

func Interval[T constraints.Integer | constraints.Float](interval time.Duration) Observable[T]

func Map added in v0.2.0

func Map[T, U any](observable Observable[T], project func(T) U) Observable[U]

func MapE added in v0.2.0

func MapE[T, U any](observable Observable[T], project func(T) (U, error)) Observable[U]

func Merge

func Merge[T any](observables ...Observable[T]) Observable[T]

func MergeAll added in v0.2.0

func MergeAll[T any](observable Observable[Observable[T]]) Observable[T]

func MergeMap added in v0.2.0

func MergeMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]

func Never

func Never[T any]() Observable[T]

func Of

func Of[T any](value T) Observable[T]

func Race added in v0.2.0

func Race[T any](observables ...Observable[T]) Observable[T]

func Recv added in v0.2.0

func Recv[T any](ch <-chan T) Observable[T]

func Reduce added in v0.2.0

func Reduce[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) U) Observable[U]

func Scan added in v0.2.0

func Scan[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) U) Observable[U]

func ScanE added in v0.2.0

func ScanE[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) (U, error)) Observable[U]

func SwitchAll added in v0.2.0

func SwitchAll[T any](observable Observable[Observable[T]]) Observable[T]

func SwitchMap added in v0.2.0

func SwitchMap[T, U any](o Observable[T], project func(T) Observable[U]) Observable[U]

func Throw

func Throw[T any](err error) Observable[T]

func Ticker

func Ticker(initialDelay time.Duration, intervals ...time.Duration) Observable[time.Time]

Ticker creates an ObservableTime that emits a sequence of timestamps after an initialDelay has passed. Subsequent timestamps are emitted using a schedule of intervals passed in. If only the initialDelay is given, Ticker will emit only once.

func Timer

func Timer[T constraints.Integer | constraints.Float](initialDelay time.Duration, intervals ...time.Duration) Observable[T]

func WithLatestFrom added in v0.2.0

func WithLatestFrom[T any](observables ...Observable[T]) Observable[[]T]

func WithLatestFromAll added in v0.2.0

func WithLatestFromAll[T any](observable Observable[Observable[T]]) Observable[[]T]

func WithLatestFromPair added in v0.2.0

func WithLatestFromPair[T, U any](first Observable[T], second Observable[U]) Observable[Pair[T, U]]

func WithLatestFromQuadruple added in v0.2.0

func WithLatestFromQuadruple[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W]) Observable[Quadruple[T, U, V, W]]

func WithLatestFromTriple added in v0.2.0

func WithLatestFromTriple[T, U, V any](first Observable[T], second Observable[U], third Observable[V]) Observable[Triple[T, U, V]]

func (Observable[T]) All

func (observable Observable[T]) All(scheduler ...Scheduler) iter.Seq2[int, T]

func (Observable[T]) AsObservable

func (observable Observable[T]) AsObservable() Observable[any]

func (Observable[T]) Assign added in v0.2.0

func (observable Observable[T]) Assign(value *T) Observable[T]

func (Observable[T]) AutoUnsubscribe

func (observable Observable[T]) AutoUnsubscribe() Observable[T]

func (Observable[T]) Catch

func (observable Observable[T]) Catch(other Observable[T]) Observable[T]

func (Observable[T]) CatchError

func (observable Observable[T]) CatchError(selector func(err error, caught Observable[T]) Observable[T]) Observable[T]

func (Observable[T]) Collect added in v0.2.0

func (observable Observable[T]) Collect(slice *[]T) Observable[T]

func (Observable[T]) ConcatWith

func (observable Observable[T]) ConcatWith(others ...Observable[T]) Observable[T]

func (Observable[T]) Count

func (observable Observable[T]) Count() Observable[int]

func (Observable[T]) Delay

func (observable Observable[T]) Delay(duration time.Duration) Observable[T]

func (Observable[T]) DistinctUntilChanged

func (observable Observable[T]) DistinctUntilChanged(equal func(T, T) bool) Observable[T]

func (Observable[T]) Do

func (observable Observable[T]) Do(f func(next T)) Observable[T]

func (Observable[T]) ElementAt

func (observable Observable[T]) ElementAt(n int) Observable[T]

func (Observable[T]) Filter

func (observable Observable[T]) Filter(predicate func(T) bool) Observable[T]

func (Observable[T]) Finally

func (observable Observable[T]) Finally(f func(error)) Observable[T]

func (Observable[T]) Fprint added in v0.2.0

func (observable Observable[T]) Fprint(out io.Writer) Observable[T]

func (Observable[T]) Fprintf added in v0.2.0

func (observable Observable[T]) Fprintf(out io.Writer, format string) Observable[T]

func (Observable[T]) Fprintln added in v0.2.0

func (observable Observable[T]) Fprintln(out io.Writer) Observable[T]

func (Observable[T]) Go added in v0.2.0

func (observable Observable[T]) Go(schedulers ...Scheduler) Subscription

func (Observable[T]) Marshal added in v0.2.0

func (observable Observable[T]) Marshal(marshal func(any) ([]byte, error)) Observable[[]byte]

func (Observable[T]) MergeWith

func (observable Observable[T]) MergeWith(others ...Observable[T]) Observable[T]

func (Observable[T]) Passthrough added in v0.2.0

func (observable Observable[T]) Passthrough() Observable[T]

func (Observable[T]) Pipe added in v0.2.0

func (observable Observable[T]) Pipe(segments ...Pipe[T]) Observable[T]

func (Observable[T]) Print added in v0.2.0

func (observable Observable[T]) Print() Observable[T]

func (Observable[T]) Printf added in v0.2.0

func (observable Observable[T]) Printf(format string) Observable[T]

func (Observable[T]) Println

func (observable Observable[T]) Println() Observable[T]

func (Observable[T]) Publish

func (observable Observable[T]) Publish() Connectable[T]

func (Observable[T]) RaceWith added in v0.2.0

func (observable Observable[T]) RaceWith(others ...Observable[T]) Observable[T]

func (Observable[T]) Reduce

func (observable Observable[T]) Reduce(seed T, accumulator func(acc, next T) T) Observable[T]

func (Observable[T]) Repeat

func (observable Observable[T]) Repeat(count ...int) Observable[T]

Repeat creates an Observable that emits a sequence of items repeatedly. The count is the number of times to repeat the sequence. If count is not provided, the sequence is repeated indefinitely.

func (Observable[T]) Retry

func (observable Observable[T]) Retry(limit ...int) Observable[T]

func (Observable[T]) RetryTime added in v0.2.0

func (observable Observable[T]) RetryTime(backoff func(int) time.Duration, limit ...int) Observable[T]

func (Observable[T]) SampleTime

func (observable Observable[T]) SampleTime(window time.Duration) Observable[T]

SampleTime emits the most recent item emitted by an Observable within periodic time intervals.

func (Observable[T]) Send added in v0.2.0

func (observable Observable[T]) Send(ch chan<- T) Observable[T]

func (Observable[T]) Share added in v0.2.0

func (observable Observable[T]) Share() Observable[T]

Share returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will unsubscribe from the source Observable. Because the Observable is multicasting it makes the stream hot.

This method is useful when you have an Observable that is expensive to create or has side-effects, but you want to share the results of that Observable with multiple subscribers. By using `Share`, you can avoid creating multiple instances of the Observable and ensure that all subscribers receive the same data.

func (Observable[T]) Skip

func (observable Observable[T]) Skip(n int) Observable[T]

func (Observable[T]) Slice added in v0.2.0

func (observable Observable[T]) Slice(schedulers ...Scheduler) (slice []T, err error)

func (Observable[T]) StartWith

func (observable Observable[T]) StartWith(values ...T) Observable[T]

func (Observable[T]) Subscribe

func (observable Observable[T]) Subscribe(observe Observer[T], scheduler Scheduler) Subscription

func (Observable[T]) SubscribeOn

func (observable Observable[T]) SubscribeOn(scheduler ConcurrentScheduler) Observable[T]

func (Observable[T]) Take

func (observable Observable[T]) Take(n int) Observable[T]

func (Observable[T]) TakeWhile

func (observable Observable[T]) TakeWhile(condition func(T) bool) Observable[T]

func (Observable[T]) Tap added in v0.2.0

func (observable Observable[T]) Tap(tap Observer[T]) Observable[T]

func (Observable[T]) Value added in v0.2.0

func (observable Observable[T]) Value(schedulers ...Scheduler) (value T, err error)

func (Observable[T]) Values added in v0.2.0

func (observable Observable[T]) Values(scheduler ...Scheduler) iter.Seq[T]

func (Observable[T]) Wait

func (observable Observable[T]) Wait(schedulers ...Scheduler) error

type Observer

type Observer[T any] func(next T, err error, done bool)

func AsObserver added in v0.2.0

func AsObserver[T any](observe Observer[any]) Observer[T]

func Ignore added in v0.2.0

func Ignore[T any]() Observer[T]

func (Observer[T]) AsObserver

func (observe Observer[T]) AsObserver() Observer[any]

func (Observer[T]) Complete

func (observe Observer[T]) Complete()

func (Observer[T]) Error

func (observe Observer[T]) Error(err error)

func (Observer[T]) Next

func (observe Observer[T]) Next(next T)

type Pair added in v0.2.0

type Pair[T, U any] struct {
	First  T
	Second U
}

type Pipe added in v0.2.0

type Pipe[T any] func(Observable[T]) Observable[T]

func Assign added in v0.2.0

func Assign[T any](value *T) Pipe[T]

func Catch added in v0.2.0

func Catch[T any](other Observable[T]) Pipe[T]

func CatchError added in v0.2.0

func CatchError[T any](selector func(err error, caught Observable[T]) Observable[T]) Pipe[T]

func Collect added in v0.2.0

func Collect[T any](slice *[]T) Pipe[T]

func Do added in v0.2.0

func Do[T any](f func(T)) Pipe[T]

func Filter added in v0.2.0

func Filter[T any](predicate func(T) bool) Pipe[T]

func Finally added in v0.2.0

func Finally[T any](f func(error)) Pipe[T]

func Fprint added in v0.2.0

func Fprint[T any](out io.Writer) Pipe[T]

func Fprintf added in v0.2.0

func Fprintf[T any](out io.Writer, format string) Pipe[T]

func Fprintln added in v0.2.0

func Fprintln[T any](out io.Writer) Pipe[T]

func Passthrough added in v0.2.0

func Passthrough[T any]() Pipe[T]

func Print added in v0.2.0

func Print[T any]() Pipe[T]

func Printf added in v0.2.0

func Printf[T any](format string) Pipe[T]

func Println

func Println[T any]() Pipe[T]

func Send added in v0.2.0

func Send[T any](ch chan<- T) Pipe[T]

func Skip added in v0.2.0

func Skip[T any](n int) Pipe[T]

func Take added in v0.2.0

func Take[T any](n int) Pipe[T]

func TakeWhile added in v0.2.0

func TakeWhile[T any](condition func(T) bool) Pipe[T]

func Tap added in v0.2.0

func Tap[T any](tap Observer[T]) Pipe[T]

type Quadruple added in v0.2.0

type Quadruple[T, U, V, W any] struct {
	First  T
	Second U
	Third  V
	Fourth W
}

type Scheduler

type Scheduler = scheduler.Scheduler

type Subscribable added in v0.2.0

type Subscribable interface {
	// Subscribed returns true when the Subscribable is currently active.
	Subscribed() bool

	// Unsubscribe will do nothing if it not an active. If it is still active
	// however, it will be changed to canceled. Subsequently, it will call
	// Unsubscribe down the subscriber tree on all children, along with all
	// methods added through OnUnsubscribe on those childern. On Unsubscribe
	// any call to the Wait method on a Subsvcription will return the error
	// ErrUnsubscribed.
	Unsubscribe()

	// Canceled returns true when the Subscribable has been canceled.
	Canceled() bool
}

Subscribable is the interface shared by Subscription and Subscriber.

type Subscriber

type Subscriber interface {
	// A Subscriber is a Subscribable
	Subscribable

	// Add will create and return a new child Subscriber setup in such a way that
	// calling Unsubscribe on the parent will also call Unsubscribe on the child.
	// Calling the Unsubscribe method on the child will NOT propagate to the
	// parent!
	Add() Subscriber

	// OnUnsubscribe will add the given callback function to the Subscriber.
	// The callback will be called when either the Unsubscribe of the parent
	// or of the Subscriber itself is called. If the subscription was already
	// canceled, then the callback function will just be called immediately.
	OnUnsubscribe(callback func())
}

Subscriber is a Subscribable that allows construction of a Subscriber tree.

type Subscription

type Subscription interface {
	Subscribable

	// Wait will by default block the calling goroutine and wait for the
	// Unsubscribe method to be called on this subscription.
	// However, when OnWait was called with a callback wait function it will
	// call that instead. Calling Wait on a subscription that has already been
	// canceled will return immediately. If the subscriber was canceled by
	// calling Unsubscribe, then the error returned is ErrUnsubscribed.
	// If the subscriber was terminated by calling Done, then the error
	// returned here is the one passed to Done.
	Wait() error
}

Subscription is an interface that allows code to monitor and control a subscription it received.

type Triple added in v0.2.0

type Triple[T, U, V any] struct {
	First  T
	Second U
	Third  V
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL