rx

package module
v0.2.3 Latest
Published: May 21, 2025 License: MIT Imports: 10 Imported by: 0

README

rx

import "github.com/reactivego/rx"

Package rx provides Reactive Extensions, a powerful API for asynchronous programming in Go, built around observables and operators to process streams of data seamlessly.

Prerequisites

You’ll need Go 1.23 or later, as the implementation depends on language support for generics and iterators.

Observables

In rx, an Observable represents a stream of data that can emit items over time, while an observer subscribes to it to receive and react to those emissions. This reactive approach enables asynchronous and concurrent operations without blocking execution. Instead of waiting for values to become available, an observer passively listens and responds whenever the Observable emits data, errors, or a completion signal.

This page introduces the reactive pattern, explaining what Observables and observers are and how subscriptions work. Other sections explore the powerful set of Observable operators that allow you to transform, combine, and control data streams efficiently.

An Observable:

  • is a stream of events.
  • assumes zero to many values over time.
  • pushes values.
  • can take any amount of time to complete (or may never complete).
  • is cancellable.
  • is lazy (it doesn't do anything until you subscribe).
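
The properties in this list can be modeled with plain Go functions. The following is a minimal sketch using only the standard library, not the rx implementation: the Observer, Observable, FromSlice and Collect names are hypothetical, and the scheduler and subscriber arguments that rx passes to an Observable are left out.

```go
package main

import "fmt"

// Observer receives pushed values and one final notification.
// Hypothetical type for this sketch, not the rx Observer.
type Observer[T any] struct {
	Next func(T)
	Done func(error) // a nil error means normal completion
}

// Observable is a function, so nothing happens until it is called (subscribed).
type Observable[T any] func(Observer[T])

// FromSlice returns a lazy Observable that pushes each value and then completes.
func FromSlice[T any](values ...T) Observable[T] {
	return func(o Observer[T]) {
		for _, v := range values {
			o.Next(v)
		}
		o.Done(nil)
	}
}

// Collect subscribes to src and gathers everything that was pushed.
func Collect[T any](src Observable[T]) ([]T, error) {
	var got []T
	var result error
	src(Observer[T]{
		Next: func(v T) { got = append(got, v) },
		Done: func(err error) { result = err },
	})
	return got, result
}

func main() {
	src := FromSlice(1, 2, 3) // nothing runs yet: the observable is lazy
	got, err := Collect(src)  // subscribing triggers the pushes
	fmt.Println(got, err)     // prints: [1 2 3] <nil>
}
```

Cancellation is omitted here to keep the sketch short.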

Example

package main

import "github.com/reactivego/rx"

func main() {
    // Println returns an Observable; Wait subscribes and blocks until it completes.
    rx.From[any](1, "hi", 2.3).Println().Wait()
}

Note that the program creates an observable of mixed-type any values from an int, a string and a float64.

Output

1
hi
2.3

Example

package main

import "github.com/reactivego/rx"

func main() {
    rx.From(1, 2, 3).Println().Wait()
}

Note the program uses inferred type int for the observable.

Output

1
2
3

Observables in rx are somewhat similar to Go channels but have much richer semantics:

Observables can be hot or cold. A hot observable will try to emit values even when nobody is subscribed. Values emitted during that period will be lost. The position of a mouse pointer or the current time are examples of hot observables.

A cold observable will only start emitting values after somebody subscribes. The contents of a file or a database are examples of cold observables.
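
The difference between hot and cold can be sketched without the library. In this stdlib-only model (the cold, hot, replayTwice and lateSubscriber names are hypothetical), a cold source replays its values for every subscriber, while a hot source emits regardless and a late subscriber misses earlier values.

```go
package main

import "fmt"

// cold returns a subscribe function; every call replays the values from the start.
func cold(values ...int) func(next func(int)) {
	return func(next func(int)) {
		for _, v := range values {
			next(v)
		}
	}
}

// hot emits whether or not anybody listens; values emitted without listeners are lost.
type hot struct {
	listeners []func(int)
}

func (h *hot) subscribe(next func(int)) { h.listeners = append(h.listeners, next) }

func (h *hot) emit(v int) {
	for _, listener := range h.listeners {
		listener(v)
	}
}

// replayTwice subscribes to the same cold source twice.
func replayTwice() (first, second []int) {
	src := cold(1, 2, 3)
	src(func(v int) { first = append(first, v) })
	src(func(v int) { second = append(second, v) })
	return first, second
}

// lateSubscriber emits 1 and 2 before anyone subscribes, then 3 and 4 after.
func lateSubscriber() []int {
	var seen []int
	h := &hot{}
	h.emit(1)
	h.emit(2)
	h.subscribe(func(v int) { seen = append(seen, v) })
	h.emit(3)
	h.emit(4)
	return seen
}

func main() {
	first, second := replayTwice()
	fmt.Println(first, second)    // prints: [1 2 3] [1 2 3]
	fmt.Println(lateSubscriber()) // prints: [3 4]
}
```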

An observable can complete normally or with an error, and it uses subscriptions that can be canceled from the subscriber side. Where a normal variable is just a place to read values from and write values to, an observable captures how the value of that variable changes over time.

Concurrency flows naturally from the fact that an observable is an ever changing stream of values. Every Observable conceptually has at its core a concurrently running process that pushes out values.

Operators

Operators form a language in which programs featuring Observables can be expressed. They work on one or more Observables to transform, filter and combine them into new Observables.
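
As a rough illustration of operators as functions from Observables to Observables, here is a stdlib-only sketch. The Observable type and the From, Filter, Map and ToSlice functions are simplified, hypothetical stand-ins for this example and not the rx definitions. Map is a free function because a Go method cannot introduce its own type parameter.

```go
package main

import "fmt"

// Observable is a simplified push-based stream: calling it subscribes.
type Observable[T any] func(next func(T))

// From creates an Observable that pushes the given values.
func From[T any](values ...T) Observable[T] {
	return func(next func(T)) {
		for _, v := range values {
			next(v)
		}
	}
}

// Filter passes on only the values for which keep returns true.
func Filter[T any](src Observable[T], keep func(T) bool) Observable[T] {
	return func(next func(T)) {
		src(func(v T) {
			if keep(v) {
				next(v)
			}
		})
	}
}

// Map transforms every value; it changes the element type from T to U.
func Map[T, U any](src Observable[T], project func(T) U) Observable[U] {
	return func(next func(U)) {
		src(func(v T) { next(project(v)) })
	}
}

// ToSlice subscribes and collects all values.
func ToSlice[T any](src Observable[T]) []T {
	var out []T
	src(func(v T) { out = append(out, v) })
	return out
}

func main() {
	even := Filter(From(1, 2, 3, 4), func(v int) bool { return v%2 == 0 })
	labels := Map(even, func(v int) string { return fmt.Sprintf("item-%d", v) })
	fmt.Println(ToSlice(labels)) // prints: [item-2 item-4]
}
```

Each operator returns a new Observable and leaves its source untouched, so pipelines can be built up step by step before anything is subscribed.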

Index
  • All determines whether all items emitted by an Observable meet some criteria.
  • AsObservable when called on an Observable source will type assert the 'any' items of the source to items of type T.
  • AsyncSubject emits the last value (and only the last value) emitted by the Observable part, and only after that Observable part completes.
  • AuditTime waits until the source emits and then starts a timer.
  • AutoConnect makes a Multicaster behave like an ordinary Observable that automatically connects the multicaster to its source when the specified number of observers have subscribed to it.
  • Average calculates the average of numbers emitted by an Observable and emits this average.
  • BehaviorSubject returns a new BehaviorSubject.
  • Buffer buffers the source Observable values until closingNotifier emits.
  • BufferTime buffers the source Observable values for a specific time period and emits those as a slice periodically in time.
  • Catch recovers from an error notification by continuing the sequence without emitting the error, switching to the catch Observable to provide items.
  • CatchError catches errors on the Observable to be handled by returning a new Observable or throwing an error.
  • CombineLatest will subscribe to all Observables.
  • CombineLatestAll flattens a higher order observable.
  • CombineLatestMap maps every entry emitted by the Observable into an Observable, and then subscribes to it, until the source observable completes.
  • CombineLatestMapTo maps every entry emitted by the Observable into a single Observable, and then subscribes to it, until the source observable completes.
  • CombineLatestWith will subscribe to its Observable and all other Observables passed in.
  • Concat emits the emissions from two or more observables without interleaving them.
  • ConcatAll flattens a higher order observable by concatenating the observables it emits.
  • ConcatMap transforms the items emitted by an Observable by applying a function to each item and returning an Observable.
  • ConcatMapTo maps every entry emitted by the Observable into a single Observable.
  • ConcatWith emits the emissions from two or more observables without interleaving them.
  • Connect instructs a connectable Observable to begin emitting items to its subscribers.
  • Count counts the number of items emitted by the source Observable and emits only this value.
  • Create provides a way of creating an Observable from scratch by calling observer methods programmatically.
  • CreateFutureRecursive provides a way of creating an Observable from scratch by calling observer methods programmatically.
  • CreateRecursive provides a way of creating an Observable from scratch by calling observer methods programmatically.
  • DebounceTime only emits the last item of a burst from an Observable if a particular timespan has passed without it emitting another item.
  • Defer does not create the Observable until the observer subscribes.
  • Delay shifts the emission from an Observable forward in time by a particular amount of time.
  • Distinct suppresses duplicate items emitted by an Observable.
  • DistinctUntilChanged only emits when the current value is different from the last.
  • Do calls a function for each next value passing through the observable.
  • DoOnComplete calls a function when the stream completes.
  • DoOnError calls a function for any error on the stream.
  • ElementAt emits only item n emitted by an Observable.
  • Empty creates an Observable that emits no items but terminates normally.
  • Filter emits only those items from an observable that pass a predicate test.
  • Finally applies a function for any error or completion on the stream.
  • First emits only the first item, or the first item that meets a condition, from an Observable.
  • From creates an observable from multiple values passed in.
  • FromChan creates an Observable from a Go channel.
  • IgnoreCompletion only emits items and never completes, neither with Error nor with Complete.
  • IgnoreElements does not emit any items from an Observable but mirrors its termination notification.
  • Interval creates an Observable that emits a sequence of integers spaced by a particular time interval.
  • Just creates an observable that emits a particular item.
  • Last emits only the last item emitted by an Observable.
  • Map transforms the items emitted by an Observable by applying a function to each item.
  • MapTo transforms the items emitted by an Observable.
  • Max determines, and emits, the maximum-valued item emitted by an Observable.
  • Merge combines multiple Observables into one by merging their emissions.
  • MergeAll flattens a higher order observable by merging the observables it emits.
  • MergeDelayError combines multiple Observables into one by merging their emissions.
  • MergeDelayErrorWith combines multiple Observables into one by merging their emissions.
  • MergeMap transforms the items emitted by an Observable by applying a function to each item and returning an Observable.
  • MergeMapTo maps every entry emitted by the Observable into a single Observable.
  • MergeWith combines multiple Observables into one by merging their emissions.
  • Min determines, and emits, the minimum-valued item emitted by an Observable.
  • Never creates an Observable that emits no items and doesn't terminate.
  • ObserveOn specifies a schedule function to use for delivering values to the observer.
  • ObserverObservable actually is an observer that is made observable.
  • Of emits a variable amount of values in a sequence and then emits a complete notification.
  • Only filters the value stream of an observable and lets only the values of a specific type pass.
  • Passthrough just passes through all output from the Observable.
  • Println subscribes to the Observable and prints every item to os.Stdout while it waits for completion or error.
  • Publish returns a Multicaster for a Subject to an underlying Observable and turns the subject into a connectable observable.
  • PublishBehavior returns a Multicaster that shares a single subscription to the underlying Observable returning an initial value or the last value emitted by the underlying Observable.
  • PublishLast returns a Multicaster that shares a single subscription to the underlying Observable containing only the last value emitted before it completes.
  • PublishReplay returns a Multicaster for a ReplaySubject to an underlying Observable and turns the subject into a connectable observable.
  • Range creates an Observable that emits a range of sequential int values.
  • Reduce applies a reducer function to each item emitted by an Observable and the previous reducer result.
  • RefCount makes a Connectable behave like an ordinary Observable.
  • Repeat creates an observable that emits a sequence of items repeatedly.
  • ReplaySubject ensures that all observers see the same sequence of emitted items, even if they subscribe after.
  • Retry resubscribes to a source Observable that sends an error notification, in the hope that it will complete without error.
  • SampleTime emits the most recent item emitted by an Observable within periodic time intervals.
  • Scan applies an accumulator function to each item emitted by an Observable and the previous accumulator result.
  • Serialize forces an observable to make serialized calls and to be well-behaved.
  • Single enforces that the observable sends exactly one data item and then completes.
  • Skip suppresses the first n items emitted by an Observable.
  • SkipLast suppresses the last n items emitted by an Observable.
  • Start creates an Observable that emits the return value of a function.
  • StartWith returns an observable that, at the moment of subscription, will synchronously emit all values provided to this operator, then subscribe to the source and mirror all of its emissions to subscribers.
  • Subject is a combination of an observer and observable.
  • Subscribe operates upon the emissions and notifications from an Observable.
  • SubscribeOn specifies the scheduler an Observable should use when it is subscribed to.
  • Sum calculates the sum of numbers emitted by an Observable and emits this sum.
  • SwitchAll converts an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables.
  • SwitchMap transforms the items emitted by an Observable by applying a function to each item and returning an Observable.
  • Take emits only the first n items emitted by an Observable.
  • TakeLast emits only the last n items emitted by an Observable.
  • TakeUntil emits items emitted by an Observable until another Observable emits an item.
  • TakeWhile mirrors items emitted by an Observable until a specified condition becomes false.
  • ThrottleTime emits when the source emits and then starts a timer during which all emissions from the source are ignored.
  • Throw creates an observable that emits no items and terminates with an error.
  • Ticker creates an Observable[time.Time] that emits a sequence of timestamps after an initialDelay has passed.
  • TimeInterval intercepts the items from the source Observable and emits in their place a struct that indicates the amount of time that elapsed between pairs of emissions.
  • Timeout mirrors the source Observable, but issues an error notification if a particular period of time elapses without any emitted items.
  • Timer creates an Observable that emits a sequence of integers (starting at zero) after an initialDelay has passed.
  • Timestamp attaches a timestamp to each item emitted by an observable indicating when it was emitted.
  • ToChan returns a channel that emits 'any' values.
  • ToSingle blocks until the Observable emits exactly one value or an error.
  • ToSlice collects all values from the Observable into a slice.
  • Wait subscribes to the Observable and waits for completion or error.
  • WithLatestFrom will subscribe to all Observables and wait for all of them to emit before emitting the first slice.
  • WithLatestFromAll flattens a higher order observable.

Documentation

Overview

Package rx provides Reactive Extensions, a powerful API for asynchronous programming in Go, built around observables and operators to process streams of data seamlessly.

Example (All)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	source := rx.From("ZERO", "ONE", "TWO")

	for k, v := range source.All() {
		fmt.Println(k, v)
	}

	fmt.Println("OK")
}
Output:

0 ZERO
1 ONE
2 TWO
OK
Example (BufferCount)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	source := rx.From(0, 1, 2, 3)

	fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 1)")
	rx.BufferCount(source, 2, 1).Println().Wait()

	fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 2)")
	rx.BufferCount(source, 2, 2).Println().Wait()

	fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 3)")
	rx.BufferCount(source, 2, 3).Println().Wait()

	fmt.Println("BufferCount(From(0, 1, 2, 3), 3, 2)")
	rx.BufferCount(source, 3, 2).Println().Wait()

	fmt.Println("BufferCount(From(0, 1, 2, 3), 6, 6)")
	rx.BufferCount(source, 6, 6).Println().Wait()

	fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 0)")
	rx.BufferCount(source, 2, 0).Println().Wait()
}
Output:

BufferCount(From(0, 1, 2, 3), 2, 1)
[0 1]
[1 2]
[2 3]
[3]
BufferCount(From(0, 1, 2, 3), 2, 2)
[0 1]
[2 3]
BufferCount(From(0, 1, 2, 3), 2, 3)
[0 1]
[3]
BufferCount(From(0, 1, 2, 3), 3, 2)
[0 1 2]
[2 3]
BufferCount(From(0, 1, 2, 3), 6, 6)
[0 1 2 3]
BufferCount(From(0, 1, 2, 3), 2, 0)
[0 1]
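
The buffers printed above can be reproduced with a slice-based model. This is only a sketch of the behavior visible in the example output, not the library's implementation: bufferCount is a hypothetical helper, and treating a startBufferEvery of 0 as "emit a single buffer" is inferred from the last case shown.

```go
package main

import "fmt"

// bufferCount groups items into buffers of at most size elements and starts
// a new buffer every `every` items. With every <= 0 only one buffer is produced.
// The returned buffers share memory with items.
func bufferCount(items []int, size, every int) [][]int {
	if size <= 0 {
		return nil
	}
	var out [][]int
	for start := 0; start < len(items); start += every {
		end := start + size
		if end > len(items) {
			end = len(items) // the final buffer may be shorter than size
		}
		out = append(out, items[start:end])
		if every <= 0 {
			break
		}
	}
	return out
}

func main() {
	items := []int{0, 1, 2, 3}
	fmt.Println(bufferCount(items, 2, 1)) // prints: [[0 1] [1 2] [2 3] [3]]
	fmt.Println(bufferCount(items, 2, 2)) // prints: [[0 1] [2 3]]
	fmt.Println(bufferCount(items, 2, 3)) // prints: [[0 1] [3]]
	fmt.Println(bufferCount(items, 3, 2)) // prints: [[0 1 2] [2 3]]
	fmt.Println(bufferCount(items, 6, 6)) // prints: [[0 1 2 3]]
	fmt.Println(bufferCount(items, 2, 0)) // prints: [[0 1]]
}
```

When startBufferEvery is smaller than bufferSize the buffers overlap, and when it is larger some items are skipped.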
Example (ConcatAll)
package main

import (
	"fmt"
	"time"

	"github.com/reactivego/rx"
)

func main() {
	source := rx.Empty[rx.Observable[string]]()
	rx.ConcatAll(source).Wait()

	source = rx.Of(rx.Empty[string]())
	rx.ConcatAll(source).Wait()

	req := func(request string, duration time.Duration) rx.Observable[string] {
		req := rx.From(request + " response")
		if duration == 0 {
			return req
		}
		return req.Delay(duration)
	}

	const ms = time.Millisecond

	req1 := req("first", 10*ms)
	req2 := req("second", 20*ms)
	req3 := req("third", 0*ms)
	req4 := req("fourth", 60*ms)

	source = rx.From(req1).ConcatWith(rx.From(req2, req3, req4).Delay(100 * ms))
	rx.ConcatAll(source).Println().Wait()

	fmt.Println("OK")
}
Output:

first response
second response
third response
fourth response
OK
Example (Count)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	source := rx.From(1, 2, 3, 4, 5)

	count := source.Count()
	count.Println().Wait()

	emptySource := rx.Empty[int]()
	emptyCount := emptySource.Count()
	emptyCount.Println().Wait()

	fmt.Println("OK")
}
Output:

5
0
OK
Example (ElementAt)
package main

import (
	"github.com/reactivego/rx"
)

func main() {
	rx.From(0, 1, 2, 3, 4).ElementAt(2).Println().Wait()
}
Output:

2
Example (ExhaustAll)
package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/reactivego/rx"
)

func main() {
	const ms = time.Millisecond

	stream := func(name string, duration time.Duration, count int) rx.Observable[string] {
		return rx.Map(rx.Timer[int](0*ms, duration), func(next int) string {
			return name + "-" + strconv.Itoa(next)
		}).Take(count)
	}

	streams := []rx.Observable[string]{
		stream("a", 20*ms, 3),
		stream("b", 20*ms, 3),
		stream("c", 20*ms, 3),
		rx.Empty[string](),
	}

	streamofstreams := rx.Map(rx.Timer[int](20*ms, 30*ms, 250*ms, 100*ms).Take(4), func(next int) rx.Observable[string] {
		return streams[next]
	})

	err := rx.ExhaustAll(streamofstreams).Println().Wait()

	if err == nil {
		fmt.Println("success")
	}
}
Output:

a-0
a-1
a-2
c-0
c-1
c-2
success
Example (Marshal)
package main

import (
	"encoding/json"

	"github.com/reactivego/rx"
)

func main() {
	type R struct {
		A string `json:"a"`
		B string `json:"b"`
	}

	b2s := func(data []byte) string { return string(data) }

	rx.Map(rx.Of(R{"Hello", "World"}).Marshal(json.Marshal), b2s).Println().Wait()
}
Output:

{"a":"Hello","b":"World"}
Example (Multicast)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	serial := rx.NewScheduler()

	in, out := rx.Multicast[int](1)

	// Ignore everything before any subscriptions, including the last!
	in.Next(-2)
	in.Next(-1)
	in.Next(0)
	in.Next(1)

	// Schedule the subsequent emits in a loop. This will be the first task to
	// run on the serial scheduler after the subscriptions have been added.
	serial.ScheduleLoop(2, func(index int, again func(next int)) {
		if index < 4 {
			in.Next(index)
			again(index + 1)
		} else {
			in.Done(rx.Error("foo"))
		}
	})

	// Add a couple of subscriptions
	sub1 := out.Println().Go(serial)
	sub2 := out.Println().Go(serial)

	// Let the scheduler run and wait for all of its scheduled tasks to finish.
	serial.Wait()
	fmt.Println(sub1.Wait())
	fmt.Println(sub2.Wait())
}
Output:

2
2
3
3
foo
foo
Example (MulticastDrop)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	serial := rx.NewScheduler()

	const onBackpressureDrop = -1

	// multicast with backpressure handling set to dropping incoming
	// items that don't fit in the buffer once it has filled up.
	in, out := rx.Multicast[int](1 * onBackpressureDrop)

	// ignore everything before any subscriptions, including the last!
	in.Next(-2)
	in.Next(-1)
	in.Next(0)
	in.Next(1)

	// add a couple of subscriptions
	sub1 := out.Println().Go(serial)
	sub2 := out.Println().Go(serial)

	in.Next(2)               // accepted: buffer not full
	in.Next(3)               // dropped: buffer full
	in.Done(rx.Error("foo")) // dropped: buffer full

	serial.Wait()
	fmt.Println(sub1.Wait())
	fmt.Println(sub2.Wait())
}
Output:

2
2
<nil>
<nil>
Example (Race)
package main

import (
	"fmt"
	"time"

	"github.com/reactivego/rx"
)

func main() {
	const ms = time.Millisecond

	req := func(request string, duration time.Duration) rx.Observable[string] {
		return rx.From(request + " response").Delay(duration)
	}

	req1 := req("first", 50*ms)
	req2 := req("second", 10*ms)
	req3 := req("third", 60*ms)

	rx.Race(req1, req2, req3).Println().Wait()

	err := func(text string, duration time.Duration) rx.Observable[int] {
		return rx.Throw[int](rx.Error(text + " error")).Delay(duration)
	}

	err1 := err("first", 10*ms)
	err2 := err("second", 20*ms)
	err3 := err("third", 30*ms)

	fmt.Println(rx.Race(err1, err2, err3).Wait(rx.Goroutine))
}
Output:

second response
first error
Example (Retry)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	var first error = rx.Error("error")
	a := rx.Create(func(index int) (next int, err error, done bool) {
		if index < 3 {
			return index, nil, false
		}
		err, first = first, nil
		return 0, err, true
	})
	err := a.Retry().Println().Wait()
	fmt.Println(first == nil)
	fmt.Println(err)
}
Output:

0
1
2
0
1
2
true
<nil>
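
The resubscribe behavior in the Retry example can be modeled with the standard library alone. In this sketch the source type and the retry and flaky functions are hypothetical; flaky fails only on its first run, mirroring the example above.

```go
package main

import (
	"errors"
	"fmt"
)

// source pushes values to next and returns an error, or nil on normal completion.
type source func(next func(int)) error

// retry re-runs src from the start every time it fails, until it succeeds.
// A source that always fails would make this loop forever.
func retry(src source) source {
	return func(next func(int)) error {
		for {
			if err := src(next); err == nil {
				return nil
			}
		}
	}
}

// flaky emits 0, 1, 2 and fails only on its first run.
func flaky() source {
	failed := false
	return func(next func(int)) error {
		for i := 0; i < 3; i++ {
			next(i)
		}
		if !failed {
			failed = true
			return errors.New("error")
		}
		return nil
	}
}

func main() {
	err := retry(flaky())(func(v int) { fmt.Println(v) })
	fmt.Println(err) // the values 0, 1, 2 are printed twice, then <nil>
}
```

Values emitted before the failure are delivered again after resubscribing, which is why 0, 1, 2 appear twice.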
Example (Share)
package main

import (
	"github.com/reactivego/rx"
)

func main() {
	serial := rx.NewScheduler()

	shared := rx.From(1, 2, 3).Share()

	shared.Println().Go(serial)
	shared.Println().Go(serial)
	shared.Println().Go(serial)

	serial.Wait()
}
Output:

1
1
1
2
2
2
3
3
3
Example (Skip)
package main

import (
	"github.com/reactivego/rx"
)

func main() {
	rx.From(1, 2, 3, 4, 5).Skip(2).Println().Wait()
}
Output:

3
4
5
Example (Subject)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	serial := rx.NewScheduler()

	// subject collects emits when there are no subscriptions active.
	in, out := rx.Subject[int](0, 1)

	// ignore everything before any subscriptions, except the last because buffer size is 1
	in.Next(-2)
	in.Next(-1)
	in.Next(0)
	in.Next(1)

	// add a couple of subscriptions
	sub1 := out.Println().Go(serial)
	sub2 := out.Println().Go(serial)

	// schedule the subsequent emits on the serial scheduler otherwise these calls
	// will block because the buffer is full.
	// subject will detect usage of scheduler on observable side and use it on the
	// observer side to keep the data flow through the subject going.
	serial.Schedule(func() {
		in.Next(2)
		in.Next(3)
		in.Done(rx.Error("foo"))
	})

	serial.Wait()
	fmt.Println(sub1.Wait())
	fmt.Println(sub2.Wait())
}
Output:

1
1
2
2
3
3
foo
foo
Example (SwitchAll)
package main

import (
	"fmt"
	"time"

	"github.com/reactivego/rx"
)

func main() {
	const ms = time.Millisecond

	interval42x4 := rx.Interval[int](42 * ms).Take(4)
	interval16x4 := rx.Interval[int](16 * ms).Take(4)

	err := rx.SwitchAll(rx.Map(interval42x4, func(next int) rx.Observable[int] { return interval16x4 })).Println().Wait(rx.Goroutine)

	if err == nil {
		fmt.Println("success")
	}
}
Output:

0
1
0
1
0
1
0
1
2
3
success
Example (SwitchMap)
package main

import (
	"fmt"
	"time"

	"github.com/reactivego/rx"
)

func main() {
	const ms = time.Millisecond

	webreq := func(request string, duration time.Duration) rx.Observable[string] {
		return rx.From(request + " result").Delay(duration)
	}

	first := webreq("first", 50*ms)
	second := webreq("second", 10*ms)
	latest := webreq("latest", 50*ms)

	switchmap := rx.SwitchMap(rx.Interval[int](20*ms).Take(3), func(i int) rx.Observable[string] {
		switch i {
		case 0:
			return first
		case 1:
			return second
		case 2:
			return latest
		default:
			return rx.Empty[string]()
		}
	})

	err := switchmap.Println().Wait()
	if err == nil {
		fmt.Println("success")
	}
}
Output:

second result
latest result
success
Example (Values)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
	"github.com/reactivego/scheduler"
)

func main() {
	source := rx.From(1, 3, 5)

	// Why choose the Goroutine concurrent scheduler?
	// An observable can actually be at the root of a tree
	// of separately running observables that have their
	// responses merged. The Goroutine scheduler allows
	// these observables to run concurrently.

	// run the observable on 1 or more goroutines
	for i := range source.Values(scheduler.Goroutine) {
		// This is called from a newly created goroutine
		fmt.Println(i)
	}

	// run the observable on the current goroutine
	for i := range source.Values(scheduler.New()) {
		fmt.Println(i)
	}

	fmt.Println("OK")
}
Output:

1
3
5
1
3
5
OK

Constants

const InvalidCount = Error("invalid count")
const OutOfSubjectSubscriptions = Error("out of subject subscriptions")
const RepeatCountInvalid = Error("repeat count invalid")
const SubscriptionActive = Error("subscription active")

SubscriptionActive is the error returned by Err() when the subscription is still active and has not yet completed or been canceled.

const SubscriptionCanceled = Error("subscription canceled")

SubscriptionCanceled is the error returned by Wait() and Err() when the subscription was canceled by calling Unsubscribe() on the Subscription. This indicates the subscription was terminated by the subscriber rather than by the observable completing normally or with an error.

const TypecastFailed = Error("typecast failed")
const ZipBufferOverflow = Error("zip buffer overflow")

Variables

var Goroutine = scheduler.Goroutine
var NewScheduler = scheduler.New

Functions

func All2 added in v0.2.2

func All2[T, U any](observable Observable[Tuple2[T, U]], scheduler ...Scheduler) iter.Seq2[T, U]

func Equal added in v0.2.0

func Equal[T comparable]() func(T, T) bool

func Multicast added in v0.2.0

func Multicast[T any](size int) (Observer[T], Observable[T])

Multicast returns both an Observer and an Observable. The returned Observer is used to send items into the Multicast. The returned Observable is used to subscribe to the Multicast. The Multicast multicasts items sent through the Observer to every Subscriber of the Observable.

size  size of the item buffer, number of items kept to replay to a new Subscriber.

Backpressure handling depends on the sign of the size argument. For positive size the multicast will block when one of the subscribers lets the buffer fill up. For negative size the multicast will drop items on the blocking subscriber, allowing the others to keep on receiving values. For hot observables dropping is preferred.
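
The two backpressure strategies can be illustrated with a buffered Go channel. This is a sketch under the assumption that a channel is a fair stand-in for the item buffer; the offer function is hypothetical and is not how Multicast is implemented.

```go
package main

import "fmt"

// offer tries to place v in the buffer and reports whether it was accepted.
// With drop set, a full buffer discards v; otherwise the send blocks until
// a receiver makes room.
func offer(buf chan int, v int, drop bool) bool {
	if !drop {
		buf <- v // blocks while the buffer is full
		return true
	}
	select {
	case buf <- v:
		return true
	default:
		return false // buffer full: the item is dropped
	}
}

func main() {
	buf := make(chan int, 1)          // room for a single item
	fmt.Println(offer(buf, 2, true)) // prints: true (buffer not full)
	fmt.Println(offer(buf, 3, true)) // prints: false (buffer full, dropped)
	fmt.Println(<-buf)               // prints: 2
}
```

Dropping keeps a fast producer from being stalled by a slow consumer, at the cost of lost items. Blocking preserves every item but ties the producer's pace to the slowest consumer.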

func Must added in v0.2.0

func Must[T any](t T, err error) T

func Subject

func Subject[T any](age time.Duration, capacity ...int) (Observer[T], Observable[T])

Subject returns both an Observer and an Observable. The returned Observer is used to send items into the Subject. The returned Observable is used to subscribe to the Subject. The Subject multicasts items sent through the Observer to every Subscriber of the Observable.

age     max age to keep items in order to replay them to a new Subscriber (0 = no max age).
[size]  size of the item buffer, number of items kept to replay to a new Subscriber.
[cap]   capacity of the item buffer, number of items that can be observed before blocking.
[scap]  capacity of the subscription list, max number of simultaneous subscribers.

Types

type ConcurrentScheduler added in v0.2.0

type ConcurrentScheduler = scheduler.ConcurrentScheduler

type Connectable

type Connectable[T any] struct {
	Observable[T]
	Connector
}

Connectable[T] is an Observable[T] with a Connect method, so it has both a Subscribe method and a Connect method. The Connect method is used to instruct the Connectable[T] to subscribe to its source. The Subscribe method is used to subscribe to the Connectable[T] itself.

func (Connectable[T]) AutoConnect added in v0.2.0

func (connectable Connectable[T]) AutoConnect(count int) Observable[T]

func (Connectable[T]) RefCount added in v0.2.0

func (connectable Connectable[T]) RefCount() Observable[T]

RefCount makes a Connectable[T] behave like an ordinary Observable[T]. On first Subscribe it will call Connect on its Connectable[T] and when its last subscriber is Unsubscribed it will cancel the source connection by calling Unsubscribe on the subscription returned by the call to Connect.
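
The reference-counting idea can be sketched as follows. The connectable and refCount types are hypothetical stand-ins using only the standard library. The sketch is not safe for concurrent use and does not reflect how rx implements RefCount.

```go
package main

import "fmt"

// connectable stands in for a source with explicit connect and disconnect steps.
type connectable struct {
	connected bool
	log       []string
}

func (c *connectable) connect()    { c.connected = true; c.log = append(c.log, "connect") }
func (c *connectable) disconnect() { c.connected = false; c.log = append(c.log, "disconnect") }

// refCount connects on the first subscriber and disconnects when the last one leaves.
type refCount struct {
	src   *connectable
	count int
}

// subscribe registers a subscriber and returns its unsubscribe function.
func (r *refCount) subscribe() (unsubscribe func()) {
	if r.count == 0 {
		r.src.connect()
	}
	r.count++
	done := false
	return func() {
		if done {
			return // unsubscribing twice has no effect
		}
		done = true
		r.count--
		if r.count == 0 {
			r.src.disconnect()
		}
	}
}

func main() {
	src := &connectable{}
	rc := &refCount{src: src}
	a := rc.subscribe() // first subscriber: connect
	b := rc.subscribe() // second subscriber: already connected
	a()
	fmt.Println(src.connected) // prints: true (b is still subscribed)
	b()
	fmt.Println(src.connected, src.log) // prints: false [connect disconnect]
}
```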

type Connector added in v0.2.3

type Connector func(Scheduler, Subscriber)

Connector provides the Connect method for a Connectable[T].

func (Connector) Connect added in v0.2.3

func (connect Connector) Connect(schedulers ...Scheduler) Subscription

Connect instructs a Connectable[T] to subscribe to its source and begin emitting items to its subscribers. Connect accepts an optional scheduler argument.

type Creator added in v0.2.0

type Creator[T any] func(index int) (Next T, Err error, Done bool)

type Error

type Error string

func (Error) Error added in v0.2.0

func (e Error) Error() string
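
Declaring Error as a string type is what allows the error values listed under Constants to be constants rather than variables. The sketch below repeats the Error declaration and the SubscriptionCanceled constant from this page; the wait and isCanceled functions are hypothetical and exist only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Error is a string type with an Error method, so it satisfies the error interface.
type Error string

func (e Error) Error() string { return string(e) }

// Because Error is a string type, sentinel errors can be declared as constants.
const SubscriptionCanceled = Error("subscription canceled")

// wait is a hypothetical function that reports cancellation wrapped with context.
func wait() error {
	return fmt.Errorf("wait: %w", SubscriptionCanceled)
}

// isCanceled reports whether err is, or wraps, SubscriptionCanceled.
func isCanceled(err error) bool {
	return errors.Is(err, SubscriptionCanceled)
}

func main() {
	err := wait()
	fmt.Println(isCanceled(err)) // prints: true
	fmt.Println(err)             // prints: wait: subscription canceled
}
```

Constant errors cannot be reassigned by other packages, and two Error values with the same text compare equal.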

type Float added in v0.2.2

type Float interface {
	~float32 | ~float64
}

Float is a constraint that permits any floating-point type. If future releases of Go add new predeclared floating-point types, this constraint will be modified to include them.

type Integer added in v0.2.2

type Integer interface {
	Signed | Unsigned
}

Integer is a constraint that permits any integer type. If future releases of Go add new predeclared integer types, this constraint will be modified to include them.
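
A short sketch shows how constraints like these are typically used. The Signed and Unsigned definitions below are assumptions modeled on the conventional constraint definitions, since this page does not show them, and sum and Celsius are hypothetical names for the example.

```go
package main

import "fmt"

// Signed and Unsigned are assumed definitions; the page does not show them.
type Signed interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64
}

type Unsigned interface {
	~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr
}

type Integer interface {
	Signed | Unsigned
}

type Float interface {
	~float32 | ~float64
}

// sum accepts any integer or floating-point element type.
func sum[T Integer | Float](values ...T) T {
	var total T
	for _, v := range values {
		total += v
	}
	return total
}

// Celsius has underlying type float64, so the ~float64 term admits it.
type Celsius float64

func main() {
	fmt.Println(sum(1, 2, 3))                   // prints: 6
	fmt.Println(sum(1.5, 2.5))                  // prints: 4
	fmt.Println(sum(Celsius(20), Celsius(1.5))) // prints: 21.5
}
```

The same Integer | Float constraint appears in the Interval and Timer signatures, which is why those functions take an explicit type argument such as rx.Interval[int].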

type MaxBufferSizeOption added in v0.2.1

type MaxBufferSizeOption = func(*int)

MaxBufferSizeOption is a function type used for configuring the maximum buffer size of an observable stream.

func WithMaxBufferSize added in v0.2.1

func WithMaxBufferSize(n int) MaxBufferSizeOption

WithMaxBufferSize creates a MaxBufferSizeOption that sets the maximum buffer size to n. This option is typically used when creating new observables to control memory usage.
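
Since MaxBufferSizeOption is a func(*int), it fits the functional options pattern. The sketch below shows one plausible way such an option could be produced and applied. The body of WithMaxBufferSize, the bufferSize helper and the default of 16 are assumptions made for illustration, not the library's code.

```go
package main

import "fmt"

// MaxBufferSizeOption matches the type alias shown above.
type MaxBufferSizeOption = func(*int)

// WithMaxBufferSize returns an option that overwrites the size with n.
// The body is an assumption; only the signature comes from the documentation.
func WithMaxBufferSize(n int) MaxBufferSizeOption {
	return func(size *int) { *size = n }
}

// bufferSize is a hypothetical helper that applies options over a default.
func bufferSize(options ...MaxBufferSizeOption) int {
	size := 16 // hypothetical default, not taken from the library
	for _, option := range options {
		option(&size)
	}
	return size
}

func main() {
	fmt.Println(bufferSize())                     // prints: 16
	fmt.Println(bufferSize(WithMaxBufferSize(4))) // prints: 4
}
```

A variadic options parameter, as in the Zip2 signature, lets callers omit the setting entirely and lets new options be added later without changing existing call sites.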

type Observable

type Observable[T any] func(Observer[T], Scheduler, Subscriber)

func AsObservable added in v0.2.0

func AsObservable[T any](observable Observable[any]) Observable[T]

func BufferCount added in v0.2.0

func BufferCount[T any](observable Observable[T], bufferSize, startBufferEvery int) Observable[[]T]

func CombineAll added in v0.2.0

func CombineAll[T any](observable Observable[Observable[T]]) Observable[[]T]

func CombineLatest

func CombineLatest[T any](observables ...Observable[T]) Observable[[]T]

func CombineLatest2 added in v0.2.1

func CombineLatest2[T, U any](first Observable[T], second Observable[U]) Observable[Tuple2[T, U]]

func CombineLatest3 added in v0.2.1

func CombineLatest3[T, U, V any](first Observable[T], second Observable[U], third Observable[V]) Observable[Tuple3[T, U, V]]

func CombineLatest4 added in v0.2.1

func CombineLatest4[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W]) Observable[Tuple4[T, U, V, W]]

func CombineLatest5 added in v0.2.1

func CombineLatest5[T, U, V, W, X any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W], fifth Observable[X]) Observable[Tuple5[T, U, V, W, X]]

func Concat

func Concat[T any](observables ...Observable[T]) Observable[T]

func ConcatAll added in v0.2.0

func ConcatAll[T any](observable Observable[Observable[T]]) Observable[T]

func ConcatMap added in v0.2.0

func ConcatMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]

func Create

func Create[T any](create Creator[T]) Observable[T]

func Defer

func Defer[T any](factory func() Observable[T]) Observable[T]

func Empty

func Empty[T any]() Observable[T]

func ExhaustAll added in v0.2.0

func ExhaustAll[T any](observable Observable[Observable[T]]) Observable[T]

func ExhaustMap added in v0.2.0

func ExhaustMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]

func From

func From[T any](slice ...T) Observable[T]

func Interval

func Interval[T Integer | Float](interval time.Duration) Observable[T]

func Map added in v0.2.0

func Map[T, U any](observable Observable[T], project func(T) U) Observable[U]

func MapE added in v0.2.0

func MapE[T, U any](observable Observable[T], project func(T) (U, error)) Observable[U]

func Merge

func Merge[T any](observables ...Observable[T]) Observable[T]

func MergeAll added in v0.2.0

func MergeAll[T any](observable Observable[Observable[T]]) Observable[T]

func MergeMap added in v0.2.0

func MergeMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]

func Never

func Never[T any]() Observable[T]

func Of

func Of[T any](value T) Observable[T]

func Pull added in v0.2.2

func Pull[T any](seq iter.Seq[T]) Observable[T]

func Pull2 added in v0.2.2

func Pull2[T, U any](seq iter.Seq2[T, U]) Observable[Tuple2[T, U]]

func Race added in v0.2.0

func Race[T any](observables ...Observable[T]) Observable[T]

func Recv added in v0.2.0

func Recv[T any](ch <-chan T) Observable[T]

func Reduce added in v0.2.0

func Reduce[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) U) Observable[U]

func ReduceE added in v0.2.1

func ReduceE[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) (U, error)) Observable[U]

func Scan added in v0.2.0

func Scan[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) U) Observable[U]

func ScanE added in v0.2.0

func ScanE[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) (U, error)) Observable[U]

func SwitchAll added in v0.2.0

func SwitchAll[T any](observable Observable[Observable[T]]) Observable[T]

func SwitchMap added in v0.2.0

func SwitchMap[T, U any](o Observable[T], project func(T) Observable[U]) Observable[U]

func Throw

func Throw[T any](err error) Observable[T]

func Ticker

func Ticker(initialDelay time.Duration, intervals ...time.Duration) Observable[time.Time]

Ticker creates an Observable[time.Time] that emits a sequence of timestamps. The first timestamp is emitted after initialDelay has passed; subsequent timestamps follow the schedule of intervals passed in. If only initialDelay is given, Ticker emits exactly once.

func Timer

func Timer[T Integer | Float](initialDelay time.Duration, intervals ...time.Duration) Observable[T]

func WithLatestFrom added in v0.2.0

func WithLatestFrom[T any](observables ...Observable[T]) Observable[[]T]

func WithLatestFrom2 added in v0.2.1

func WithLatestFrom2[T, U any](first Observable[T], second Observable[U]) Observable[Tuple2[T, U]]

func WithLatestFrom3 added in v0.2.1

func WithLatestFrom3[T, U, V any](first Observable[T], second Observable[U], third Observable[V]) Observable[Tuple3[T, U, V]]

func WithLatestFrom4 added in v0.2.1

func WithLatestFrom4[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W]) Observable[Tuple4[T, U, V, W]]

func WithLatestFrom5 added in v0.2.1

func WithLatestFrom5[T, U, V, W, X any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W], fifth Observable[X]) Observable[Tuple5[T, U, V, W, X]]

func WithLatestFromAll added in v0.2.0

func WithLatestFromAll[T any](observable Observable[Observable[T]]) Observable[[]T]

func Zip added in v0.2.1

func Zip[T any](observables ...Observable[T]) Observable[[]T]

func Zip2 added in v0.2.1

func Zip2[T, U any](first Observable[T], second Observable[U], options ...MaxBufferSizeOption) Observable[Tuple2[T, U]]

func Zip3 added in v0.2.1

func Zip3[T, U, V any](first Observable[T], second Observable[U], third Observable[V], options ...MaxBufferSizeOption) Observable[Tuple3[T, U, V]]

func Zip4 added in v0.2.1

func Zip4[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W], options ...MaxBufferSizeOption) Observable[Tuple4[T, U, V, W]]

func Zip5 added in v0.2.1

func Zip5[T, U, V, W, X any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W], fifth Observable[X], options ...MaxBufferSizeOption) Observable[Tuple5[T, U, V, W, X]]

func ZipAll added in v0.2.1

func ZipAll[T any](observable Observable[Observable[T]], options ...MaxBufferSizeOption) Observable[[]T]

func (Observable[T]) All

func (observable Observable[T]) All(scheduler ...Scheduler) iter.Seq2[int, T]

func (Observable[T]) Append added in v0.2.1

func (observable Observable[T]) Append(slice *[]T) Observable[T]

func (Observable[T]) AsObservable

func (observable Observable[T]) AsObservable() Observable[any]

func (Observable[T]) Assign added in v0.2.0

func (observable Observable[T]) Assign(value *T) Observable[T]

func (Observable[T]) AutoUnsubscribe

func (observable Observable[T]) AutoUnsubscribe() Observable[T]

func (Observable[T]) Catch

func (observable Observable[T]) Catch(other Observable[T]) Observable[T]

func (Observable[T]) CatchError

func (observable Observable[T]) CatchError(selector func(err error, caught Observable[T]) Observable[T]) Observable[T]

func (Observable[T]) ConcatWith

func (observable Observable[T]) ConcatWith(others ...Observable[T]) Observable[T]

func (Observable[T]) Count

func (observable Observable[T]) Count() Observable[int]

func (Observable[T]) Delay

func (observable Observable[T]) Delay(duration time.Duration) Observable[T]

func (Observable[T]) DistinctUntilChanged

func (observable Observable[T]) DistinctUntilChanged(equal func(T, T) bool) Observable[T]

func (Observable[T]) Do

func (observable Observable[T]) Do(f func(T)) Observable[T]

func (Observable[T]) ElementAt

func (observable Observable[T]) ElementAt(n int) Observable[T]

func (Observable[T]) EndWith added in v0.2.2

func (observable Observable[T]) EndWith(values ...T) Observable[T]

func (Observable[T]) Filter

func (observable Observable[T]) Filter(predicate func(T) bool) Observable[T]

func (Observable[T]) First

func (observable Observable[T]) First(schedulers ...Scheduler) (value T, err error)

func (Observable[T]) Fprint added in v0.2.0

func (observable Observable[T]) Fprint(out io.Writer) Observable[T]

func (Observable[T]) Fprintf added in v0.2.0

func (observable Observable[T]) Fprintf(out io.Writer, format string) Observable[T]

func (Observable[T]) Fprintln added in v0.2.0

func (observable Observable[T]) Fprintln(out io.Writer) Observable[T]

func (Observable[T]) Go added in v0.2.0

func (observable Observable[T]) Go(schedulers ...Scheduler) Subscription

Go subscribes to the observable and starts execution on a separate goroutine. It ignores all emissions from the observable sequence, making it useful when you only care about side effects and not the actual values. By default, it uses the Goroutine scheduler, but an optional scheduler can be provided. Returns a Subscription that can be used to cancel the subscription when no longer needed.

func (Observable[T]) Last

func (observable Observable[T]) Last(schedulers ...Scheduler) (value T, err error)

func (Observable[T]) Map

func (observable Observable[T]) Map(project func(T) any) Observable[any]

func (Observable[T]) MapE added in v0.2.1

func (observable Observable[T]) MapE(project func(T) (any, error)) Observable[any]

func (Observable[T]) Marshal added in v0.2.0

func (observable Observable[T]) Marshal(marshal func(any) ([]byte, error)) Observable[[]byte]

func (Observable[T]) MergeWith

func (observable Observable[T]) MergeWith(others ...Observable[T]) Observable[T]

func (Observable[T]) OnComplete added in v0.2.2

func (observable Observable[T]) OnComplete(f func()) Observable[T]

func (Observable[T]) OnDone added in v0.2.2

func (observable Observable[T]) OnDone(f func(error)) Observable[T]

func (Observable[T]) OnError added in v0.2.2

func (observable Observable[T]) OnError(f func(error)) Observable[T]

func (Observable[T]) OnNext added in v0.2.2

func (observable Observable[T]) OnNext(f func(T)) Observable[T]

func (Observable[T]) Passthrough added in v0.2.0

func (observable Observable[T]) Passthrough() Observable[T]

func (Observable[T]) Pipe added in v0.2.0

func (observable Observable[T]) Pipe(segments ...Pipe[T]) Observable[T]

func (Observable[T]) Print added in v0.2.0

func (observable Observable[T]) Print() Observable[T]

func (Observable[T]) Printf added in v0.2.0

func (observable Observable[T]) Printf(format string) Observable[T]

func (Observable[T]) Println

func (observable Observable[T]) Println() Observable[T]

func (Observable[T]) Publish

func (observable Observable[T]) Publish() Connectable[T]

func (Observable[T]) RaceWith added in v0.2.0

func (observable Observable[T]) RaceWith(others ...Observable[T]) Observable[T]

func (Observable[T]) Repeat

func (observable Observable[T]) Repeat(count ...int) Observable[T]

Repeat emits the items emitted by the source Observable repeatedly.

Parameters:

  • count: Optional. The number of repetitions:
      • If omitted: the source Observable is repeated indefinitely
      • If 0: returns an empty Observable
      • If negative: returns an Observable that emits an error
      • If multiple count values: returns an Observable that emits an error

The resulting Observable will subscribe to the source Observable repeatedly each time the source completes, up to the specified count.
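
The count rules above can be sketched as a small validation function. This is an illustration only, not the package's implementation: resolveRepeat, repeatSlice, and errInvalidCount are hypothetical names, and the sketch replays a plain slice instead of resubscribing to an Observable.

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalidCount is a hypothetical stand-in; the error that rx actually
// emits for a bad count is not shown in this documentation.
var errInvalidCount = errors.New("repeat: invalid count")

// resolveRepeat interprets the optional count argument following the rules
// listed above. forever is true when count is omitted.
func resolveRepeat(count ...int) (n int, forever bool, err error) {
	switch {
	case len(count) == 0:
		return 0, true, nil
	case len(count) > 1 || count[0] < 0:
		return 0, false, errInvalidCount
	default:
		return count[0], false, nil
	}
}

// repeatSlice models a finite repeat on a slice: the source is replayed n
// times, so n == 0 yields an empty result.
func repeatSlice[T any](source []T, n int) []T {
	if n <= 0 {
		return nil
	}
	out := make([]T, 0, len(source)*n)
	for i := 0; i < n; i++ {
		out = append(out, source...)
	}
	return out
}

func main() {
	n, forever, err := resolveRepeat(2)
	fmt.Println(n, forever, err)             // 2 false <nil>
	fmt.Println(repeatSlice([]int{1, 2}, n)) // [1 2 1 2]

	_, forever, _ = resolveRepeat()
	fmt.Println(forever) // true

	_, _, err = resolveRepeat(-1)
	fmt.Println(err) // repeat: invalid count
}
```

The omitted-count case is reported as a flag rather than materialized, since an indefinitely repeating sequence cannot be represented as a finite slice.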

func (Observable[T]) Retry

func (observable Observable[T]) Retry(limit ...int) Observable[T]

func (Observable[T]) RetryTime added in v0.2.0

func (observable Observable[T]) RetryTime(backoff func(int) time.Duration, limit ...int) Observable[T]

func (Observable[T]) SampleTime

func (observable Observable[T]) SampleTime(window time.Duration) Observable[T]

SampleTime emits the most recent item emitted by the source Observable within each periodic time window.

func (Observable[T]) Send added in v0.2.0

func (observable Observable[T]) Send(ch chan<- T) Observable[T]

func (Observable[T]) Share added in v0.2.0

func (observable Observable[T]) Share() Observable[T]

Share returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one subscriber, the returned Observable stays subscribed to the source and keeps emitting data. When all subscribers have unsubscribed, it unsubscribes from the source Observable. Because it multicasts, the resulting stream is hot.

This method is useful when you have an Observable that is expensive to create or has side-effects, but you want to share the results of that Observable with multiple subscribers. By using `Share`, you can avoid creating multiple instances of the Observable and ensure that all subscribers receive the same data.
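
The reference-counting behavior described above can be modeled without the library. The sketch below is not how rx implements Share: the shared type, its connect callback, and the subscribe and emit methods are all hypothetical, and the model is not safe for concurrent use.

```go
package main

import "fmt"

// shared is a hypothetical, single-goroutine model of a multicasting stream.
// connect starts the source and returns a function that stops it.
type shared[T any] struct {
	connect    func(emit func(T)) (disconnect func())
	disconnect func()
	observers  map[int]func(T)
	nextID     int
}

func newShared[T any](connect func(emit func(T)) func()) *shared[T] {
	return &shared[T]{connect: connect, observers: map[int]func(T){}}
}

// subscribe registers an observer and connects to the source when it is the
// first one. The returned function removes the observer and disconnects from
// the source when no observers remain.
func (s *shared[T]) subscribe(observe func(T)) (unsubscribe func()) {
	id := s.nextID
	s.nextID++
	s.observers[id] = observe
	if len(s.observers) == 1 {
		s.disconnect = s.connect(s.emit)
	}
	return func() {
		if _, ok := s.observers[id]; !ok {
			return // already unsubscribed
		}
		delete(s.observers, id)
		if len(s.observers) == 0 {
			s.disconnect()
		}
	}
}

// emit multicasts one value to every current observer.
func (s *shared[T]) emit(value T) {
	for _, observe := range s.observers {
		observe(value)
	}
}

func main() {
	connections := 0
	var push func(int) // non-nil only while the source is connected
	s := newShared[int](func(emit func(int)) func() {
		connections++
		push = emit
		return func() { push = nil }
	})

	sumA, sumB := 0, 0
	unsubA := s.subscribe(func(v int) { sumA += v })
	unsubB := s.subscribe(func(v int) { sumB += v })
	push(1) // both observers receive the same value
	unsubA()
	push(2)  // only B is still subscribed
	unsubB() // last observer gone: the source is disconnected

	fmt.Println(sumA, sumB, connections, push == nil) // 1 3 1 true
}
```

The source is connected once even though two observers subscribe, which is the property that makes sharing worthwhile for expensive or side-effecting sources.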

func (Observable[T]) Skip

func (observable Observable[T]) Skip(n int) Observable[T]

func (Observable[T]) Slice added in v0.2.0

func (observable Observable[T]) Slice(schedulers ...Scheduler) (slice []T, err error)

func (Observable[T]) StartWith

func (observable Observable[T]) StartWith(values ...T) Observable[T]

func (Observable[T]) Subscribe

func (observable Observable[T]) Subscribe(observe Observer[T], scheduler Scheduler) Subscription

func (Observable[T]) SubscribeOn

func (observable Observable[T]) SubscribeOn(scheduler ConcurrentScheduler) Observable[T]

func (Observable[T]) Take

func (observable Observable[T]) Take(n int) Observable[T]

Take returns an Observable that emits only the first n values emitted by the source Observable. If the source emits fewer than n values, all of its values are emitted. After that, it completes, regardless of whether the source completes.
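
To illustrate the early completion described above, here is a minimal model written against a simplified observer callback. The source type and the take and collect functions are hypothetical stand-ins rather than the rx implementation; the sketch runs synchronously and assumes the argument to take is positive.

```go
package main

import "fmt"

// Observer mirrors the callback shape documented for rx.Observer.
type Observer[T any] func(next T, err error, done bool)

// source is a simplified stand-in for an Observable: it pushes values into
// the observer synchronously.
type source[T any] func(observe Observer[T])

// take forwards at most n values and then signals completion itself, without
// waiting for the source to complete. It assumes n > 0.
func take[T any](src source[T], n int) source[T] {
	return func(observe Observer[T]) {
		var zero T
		seen, finished := 0, false
		src(func(next T, err error, done bool) {
			switch {
			case finished:
				// Ignore anything the source sends after completion.
			case done:
				finished = true
				observe(zero, err, true)
			default:
				observe(next, nil, false)
				seen++
				if seen == n {
					finished = true
					observe(zero, nil, true)
				}
			}
		})
	}
}

// collect subscribes and records the values and whether completion arrived.
func collect[T any](src source[T]) (values []T, completed bool) {
	src(func(next T, err error, done bool) {
		if done {
			completed = true
			return
		}
		values = append(values, next)
	})
	return
}

func main() {
	// A source that emits 1..5 and never signals completion.
	endless := source[int](func(observe Observer[int]) {
		for i := 1; i <= 5; i++ {
			observe(i, nil, false)
		}
	})
	fmt.Println(collect(take(endless, 3))) // [1 2 3] true
	fmt.Println(collect(take(endless, 9))) // [1 2 3 4 5] false
}
```

In the second call the source runs out before the limit is reached, so all five values pass through and completion is left to the source, which in this model never signals it.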

func (Observable[T]) TakeWhile

func (observable Observable[T]) TakeWhile(condition func(T) bool) Observable[T]

func (Observable[T]) Tap added in v0.2.0

func (observable Observable[T]) Tap(tap Observer[T]) Observable[T]

func (Observable[T]) Values added in v0.2.0

func (observable Observable[T]) Values(scheduler ...Scheduler) iter.Seq[T]

func (Observable[T]) Wait

func (observable Observable[T]) Wait(schedulers ...Scheduler) error

type Observer

type Observer[T any] func(next T, err error, done bool)

func AsObserver added in v0.2.0

func AsObserver[T any](observe Observer[any]) Observer[T]

AsObserver converts an Observer[any] to an Observer of a specific type T. This allows adapting a generic Observer to a more specific type context.

func Ignore added in v0.2.0

func Ignore[T any]() Observer[T]

Ignore creates an Observer that simply discards any emissions from an Observable. It is useful when you need to create an Observer but don't care about its values.

func (Observer[T]) AsObserver

func (observe Observer[T]) AsObserver() Observer[any]

AsObserver converts a typed Observer[T] to a generic Observer[any]. It handles type conversion from 'any' back to T, and sends a TypecastFailed error when conversion fails.
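
The conversion described above can be sketched with a type assertion. This is an assumption about the approach, not the package's code: asAny and errTypecastFailed are hypothetical names, with the latter standing in for the TypecastFailed error mentioned above.

```go
package main

import (
	"errors"
	"fmt"
)

// Observer is redeclared here with the signature documented for rx.Observer.
type Observer[T any] func(next T, err error, done bool)

// errTypecastFailed is a hypothetical stand-in for the package's
// TypecastFailed error.
var errTypecastFailed = errors.New("typecast failed")

// asAny wraps a typed observer so that it accepts values of type any.
// Values that are not of type T terminate the typed observer with an error.
func asAny[T any](observe Observer[T]) Observer[any] {
	return func(next any, err error, done bool) {
		var zero T
		if done {
			observe(zero, err, true)
			return
		}
		value, ok := next.(T)
		if !ok {
			observe(zero, errTypecastFailed, true)
			return
		}
		observe(value, nil, false)
	}
}

func main() {
	var got []int
	var failure error
	typed := Observer[int](func(next int, err error, done bool) {
		if done {
			failure = err
			return
		}
		got = append(got, next)
	})

	generic := asAny(typed)
	generic(1, nil, false)
	generic("two", nil, false) // not an int: delivered as a terminal error

	fmt.Println(got, failure) // [1] typecast failed
}
```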

func (Observer[T]) Done added in v0.2.2

func (observe Observer[T]) Done(err error)

Done signals that the Observable has completed emitting values, optionally with an error. If err is nil, it indicates normal completion. If err is non-nil, it indicates that the Observable terminated with an error.

func (Observer[T]) Next

func (observe Observer[T]) Next(next T)

Next sends a new value to the Observer. It indicates that a new value has been emitted by the Observable.
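
Since an Observer is a plain function taking (next, err, done), the Next and Done methods documented above can be read as convenience wrappers around that call. The sketch below redeclares the type locally to show one plausible mapping; the method bodies are an assumption and are not copied from the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Observer is redeclared here with the signature documented above.
type Observer[T any] func(next T, err error, done bool)

// Next delivers a value: no error, not done. (Assumed mapping.)
func (observe Observer[T]) Next(next T) {
	observe(next, nil, false)
}

// Done delivers the terminal signal: a zero value, the optional error, and
// done set to true. (Assumed mapping.)
func (observe Observer[T]) Done(err error) {
	var zero T
	observe(zero, err, true)
}

func main() {
	var log []string
	observe := Observer[string](func(next string, err error, done bool) {
		switch {
		case !done:
			log = append(log, "next:"+next)
		case err != nil:
			log = append(log, "error:"+err.Error())
		default:
			log = append(log, "complete")
		}
	})

	observe.Next("a")
	observe.Next("b")
	observe.Done(errors.New("boom"))

	fmt.Println(log) // [next:a next:b error:boom]
}
```

The switch in main shows the order in which a handler would typically inspect the three arguments: done first, then err, and only then the value.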

type Pipe added in v0.2.0

type Pipe[T any] func(Observable[T]) Observable[T]

func Append added in v0.2.1

func Append[T any](slice *[]T) Pipe[T]

func Assign added in v0.2.0

func Assign[T any](value *T) Pipe[T]

func AutoUnsubscribe added in v0.2.2

func AutoUnsubscribe[T any]() Pipe[T]

func Catch added in v0.2.0

func Catch[T any](other Observable[T]) Pipe[T]

func CatchError added in v0.2.0

func CatchError[T any](selector func(err error, caught Observable[T]) Observable[T]) Pipe[T]

func ConcatWith added in v0.2.1

func ConcatWith[T any](others ...Observable[T]) Pipe[T]

func Delay added in v0.2.1

func Delay[T any](duration time.Duration) Pipe[T]

func DistinctUntilChanged added in v0.2.0

func DistinctUntilChanged[T any](equal func(T, T) bool) Pipe[T]

func Do added in v0.2.0

func Do[T any](do func(T)) Pipe[T]

func ElementAt added in v0.2.1

func ElementAt[T any](n int) Pipe[T]

func EndWith added in v0.2.2

func EndWith[T any](values ...T) Pipe[T]

func Filter added in v0.2.0

func Filter[T any](predicate func(T) bool) Pipe[T]

func Fprint added in v0.2.0

func Fprint[T any](out io.Writer) Pipe[T]

func Fprintf added in v0.2.0

func Fprintf[T any](out io.Writer, format string) Pipe[T]

func Fprintln added in v0.2.0

func Fprintln[T any](out io.Writer) Pipe[T]

func MergeWith added in v0.2.1

func MergeWith[T any](others ...Observable[T]) Pipe[T]

func OnComplete added in v0.2.1

func OnComplete[T any](onComplete func()) Pipe[T]

func OnDone added in v0.2.1

func OnDone[T any](onDone func(error)) Pipe[T]

func OnError added in v0.2.1

func OnError[T any](onError func(error)) Pipe[T]

func OnNext added in v0.2.1

func OnNext[T any](onNext func(T)) Pipe[T]

func Passthrough added in v0.2.0

func Passthrough[T any]() Pipe[T]

func Print added in v0.2.0

func Print[T any]() Pipe[T]

func Printf added in v0.2.0

func Printf[T any](format string) Pipe[T]

func Println

func Println[T any]() Pipe[T]

func RaceWith added in v0.2.1

func RaceWith[T any](others ...Observable[T]) Pipe[T]

func Repeat added in v0.2.2

func Repeat[T any](count ...int) Pipe[T]

Repeat creates an Observable that emits the entire source sequence multiple times.

Parameters:

  • count: Optional. The number of repetitions:
      • If omitted: the source Observable is repeated indefinitely
      • If 0: returns an empty Observable
      • If negative: returns an Observable that emits an error
      • If multiple count values: returns an Observable that emits an error

The resulting Observable will subscribe to the source Observable repeatedly each time the source completes, up to the specified count.

func Retry added in v0.2.2

func Retry[T any](limit ...int) Pipe[T]

func Send added in v0.2.0

func Send[T any](ch chan<- T) Pipe[T]

func Skip added in v0.2.0

func Skip[T any](n int) Pipe[T]

func StartWith added in v0.2.2

func StartWith[T any](values ...T) Pipe[T]

func Take added in v0.2.0

func Take[T any](n int) Pipe[T]

Take returns a Pipe whose resulting Observable emits only the first n values emitted by the source Observable. If the source emits fewer than n values, all of its values are emitted. After that, it completes, regardless of whether the source completes.

func TakeWhile added in v0.2.0

func TakeWhile[T any](condition func(T) bool) Pipe[T]

func Tap added in v0.2.0

func Tap[T any](tap Observer[T]) Pipe[T]

type Scheduler

type Scheduler = scheduler.Scheduler

type Signed added in v0.2.2

type Signed interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64
}

Signed is a constraint that permits any signed integer type. If future releases of Go add new predeclared signed integer types, this constraint will be modified to include them.
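
As a brief illustration of how such a constraint is used, the sketch below redeclares Signed locally and writes a generic helper against it. The abs function and the Celsius type are hypothetical and not part of rx.

```go
package main

import "fmt"

// Signed is redeclared locally with the same type set as the constraint above.
type Signed interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64
}

// Celsius has underlying type int, so the ~int term admits it.
type Celsius int

// abs is a hypothetical helper. As with any two's-complement integer type,
// the result overflows for the minimum value of T.
func abs[T Signed](x T) T {
	if x < 0 {
		return -x
	}
	return x
}

func main() {
	fmt.Println(abs(-3), abs(int8(-7)), abs(Celsius(-40))) // 3 7 40
}
```

The tilde in each term is what lets named types such as Celsius satisfy the constraint; without it, only the predeclared types themselves would qualify.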

type Subscriber

type Subscriber interface {
	// Subscribed returns true if the subscriber is in a subscribed state.
	// Returns false once Unsubscribe has been called.
	Subscribed() bool

	// Unsubscribe changes the state to unsubscribed and executes all registered
	// callback functions. Does nothing if already unsubscribed.
	Unsubscribe()

	// Add creates and returns a new child Subscriber.
	// If the parent is already unsubscribed, the child will be created in an
	// unsubscribed state. Otherwise, the child will be unsubscribed when the parent
	// is unsubscribed.
	Add() Subscriber

	// OnUnsubscribe registers a callback function to be executed when Unsubscribe is called.
	// If the subscriber is already unsubscribed, the callback is executed immediately.
	// If callback is nil, this method does nothing.
	OnUnsubscribe(callback func())
}

Subscriber is a subscribable entity that allows construction of a Subscriber tree.
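
The contract spelled out in the interface comments can be modeled in a few lines. The following is a minimal sketch of one possible implementation, not the one shipped with rx; the subscriber struct is hypothetical, and the interface is redeclared locally so the example is self-contained.

```go
package main

import (
	"fmt"
	"sync"
)

// Subscriber is redeclared here with the method set documented above.
type Subscriber interface {
	Subscribed() bool
	Unsubscribe()
	Add() Subscriber
	OnUnsubscribe(callback func())
}

// subscriber is a hypothetical implementation guarded by a mutex.
type subscriber struct {
	mu           sync.Mutex
	unsubscribed bool
	callbacks    []func()
}

var _ Subscriber = (*subscriber)(nil)

func (s *subscriber) Subscribed() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return !s.unsubscribed
}

// Unsubscribe flips the state once and runs the registered callbacks outside
// the lock, so a callback may safely call back into the subscriber.
func (s *subscriber) Unsubscribe() {
	s.mu.Lock()
	if s.unsubscribed {
		s.mu.Unlock()
		return
	}
	s.unsubscribed = true
	callbacks := s.callbacks
	s.callbacks = nil
	s.mu.Unlock()
	for _, callback := range callbacks {
		callback()
	}
}

// Add links a child to the parent by registering the child's Unsubscribe as
// a parent callback. If the parent is already unsubscribed, OnUnsubscribe
// runs it immediately, so the child starts out unsubscribed.
func (s *subscriber) Add() Subscriber {
	child := &subscriber{}
	s.OnUnsubscribe(child.Unsubscribe)
	return child
}

func (s *subscriber) OnUnsubscribe(callback func()) {
	if callback == nil {
		return
	}
	s.mu.Lock()
	if s.unsubscribed {
		s.mu.Unlock()
		callback()
		return
	}
	s.callbacks = append(s.callbacks, callback)
	s.mu.Unlock()
}

func main() {
	parent := &subscriber{}
	child := parent.Add()
	child.OnUnsubscribe(func() { fmt.Println("child cleanup") })

	parent.Unsubscribe()                                 // prints "child cleanup"
	fmt.Println(parent.Subscribed(), child.Subscribed()) // false false

	late := parent.Add()
	fmt.Println(late.Subscribed()) // false
}
```

Building Add on top of OnUnsubscribe keeps the tree logic in one place: unsubscribing any node cascades to its descendants through ordinary callbacks.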

type Subscription

type Subscription interface {
	// Subscribed returns true until Unsubscribe is called.
	Subscribed() bool

	// Unsubscribe will change the state to unsubscribed.
	Unsubscribe()

	// Done returns a channel that is closed when the subscription state changes to unsubscribed.
	// This channel can be used with select statements to react to subscription termination events.
	// If the scheduler is not concurrent, it will spawn a goroutine to wait for the scheduler.
	Done() <-chan struct{}

	// Err returns the subscription's terminal state:
	// - nil if the observable completed successfully
	// - the observable's error if it terminated with an error
	// - SubscriptionCanceled if the subscription was manually unsubscribed
	// - SubscriptionActive if the subscription is still active
	Err() error

	// Wait blocks until the subscription state becomes unsubscribed.
	// If the subscription is already unsubscribed, it returns immediately.
	// If the scheduler is not concurrent, it will wait for the scheduler to complete.
	// Returns:
	// - nil if the observable completed successfully
	// - the observable's error if it terminated with an error
	// - SubscriptionCanceled if the subscription was manually unsubscribed
	Wait() error
}

Subscription is an interface that allows monitoring and controlling a subscription. It provides methods for tracking the subscription's lifecycle.
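
A common way to consume such an interface is to select on Done with a timeout and then read the terminal state. The sketch below shows that pattern against a fake implementation. Everything in it is hypothetical: fakeSubscription, awaitWithTimeout, and the errActive and errCanceled values, which stand in for the SubscriptionActive and SubscriptionCanceled errors named in the comments above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Subscription is redeclared here with the method set documented above.
type Subscription interface {
	Subscribed() bool
	Unsubscribe()
	Done() <-chan struct{}
	Err() error
	Wait() error
}

// Hypothetical stand-ins for the package's sentinel errors.
var (
	errActive   = errors.New("subscription active")
	errCanceled = errors.New("subscription canceled")
)

// fakeSubscription is a test double, not the rx implementation.
type fakeSubscription struct {
	once sync.Once
	mu   sync.Mutex
	err  error
	done chan struct{}
}

var _ Subscription = (*fakeSubscription)(nil)

func newFake() *fakeSubscription {
	return &fakeSubscription{err: errActive, done: make(chan struct{})}
}

// finish records the terminal state once and closes the Done channel.
func (s *fakeSubscription) finish(err error) {
	s.once.Do(func() {
		s.mu.Lock()
		s.err = err
		s.mu.Unlock()
		close(s.done)
	})
}

func (s *fakeSubscription) Subscribed() bool {
	select {
	case <-s.done:
		return false
	default:
		return true
	}
}

func (s *fakeSubscription) Unsubscribe()          { s.finish(errCanceled) }
func (s *fakeSubscription) Done() <-chan struct{} { return s.done }
func (s *fakeSubscription) Wait() error           { <-s.done; return s.Err() }

func (s *fakeSubscription) Err() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.err
}

// awaitWithTimeout waits for the subscription to end. If the timeout fires
// first, it unsubscribes and reports the resulting terminal state.
func awaitWithTimeout(sub Subscription, timeout time.Duration) error {
	select {
	case <-sub.Done():
	case <-time.After(timeout):
		sub.Unsubscribe()
	}
	return sub.Wait()
}

func main() {
	finished := newFake()
	finished.finish(nil)                                 // models normal completion
	fmt.Println(awaitWithTimeout(finished, time.Second)) // <nil>

	stuck := newFake()                                        // never finishes on its own
	fmt.Println(stuck.Err())                                  // subscription active
	fmt.Println(awaitWithTimeout(stuck, 10*time.Millisecond)) // subscription canceled
}
```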

type Tuple2 added in v0.2.1

type Tuple2[T, U any] struct {
	First  T
	Second U
}

type Tuple3 added in v0.2.1

type Tuple3[T, U, V any] struct {
	First  T
	Second U
	Third  V
}

type Tuple4 added in v0.2.1

type Tuple4[T, U, V, W any] struct {
	First  T
	Second U
	Third  V
	Fourth W
}

type Tuple5 added in v0.2.1

type Tuple5[T, U, V, W, X any] struct {
	First  T
	Second U
	Third  V
	Fourth W
	Fifth  X
}

type Unsigned added in v0.2.2

type Unsigned interface {
	~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr
}

Unsigned is a constraint that permits any unsigned integer type. If future releases of Go add new predeclared unsigned integer types, this constraint will be modified to include them.
