Documentation ¶
Overview ¶
RefCount makes a Connectable behave like an ordinary Observable.
On the first Subscribe it calls Connect on its Connectable, and when its last subscriber unsubscribes it cancels the connection by calling Unsubscribe on the subscription returned by the call to Connect.
As long as the source Observable of a Connectable has not completed, the calls to Connect and the subsequent Unsubscribe create and cancel the subscription from the Connectable to its source. If the source is a cold observable, this means that the cold observable generates the same data again on every Connect. Because Connect happens on the first Subscribe, later subscribers may miss data emitted before they subscribed, depending on the type of Connectable RefCount is called on.
RefCount http://reactivex.io/documentation/operators/refcount.html
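The connect-on-first, cancel-on-last behavior described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation; the refCount type, its connect field and the run function are hypothetical names introduced here.

```go
package main

import (
	"fmt"
	"sync"
)

// refCount is a hypothetical helper, not part of the documented API. It calls
// connect when the first subscriber arrives and calls the returned cancel
// function when the last subscriber leaves.
type refCount struct {
	mu      sync.Mutex
	count   int
	connect func() (cancel func())
	cancel  func()
}

// Subscribe registers one subscriber and returns its unsubscribe function.
func (r *refCount) Subscribe() (unsubscribe func()) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.count++
	if r.count == 1 {
		r.cancel = r.connect()
	}
	var once sync.Once // makes unsubscribe safe to call more than once
	return func() {
		once.Do(func() {
			r.mu.Lock()
			defer r.mu.Unlock()
			r.count--
			if r.count == 0 {
				r.cancel()
				r.cancel = nil
			}
		})
	}
}

// run exercises the counter and reports how often the source was connected
// and disconnected.
func run() (connects, cancels int) {
	r := &refCount{}
	r.connect = func() func() {
		connects++
		return func() { cancels++ }
	}
	u1 := r.Subscribe() // first subscriber: connects
	u2 := r.Subscribe() // second subscriber: shares the connection
	u1()
	u2()                // last subscriber gone: cancels
	u3 := r.Subscribe() // connects again
	u3()
	return connects, cancels
}

func main() {
	connects, cancels := run()
	fmt.Println(connects, cancels) // prints "2 2"
}
```

Two subscribers that overlap share one connection, while a subscriber arriving after the count dropped to zero triggers a fresh connect.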
Example (IntroToRx) ¶
This example is a variant of the example in the book "Introduction to Rx" about using RefCount.
const ms = time.Millisecond
concurrent := GoroutineScheduler()
observable := IntervalInt(50 * ms)
// Print when a value is published.
observable = observable.Do(func(next int) { fmt.Printf("published: %d\n", next) })
observable = observable.Publish().RefCount()
// Make all subscriptions to observable concurrent
observable = observable.SubscribeOn(concurrent)
fmt.Println(">> Subscribing")
subscription := observable.Subscribe(func(next int, err error, done bool) {
if !done {
fmt.Printf("subscription : %d\n", next)
}
})
// The observable stays hot for the next 175 milliseconds, while this
// subscription is active. It then goes cold, unless another observer
// subscribes in that period.
time.Sleep(175 * ms)
fmt.Println(">> Unsubscribing")
subscription.Unsubscribe()
fmt.Println(">> Finished")
// Wait for all scheduled tasks to terminate.
concurrent.Wait()
Output:
>> Subscribing
published: 0
subscription : 0
published: 1
subscription : 1
published: 2
subscription : 2
>> Unsubscribing
>> Finished
Example (RefCountMultipleSubscriptions) ¶
An example showing multiple subscriptions on a multicasting Publish Connectable whose Connect is controlled by a RefCount operator.
// Subscribe on concurrent scheduler
concurrent := GoroutineScheduler()
var wg sync.WaitGroup
channel := make(chan int, 30)
source := FromChanInt(channel).Publish().RefCount().SubscribeOn(concurrent)
sub1 := source.Subscribe(func(n int, err error, done bool) {
if !done {
fmt.Println(n)
os.Stdout.Sync()
wg.Done()
}
})
sub2 := source.Subscribe(func(n int, err error, done bool) {
if !done {
fmt.Println(n)
os.Stdout.Sync()
wg.Done()
}
})
// 3 goroutines are now starting, 1 for publishing and 2 for subscribing.
// In the meantime we start feeding the channel.
wg.Add(6)
channel <- 1
channel <- 2
channel <- 3
// wait for the channel data to propagate
wg.Wait()
// cancel the first subscription.
sub1.Unsubscribe()
// more data for the second subscription, then close the
// channel to complete the observable and make Wait return.
wg.Add(1)
channel <- 4
close(channel)
sub2.Wait()
wg.Wait()
Output:
1
1
2
2
3
3
4
Index ¶
- Constants
- func MakeObserverObservable(age time.Duration, length int, capacity ...int) (Observer, Observable)
- type Connectable
- type IntMulticaster
- type IntObserver
- type Observable
- type ObservableInt
- func (o ObservableInt) Do(f func(next int)) ObservableInt
- func (o ObservableInt) Multicast(factory func() SubjectInt) IntMulticaster
- func (o ObservableInt) Publish() IntMulticaster
- func (o ObservableInt) Subscribe(observe IntObserver, schedulers ...Scheduler) Subscription
- func (o ObservableInt) SubscribeOn(scheduler Scheduler) ObservableInt
- type Observer
- type RxError
- type Scheduler
- type SubjectInt
- type Subscriber
- type Subscription
Constants ¶
const ErrUnsubscribed = RxError("subscriber unsubscribed")
ErrUnsubscribed is the error returned by Wait when the Unsubscribe method is called on the subscription.
const OutOfSubscriptions = RxError("out of subscriptions")
const TypecastFailed = RxError("typecast failed")
TypecastFailed is delivered to an observer if the generic value cannot be typecast to a specific type.
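Constants like these are possible in Go when the error type is based on a string. The sketch below assumes RxError is declared as a string type with an Error method; the package may declare it differently, and isUnsubscribed is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// RxError is assumed here to be a string-based error type, which is what
// allows error values to be declared as constants.
type RxError string

func (e RxError) Error() string { return string(e) }

const ErrUnsubscribed = RxError("subscriber unsubscribed")

// isUnsubscribed reports whether err is, or wraps, ErrUnsubscribed.
func isUnsubscribed(err error) bool {
	return errors.Is(err, ErrUnsubscribed)
}

func main() {
	wrapped := fmt.Errorf("wait failed: %w", ErrUnsubscribed)
	fmt.Println(isUnsubscribed(wrapped))             // true
	fmt.Println(isUnsubscribed(errors.New("other"))) // false
}
```

Because the constant is a comparable value, it can be matched with a plain == as well as with errors.Is.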
Variables ¶
This section is empty.
Functions ¶
func MakeObserverObservable ¶
MakeObserverObservable turns an observer into a multicasting and buffering observable. Both the observer and the observable are returned. These are then used as the core of any Subject implementation. The Observer side is used to pass items into the buffering multicaster. This then multicasts the items to every Observer that subscribes to the returned Observable.
- age: age below which items are kept to replay to a new subscriber.
- length: length of the item buffer, number of items kept to replay to a new subscriber.
- [cap]: capacity of the item buffer, number of items that can be observed before blocking.
- [scap]: capacity of the subscription list, max number of simultaneous subscribers.
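How the age and length parameters could interact is sketched below with a small replay buffer. This is only an illustration under the assumption that both limits apply together; replayBuffer, item and replayDemo are hypothetical names and the real buffering multicaster is likely structured differently.

```go
package main

import (
	"fmt"
	"time"
)

// item pairs a value with the time it was observed.
type item struct {
	value int
	at    time.Time
}

// replayBuffer is a hypothetical buffer that keeps at most length items and
// replays only those younger than age.
type replayBuffer struct {
	age    time.Duration
	length int
	items  []item
}

// add stores a value and drops the oldest items beyond the length limit.
func (b *replayBuffer) add(value int, now time.Time) {
	b.items = append(b.items, item{value, now})
	if len(b.items) > b.length {
		b.items = b.items[len(b.items)-b.length:]
	}
}

// replay returns the values a subscriber arriving at now would receive.
func (b *replayBuffer) replay(now time.Time) []int {
	var out []int
	for _, it := range b.items {
		if now.Sub(it.at) < b.age {
			out = append(out, it.value)
		}
	}
	return out
}

// replayDemo adds values 0..4 at 40 ms steps and replays them at 190 ms.
func replayDemo() []int {
	start := time.Unix(0, 0)
	b := &replayBuffer{age: 100 * time.Millisecond, length: 3}
	for i := 0; i < 5; i++ {
		b.add(i, start.Add(time.Duration(i)*40*time.Millisecond))
	}
	// The length limit keeps 2, 3, 4; the age limit then drops 2 (110 ms old).
	return b.replay(start.Add(190 * time.Millisecond))
}

func main() {
	fmt.Println(replayDemo()) // prints "[3 4]"
}
```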
Types ¶
type Connectable ¶
type Connectable func(Scheduler, Subscriber)
Connectable provides the Connect method for a Multicaster.
func (Connectable) Connect ¶
func (c Connectable) Connect(schedulers ...Scheduler) Subscription
Connect instructs a multicaster to subscribe to its source and begin multicasting items to its subscribers. Connect accepts an optional scheduler argument.
type IntMulticaster ¶
type IntMulticaster struct {
ObservableInt
Connectable
}
IntMulticaster is a multicasting connectable observable. One or more IntObservers can subscribe to it simultaneously. It will subscribe to the source ObservableInt when Connect is called. After that, every emission from the source is multicast to all subscribed IntObservers.
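The subscribe-first, connect-later order can be illustrated with a synchronous stand-in. The multicaster type below is hypothetical and uses a slice as its source; it only shows that nothing is emitted before Connect and that every observer sees every value afterwards.

```go
package main

import "fmt"

// multicaster is a hypothetical, synchronous stand-in for a connectable
// observable: observers register first, values flow only after Connect.
type multicaster struct {
	source    []int
	observers []func(int)
}

// Subscribe registers an observer without starting the source.
func (m *multicaster) Subscribe(observe func(int)) {
	m.observers = append(m.observers, observe)
}

// Connect walks the source and sends every value to every observer.
func (m *multicaster) Connect() {
	for _, v := range m.source {
		for _, observe := range m.observers {
			observe(v)
		}
	}
}

// multicastDemo returns the number of values seen before Connect and the
// values each of two observers received after it.
func multicastDemo() (beforeConnect int, first, second []int) {
	m := &multicaster{source: []int{1, 2, 3}}
	m.Subscribe(func(n int) { first = append(first, n) })
	m.Subscribe(func(n int) { second = append(second, n) })
	beforeConnect = len(first) + len(second)
	m.Connect()
	return beforeConnect, first, second
}

func main() {
	before, first, second := multicastDemo()
	fmt.Println(before, first, second) // prints "0 [1 2 3] [1 2 3]"
}
```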
func (IntMulticaster) RefCount ¶
func (o IntMulticaster) RefCount() ObservableInt
RefCount makes an IntMulticaster behave like an ordinary ObservableInt. On the first Subscribe it calls Connect on its IntMulticaster, and when its last subscriber unsubscribes it cancels the source connection by calling Unsubscribe on the subscription returned by the call to Connect.
type IntObserver ¶
IntObserver is a function that gets called whenever the Observable has something to report. The next argument is the item value that is only valid when the done argument is false. When done is true and the err argument is not nil, then the Observable has terminated with an error. When done is true and the err argument is nil, then the Observable has completed normally.
func (IntObserver) Complete ¶
func (o IntObserver) Complete()
Complete is called by an ObservableInt to signal that no more data is forthcoming to the Observer.
func (IntObserver) Error ¶
func (o IntObserver) Error(err error)
Error is called by an ObservableInt to report an error to the Observer.
func (IntObserver) Next ¶
func (o IntObserver) Next(next int)
Next is called by an ObservableInt to emit the next int value to the Observer.
type Observable ¶
type Observable func(Observer, Scheduler, Subscriber)
Observable is a function taking an Observer, Scheduler and Subscriber. Calling it will subscribe the Observer to events from the Observable.
func (Observable) AsObservableInt ¶
func (o Observable) AsObservableInt() ObservableInt
AsObservableInt turns an Observable of interface{} into an ObservableInt. If a typecast fails while observing, the error TypecastFailed will be emitted.
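The typecast step can be pictured as a type assertion per item that stops at the first failure. The sketch works on a slice instead of an observable; toInts and errTypecast are hypothetical names, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// errTypecast is a local stand-in for the package's typecast error.
var errTypecast = errors.New("typecast failed")

// toInts converts generic items to ints and stops at the first item that is
// not an int, returning the values converted so far together with the error.
func toInts(items []interface{}) ([]int, error) {
	var out []int
	for _, item := range items {
		n, ok := item.(int)
		if !ok {
			return out, errTypecast
		}
		out = append(out, n)
	}
	return out, nil
}

func main() {
	values, err := toInts([]interface{}{1, 2, "three", 4})
	fmt.Println(values, err) // prints "[1 2] typecast failed"
}
```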
type ObservableInt ¶
type ObservableInt func(IntObserver, Scheduler, Subscriber)
ObservableInt is a function taking an Observer, Scheduler and Subscriber. Calling it will subscribe the Observer to events from the Observable.
func FromChanInt ¶
func FromChanInt(ch <-chan int) ObservableInt
FromChanInt creates an ObservableInt from a Go channel of int values. It's not possible for the code feeding into the channel to send an error. The feeding code can send zero or more int items; closing the channel is then seen as completion.
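The channel-to-observer mapping can be sketched as a loop that forwards items until the channel is closed. The drain function below is a hypothetical illustration of that idea, not the code behind FromChanInt.

```go
package main

import "fmt"

// drain forwards every value from ch to observe and signals completion once
// the channel is closed. A channel of int cannot carry an error, so err is
// always nil here.
func drain(ch <-chan int, observe func(next int, err error, done bool)) {
	for v := range ch {
		observe(v, nil, false)
	}
	observe(0, nil, true)
}

// drainDemo sends three values, closes the channel and collects the result.
func drainDemo() (values []int, completed bool) {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch)
	drain(ch, func(next int, err error, done bool) {
		if done {
			completed = err == nil
			return
		}
		values = append(values, next)
	})
	return values, completed
}

func main() {
	fmt.Println(drainDemo()) // prints "[1 2 3] true"
}
```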
func IntervalInt ¶
func IntervalInt(interval time.Duration) ObservableInt
IntervalInt creates an ObservableInt that emits a sequence of integers spaced by a particular time interval. The first integer is not emitted immediately, but only after the first time interval has passed. The generated code will do a type conversion from int to int.
func (ObservableInt) Do ¶
func (o ObservableInt) Do(f func(next int)) ObservableInt
Do calls a function for each next value passing through the observable.
func (ObservableInt) Multicast ¶
func (o ObservableInt) Multicast(factory func() SubjectInt) IntMulticaster
Multicast converts an ordinary observable into a multicasting connectable observable or multicaster for short. A multicaster will only start emitting values after its Connect method has been called. The factory method passed in should return a new SubjectInt that implements the actual multicasting behavior.
func (ObservableInt) Publish ¶
func (o ObservableInt) Publish() IntMulticaster
Publish returns an IntMulticaster for a Subject to an underlying ObservableInt and turns the subject into a connectable observable. A Subject emits to an observer only those items that are emitted by the underlying ObservableInt after the observer subscribes. When the underlying ObservableInt terminates with an error, subscribed observers receive that error. After all observers have unsubscribed due to an error, the IntMulticaster does an internal reset just before the next observer subscribes. This Publish operator is therefore re-connectable, unlike the RxJS 5 behavior. To simulate the RxJS 5 behavior, use Publish().AutoConnect(1); this connects on the first subscription but never re-connects.
func (ObservableInt) Subscribe ¶
func (o ObservableInt) Subscribe(observe IntObserver, schedulers ...Scheduler) Subscription
Subscribe operates upon the emissions and notifications from an Observable. This method returns a Subscription. Subscribe uses a serial scheduler created with NewScheduler().
func (ObservableInt) SubscribeOn ¶
func (o ObservableInt) SubscribeOn(scheduler Scheduler) ObservableInt
SubscribeOn specifies the scheduler an ObservableInt should use when it is subscribed to.
type Observer ¶
Observer is a function that gets called whenever the Observable has something to report. The next argument is the item value that is only valid when the done argument is false. When done is true and the err argument is not nil, then the Observable has terminated with an error. When done is true and the err argument is nil, then the Observable has completed normally.
func (Observer) AsIntObserver ¶
func (o Observer) AsIntObserver() IntObserver
AsIntObserver converts an observer of interface{} items to an observer of int items.
type Scheduler ¶
Scheduler is used to schedule tasks to support subscribing and observing.
func GoroutineScheduler ¶
func GoroutineScheduler() Scheduler
func NewScheduler ¶ added in v0.0.1
func NewScheduler() Scheduler
type SubjectInt ¶
type SubjectInt struct {
IntObserver
ObservableInt
}
SubjectInt is a combination of an IntObserver and ObservableInt. Subjects are special because they are the only reactive constructs that support multicasting. The items sent to it through its observer side are multicast to multiple clients subscribed to its observable side.
The SubjectInt exposes all methods from the embedded IntObserver and ObservableInt. Use the IntObserver Next, Error and Complete methods to feed data to it. Use the ObservableInt methods to subscribe to it.
After a subject has been terminated by calling either Error or Complete, it goes into terminated state. All subsequent calls to its observer side will be silently ignored. All subsequent subscriptions to the observable side will be handled according to the specific behavior of the subject. There are different types of subjects, see the different NewXxxSubjectInt functions for more info.
func NewSubjectInt ¶
func NewSubjectInt() SubjectInt
NewSubjectInt creates a new Subject. After the subject is terminated, all subsequent subscriptions to the observable side will be terminated immediately with either an Error or Complete notification sent to the subscribing client.
Note that this implementation is blocking. When there are subscribers, the observable goroutine is blocked until all subscribers have processed the next, error or complete notification.
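The terminated state and the blocking delivery can be sketched as follows. This is a single-goroutine illustration only; the subject type and its fields are hypothetical and much simpler than a real SubjectInt.

```go
package main

import (
	"errors"
	"fmt"
)

type observer func(next int, err error, done bool)

// subject is a hypothetical, single-goroutine sketch. Next returns only after
// every observer has handled the value, which mirrors the blocking behavior.
type subject struct {
	observers []observer
	done      bool
	err       error
}

// Subscribe registers o, or terminates it immediately if the subject has
// already terminated.
func (s *subject) Subscribe(o observer) {
	if s.done {
		o(0, s.err, true)
		return
	}
	s.observers = append(s.observers, o)
}

// Next is silently ignored once the subject has terminated.
func (s *subject) Next(n int) {
	if s.done {
		return
	}
	for _, o := range s.observers {
		o(n, nil, false)
	}
}

func (s *subject) terminate(err error) {
	if s.done {
		return
	}
	s.done, s.err = true, err
	for _, o := range s.observers {
		o(0, err, true)
	}
	s.observers = nil
}

func (s *subject) Error(err error) { s.terminate(err) }
func (s *subject) Complete()       { s.terminate(nil) }

// subjectDemo shows an early subscriber, a terminating error, an ignored
// value and a late subscriber that is terminated immediately.
func subjectDemo() (early []int, earlyDone bool, lateErr error) {
	s := &subject{}
	s.Subscribe(func(n int, err error, done bool) {
		if done {
			earlyDone = true
			return
		}
		early = append(early, n)
	})
	s.Next(1)
	s.Next(2)
	s.Error(errors.New("boom"))
	s.Next(3) // ignored: the subject is terminated
	s.Subscribe(func(n int, err error, done bool) { lateErr = err })
	return early, earlyDone, lateErr
}

func main() {
	fmt.Println(subjectDemo()) // prints "[1 2] true boom"
}
```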
type Subscriber ¶
type Subscriber interface {
// A Subscriber is also a Subscription.
Subscription
// Add will create and return a new child Subscriber setup in such a way that
// calling Unsubscribe on the parent will also call Unsubscribe on the child.
// Calling the Unsubscribe method on the child will NOT propagate to the
// parent!
Add() Subscriber
// OnUnsubscribe will add the given callback function to the subscriber.
// The callback will be called when either the Unsubscribe of the parent
// or of the subscriber itself is called. If the subscription was already
// canceled, then the callback function will just be called immediately.
OnUnsubscribe(callback func())
// OnWait will register a callback to call when subscription Wait is called.
OnWait(callback func())
// Done will set the error internally and then cancel the subscription by
// calling the Unsubscribe method. A nil value for error indicates success.
Done(err error)
// Error returns the error set by calling the Done(err) method. As long as
// the subscriber is still subscribed Error will return nil.
Error() error
}
Subscriber is a Subscription with management functionality.
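The one-way propagation between parent and child described in the interface comments can be sketched with a small struct. The subscriber type below is a hypothetical stand-in that implements only Add, OnUnsubscribe, Unsubscribe and Canceled.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber is a hypothetical sketch of the cancellation tree.
type subscriber struct {
	mu        sync.Mutex
	canceled  bool
	callbacks []func()
}

// OnUnsubscribe registers callback, or runs it at once if already canceled.
func (s *subscriber) OnUnsubscribe(callback func()) {
	s.mu.Lock()
	if s.canceled {
		s.mu.Unlock()
		callback()
		return
	}
	s.callbacks = append(s.callbacks, callback)
	s.mu.Unlock()
}

// Unsubscribe cancels once and then runs the registered callbacks.
func (s *subscriber) Unsubscribe() {
	s.mu.Lock()
	if s.canceled {
		s.mu.Unlock()
		return
	}
	s.canceled = true
	callbacks := s.callbacks
	s.callbacks = nil
	s.mu.Unlock()
	for _, callback := range callbacks {
		callback()
	}
}

func (s *subscriber) Canceled() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.canceled
}

// Add returns a child that is canceled together with its parent.
func (s *subscriber) Add() *subscriber {
	child := &subscriber{}
	s.OnUnsubscribe(child.Unsubscribe)
	return child
}

func propagationDemo() (parentAfterChild, childAfterParent, lateCallback bool) {
	parent := &subscriber{}
	child := parent.Add()
	child.Unsubscribe()
	parentAfterChild = parent.Canceled() // child does not cancel parent

	parent2 := &subscriber{}
	child2 := parent2.Add()
	parent2.Unsubscribe()
	childAfterParent = child2.Canceled() // parent cancels child

	// Registering on an already canceled subscriber runs the callback at once.
	parent2.OnUnsubscribe(func() { lateCallback = true })
	return parentAfterChild, childAfterParent, lateCallback
}

func main() {
	fmt.Println(propagationDemo()) // prints "false true true"
}
```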
func NewSubscriber ¶ added in v0.0.1
func NewSubscriber() Subscriber
NewSubscriber will create and return a new Subscriber.
type Subscription ¶
type Subscription interface {
// Subscribed returns true when the subscription is currently active.
Subscribed() bool
// Unsubscribe will do nothing if the subscription is not active. If the
// state is still active however, it will be changed to canceled.
// Subsequently, it will call Unsubscribe on all child subscriptions added
// through Add, along with all methods added through OnUnsubscribe. When the
// subscription is canceled by calling Unsubscribe a call to the Wait method
// will return the error ErrUnsubscribed.
Unsubscribe()
// Canceled returns true when the subscription state is canceled.
Canceled() bool
// Wait will by default block the calling goroutine and wait for the
// Unsubscribe method to be called on this subscription.
// However, when OnWait was called with a callback wait function it will
// call that instead. Calling Wait on a subscription that has already been
// canceled will return immediately. If the subscriber was canceled by
// calling Unsubscribe, then the error returned is ErrUnsubscribed.
// If the subscriber was terminated by calling Done, then the error
// returned here is the one passed to Done.
Wait() error
}
Subscription is an interface that allows code to monitor and control a subscription it received.
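The two ways a Wait call can end, as described in the interface comments, can be sketched with a channel that is closed exactly once. The subscription type, newSubscription and errUnsubscribed below are hypothetical and leave out the OnWait hook.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errUnsubscribed is a local stand-in for the package's ErrUnsubscribed.
var errUnsubscribed = errors.New("subscriber unsubscribed")

// subscription is a hypothetical sketch of the Wait semantics.
type subscription struct {
	once sync.Once
	done chan struct{}
	err  error
}

func newSubscription() *subscription {
	return &subscription{done: make(chan struct{})}
}

// finish records the first outcome and releases all waiters.
func (s *subscription) finish(err error) {
	s.once.Do(func() {
		s.err = err
		close(s.done)
	})
}

// Unsubscribe cancels the subscription; Wait then returns errUnsubscribed.
func (s *subscription) Unsubscribe() { s.finish(errUnsubscribed) }

// Done terminates the subscription; Wait then returns err (nil for success).
func (s *subscription) Done(err error) { s.finish(err) }

// Wait blocks until the subscription has been canceled or terminated.
func (s *subscription) Wait() error {
	<-s.done
	return s.err
}

func waitDemo() (afterUnsubscribe, afterDone error) {
	a := newSubscription()
	go a.Unsubscribe() // cancel from another goroutine
	afterUnsubscribe = a.Wait()

	b := newSubscription()
	b.Done(nil)     // normal completion
	b.Unsubscribe() // no effect, already terminated
	afterDone = b.Wait()
	return afterUnsubscribe, afterDone
}

func main() {
	fmt.Println(waitDemo()) // prints "subscriber unsubscribed <nil>"
}
```

Recording the outcome inside sync.Once keeps the first of Unsubscribe or Done authoritative, so a later call cannot overwrite the error that Wait reports.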