rx

package
v0.2.0
Warning

This package is not in the latest version of its module.

Published: Jun 9, 2019 License: Apache-2.0 Imports: 10 Imported by: 15

Documentation

Index

Constants

const (
	RequestInfinite = math.MaxInt32
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Disposable

type Disposable interface {
	// Dispose releases the current resource.
	Dispose()
	// IsDisposed returns true if it has been disposed.
	IsDisposed() bool
}

Disposable is a resource that can be disposed explicitly.
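The contract above can be sketched with a small stand-in. This is only an illustration, not the package's implementation: the `resource` type, its `releases` counter, and `newResource` are hypothetical names, and making `Dispose` idempotent is an assumption rather than documented behavior.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Disposable mirrors the interface documented above.
type Disposable interface {
	Dispose()
	IsDisposed() bool
}

// resource is a hypothetical type used only in this sketch.
type resource struct {
	disposed int32 // 0 = live, 1 = disposed
	releases int32 // how many times cleanup actually ran
}

func newResource() *resource { return &resource{} }

// Dispose runs the cleanup at most once, even under concurrent calls.
func (r *resource) Dispose() {
	if atomic.CompareAndSwapInt32(&r.disposed, 0, 1) {
		atomic.AddInt32(&r.releases, 1)
	}
}

// IsDisposed reports whether Dispose has already been called.
func (r *resource) IsDisposed() bool {
	return atomic.LoadInt32(&r.disposed) == 1
}

func main() {
	r := newResource()
	var d Disposable = r
	fmt.Println("disposed before:", d.IsDisposed())
	d.Dispose()
	d.Dispose() // second call is a no-op in this sketch
	fmt.Println("disposed after:", d.IsDisposed())
	fmt.Println("cleanup runs:", atomic.LoadInt32(&r.releases))
}
```

A compare-and-swap keeps the flag and the cleanup in step without a mutex, which is convenient when a subscription may be disposed from more than one goroutine.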

type Do

type Do = func(ctx context.Context)

Do is an alias for the function that a scheduler executes.

type Flux

type Flux interface {
	Publisher
	// LimitRate limits the number of elements requested in each batch.
	LimitRate(n int) Flux
	// DoOnRequest registers a handler called when the subscriber requests more elements.
	DoOnRequest(fn FnOnRequest) Flux
	// DoOnSubscribe registers a handler called when the subscription begins.
	DoOnSubscribe(fn FnOnSubscribe) Flux
	// DoOnNext registers a handler called when the next element is emitted.
	DoOnNext(fn FnOnNext) Flux
	// DoAfterNext registers a handler called after the next element is emitted.
	DoAfterNext(fn FnConsumer) Flux
	// DoOnComplete registers a handler called when the Flux completes.
	DoOnComplete(fn FnOnComplete) Flux
	// DoOnError registers a handler called when an error occurs.
	DoOnError(fn FnOnError) Flux
	// DoOnCancel registers a handler called when the Flux is canceled.
	DoOnCancel(fn FnOnCancel) Flux
	// DoFinally registers a handler called when the Flux terminates.
	// DoFinally is always executed.
	DoFinally(fn FnOnFinally) Flux
	// SubscribeOn specifies the scheduler for the subscriber.
	SubscribeOn(s Scheduler) Flux
	// PublishOn specifies the scheduler for the publisher.
	PublishOn(s Scheduler) Flux
}

Flux emits 0 to N elements, and then completes (successfully or with an error).

func NewFlux

func NewFlux(fn func(ctx context.Context, producer Producer)) Flux

NewFlux returns a new Flux.

func NewFluxFromArray

func NewFluxFromArray(first payload.Payload, others ...payload.Payload) Flux

NewFluxFromArray returns a new Flux that emits the given payloads.

func ToFlux

func ToFlux(publisher Publisher) Flux

ToFlux converts a Publisher to a Flux.

type FnConsumer

type FnConsumer = func(ctx context.Context, elem payload.Payload)

FnConsumer is an alias for a consumer function.

type FnOnCancel

type FnOnCancel = func(ctx context.Context)

FnOnCancel is an alias for the `OnCancel` handler.

type FnOnComplete

type FnOnComplete = func(ctx context.Context)

FnOnComplete is an alias for the `OnComplete` handler.

type FnOnError

type FnOnError = func(ctx context.Context, err error)

FnOnError is an alias for the `OnError` handler.

type FnOnFinally

type FnOnFinally = func(ctx context.Context, st SignalType)

FnOnFinally is an alias for the `OnFinally` handler.

type FnOnNext

type FnOnNext = func(ctx context.Context, s Subscription, elem payload.Payload)

FnOnNext is an alias for the `OnNext` handler.

type FnOnRequest

type FnOnRequest = func(ctx context.Context, n int)

FnOnRequest is an alias for the `OnRequest` handler.

type FnOnSubscribe

type FnOnSubscribe = func(ctx context.Context, s Subscription)

FnOnSubscribe is an alias for the `OnSubscribe` handler.

type IntRange

type IntRange struct {
	// contains filtered or unexported fields
}

IntRange provides utilities for range operations.

func Range

func Range(from, to int) *IntRange

Range returns an IntRange.

func (*IntRange) Map

func (p *IntRange) Map(fn func(n int) payload.Payload) Flux

Map converts each int in the range to a Payload and returns the results as a Flux.

type Mono

type Mono interface {
	Publisher
	// DoAfterSuccess registers a handler called after the element is emitted successfully.
	DoAfterSuccess(fn FnConsumer) Mono
	// DoOnSubscribe registers a handler called when the subscription begins.
	DoOnSubscribe(fn FnOnSubscribe) Mono
	// DoOnSuccess registers a handler called when the element is emitted successfully.
	DoOnSuccess(fn FnOnNext) Mono
	// DoOnError registers a handler called when an error occurs.
	DoOnError(fn FnOnError) Mono
	// DoOnCancel registers a handler called when the Mono is canceled.
	DoOnCancel(fn FnOnCancel) Mono
	// DoFinally registers a handler called when the Mono terminates.
	// DoFinally is always executed.
	DoFinally(fn FnOnFinally) Mono
	// SubscribeOn specifies the scheduler for the subscriber.
	SubscribeOn(s Scheduler) Mono
	// PublishOn specifies the scheduler for the publisher.
	PublishOn(s Scheduler) Mono
}

Mono completes successfully by emitting an element, or with an error.

func JustMono

func JustMono(element payload.Payload) Mono

JustMono returns a new Mono with a single element.

func NewMono

func NewMono(fn func(ctx context.Context, sink MonoProducer)) Mono

NewMono returns a new Mono.

func ToMono

func ToMono(publisher Publisher) Mono

ToMono converts a Publisher to a Mono.

type MonoProducer

type MonoProducer interface {
	// Success emits the payload.
	Success(elem payload.Payload) error
	// Error signals that production failed with err.
	Error(err error)
}

MonoProducer is like Producer, but it produces a single element.

type OptSubscribe

type OptSubscribe func(*hooks)

OptSubscribe is an option for Subscribe.

func OnComplete

func OnComplete(fn FnOnComplete) OptSubscribe

OnComplete sets the handler for OnComplete.

func OnError

func OnError(fn FnOnError) OptSubscribe

OnError sets the handler for OnError. You can also use DoOnError on a Mono or Flux.

func OnNext

func OnNext(fn FnOnNext) OptSubscribe

OnNext sets the handler for OnNext.

func OnSubscribe

func OnSubscribe(fn FnOnSubscribe) OptSubscribe

OnSubscribe sets the handler for OnSubscribe. You can also use DoOnSubscribe on a Mono or Flux.
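The option constructors above follow the functional options pattern: each one returns a function that fills in one field of an unexported `hooks` value. The sketch below shows how such options might compose. It is self-contained and does not import the package: the `hooks` fields, the `subscribe` and `run` helpers, and the use of `string` elements instead of `payload.Payload` are all assumptions made for illustration, and the handler signatures are simplified (the real `FnOnNext` also receives a `Subscription`).

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// hooks is a local stand-in for the package's unexported hooks type.
type hooks struct {
	onNext     func(ctx context.Context, elem string)
	onError    func(ctx context.Context, err error)
	onComplete func(ctx context.Context)
}

// OptSubscribe mirrors the documented option type.
type OptSubscribe func(*hooks)

// OnNext, OnError and OnComplete each set one handler on hooks.
func OnNext(fn func(ctx context.Context, elem string)) OptSubscribe {
	return func(h *hooks) { h.onNext = fn }
}

func OnError(fn func(ctx context.Context, err error)) OptSubscribe {
	return func(h *hooks) { h.onError = fn }
}

func OnComplete(fn func(ctx context.Context)) OptSubscribe {
	return func(h *hooks) { h.onComplete = fn }
}

// subscribe is hypothetical: it applies the options, replays elems,
// and then signals either the error or completion.
func subscribe(ctx context.Context, elems []string, fail error, opts ...OptSubscribe) {
	h := &hooks{}
	for _, opt := range opts {
		opt(h)
	}
	for _, e := range elems {
		if h.onNext != nil {
			h.onNext(ctx, e)
		}
	}
	if fail != nil {
		if h.onError != nil {
			h.onError(ctx, fail)
		}
		return
	}
	if h.onComplete != nil {
		h.onComplete(ctx)
	}
}

// run subscribes with recording handlers and returns the observed events.
func run(elems []string, fail error) []string {
	var events []string
	subscribe(context.Background(), elems, fail,
		OnNext(func(_ context.Context, e string) { events = append(events, "next:"+e) }),
		OnError(func(_ context.Context, err error) { events = append(events, "error:"+err.Error()) }),
		OnComplete(func(context.Context) { events = append(events, "complete") }),
	)
	return events
}

func main() {
	fmt.Println(run([]string{"a", "b"}, nil))
	fmt.Println(run(nil, errors.New("boom")))
}
```

Unset handlers stay nil and are skipped, so a caller only supplies the options it cares about.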

type Processor

type Processor interface {
	Publisher
	Subscriber
}

Processor is both a Publisher and a Subscriber.

type Producer

type Producer interface {
	// Next emits the next element.
	Next(elem payload.Payload) error
	// Error signals that production failed with err.
	Error(err error)
	// Complete signals that production has completed.
	Complete()
}

Producer emits elements on demand from user code.
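A generator function of the shape accepted by NewFlux drives a Producer by calling Next for each element and then Complete (or Error). The sketch below exercises that shape against a local stand-in. The `recorder` type, `errTerminated`, `generate` and `runGenerate` are hypothetical, elements are plain strings rather than `payload.Payload`, and treating a Next call after termination as an error is an assumption about what the returned error is for.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Producer mirrors the documented interface, with string elements.
type Producer interface {
	Next(elem string) error
	Error(err error)
	Complete()
}

// errTerminated is a hypothetical error for emitting after termination.
var errTerminated = errors.New("producer already terminated")

// recorder is a stand-in Producer that stores what it receives.
type recorder struct {
	elems []string
	err   error
	done  bool
}

var _ Producer = (*recorder)(nil)

func (r *recorder) Next(elem string) error {
	if r.done {
		return errTerminated
	}
	r.elems = append(r.elems, elem)
	return nil
}

func (r *recorder) Error(err error) {
	if !r.done {
		r.err = err
		r.done = true
	}
}

func (r *recorder) Complete() { r.done = true }

// generate has the same shape as the function passed to NewFlux.
func generate(ctx context.Context, p Producer) {
	for _, s := range []string{"one", "two", "three"} {
		if err := ctx.Err(); err != nil {
			p.Error(err) // stop early if the context was canceled
			return
		}
		if err := p.Next(s); err != nil {
			return
		}
	}
	p.Complete()
}

// runGenerate drives generate with a fresh recorder.
func runGenerate() *recorder {
	r := &recorder{}
	generate(context.Background(), r)
	return r
}

func main() {
	r := runGenerate()
	fmt.Println(r.elems, r.done, r.err)
	fmt.Println(r.Next("late"))
}
```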

type Publisher

type Publisher interface {
	// Subscribe subscribes to elements from the publisher and returns a Disposable.
	// You can pass custom options.
	// Use `OnSubscribe`, `OnNext`, `OnComplete` and `OnError` to wrap handlers.
	Subscribe(ctx context.Context, ops ...OptSubscribe) Disposable
}

Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s).

type Scheduler

type Scheduler interface {
	io.Closer
	// Do registers a function to be executed.
	Do(ctx context.Context, fn Do)
}

Scheduler is a work pool for running functions asynchronously.

func ElasticScheduler

func ElasticScheduler() Scheduler

ElasticScheduler returns a dynamic scheduler.

func ImmediateScheduler

func ImmediateScheduler() Scheduler

ImmediateScheduler returns a scheduler that executes functions immediately.

func NewElasticScheduler

func NewElasticScheduler(size int) Scheduler

NewElasticScheduler returns a new ElasticScheduler.
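To make the Scheduler contract concrete, the sketch below implements it twice: once running each function on the caller's goroutine, and once on a fixed set of workers. It is not how this package implements its schedulers. The `immediate` and `pool` types, `newPool` and `sumOn` are hypothetical, and the sketch assumes Do is never called after Close.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"sync"
)

// Do and Scheduler mirror the documented declarations.
type Do = func(ctx context.Context)

type Scheduler interface {
	io.Closer
	Do(ctx context.Context, fn Do)
}

// immediate runs every function on the caller's goroutine.
type immediate struct{}

func (immediate) Do(ctx context.Context, fn Do) { fn(ctx) }
func (immediate) Close() error                  { return nil }

// pool runs functions on a fixed number of worker goroutines.
type pool struct {
	jobs chan func()
	wg   sync.WaitGroup
}

func newPool(size int) *pool {
	p := &pool{jobs: make(chan func())}
	p.wg.Add(size)
	for i := 0; i < size; i++ {
		go func() {
			defer p.wg.Done()
			for job := range p.jobs {
				job()
			}
		}()
	}
	return p
}

// Do blocks until a worker picks the function up.
func (p *pool) Do(ctx context.Context, fn Do) {
	p.jobs <- func() { fn(ctx) }
}

// Close stops the workers and waits for running functions to finish.
func (p *pool) Close() error {
	close(p.jobs)
	p.wg.Wait()
	return nil
}

// sumOn adds 1..n using the given scheduler, then closes it.
func sumOn(s Scheduler, n int) int {
	var mu sync.Mutex
	total := 0
	for i := 1; i <= n; i++ {
		i := i // capture the loop variable for the closure
		s.Do(context.Background(), func(context.Context) {
			mu.Lock()
			total += i
			mu.Unlock()
		})
	}
	_ = s.Close()
	mu.Lock()
	defer mu.Unlock()
	return total
}

func main() {
	fmt.Println(sumOn(immediate{}, 10))
	fmt.Println(sumOn(newPool(3), 10))
}
```

Both schedulers produce the same total; only the goroutine that runs each function differs, which is the distinction SubscribeOn and PublishOn let a caller choose.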

type SignalType

type SignalType int8

SignalType is the signal of reactive events like `OnNext`, `OnComplete`, `OnCancel` and `OnError`.

const (

	// SignalComplete indicates that the subscriber completed.
	SignalComplete SignalType
	// SignalCancel indicates that the subscriber was cancelled.
	SignalCancel
	// SignalError indicates that the subscriber failed with an error.
	SignalError
)
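A handler of the FnOnFinally shape typically switches on the SignalType it receives. The sketch below shows one way to do that with a locally declared copy of the type. The numeric values come from this sketch's own `iota` and are not claimed to match the package's values, which the rendered declaration above does not show. The `describe` and `finally` helpers are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// SignalType is a local copy for illustration; values are sketch-local.
type SignalType int8

const (
	SignalComplete SignalType = iota
	SignalCancel
	SignalError
)

// describe maps a signal to a human-readable name.
func describe(st SignalType) string {
	switch st {
	case SignalComplete:
		return "complete"
	case SignalCancel:
		return "cancel"
	case SignalError:
		return "error"
	default:
		return fmt.Sprintf("unknown(%d)", int8(st))
	}
}

// finally has the same shape as FnOnFinally.
func finally(_ context.Context, st SignalType) {
	fmt.Println("terminated with:", describe(st))
}

func main() {
	ctx := context.Background()
	for _, st := range []SignalType{SignalComplete, SignalCancel, SignalError} {
		finally(ctx, st)
	}
}
```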

type Subscriber

type Subscriber interface {
	// OnSubscribe handles the event emitted when the subscription begins.
	OnSubscribe(ctx context.Context, s Subscription)
	// OnNext handles the event emitted when a new element is produced.
	OnNext(ctx context.Context, s Subscription, elem payload.Payload)
	// OnComplete handles the event emitted when the subscription finishes.
	OnComplete(ctx context.Context)
	// OnError handles the event emitted when an error occurs.
	OnError(ctx context.Context, err error)
}

Subscriber consumes elements from a Publisher and handles events.

type Subscription

type Subscription interface {
	// Request pulls the next n elements. (It is used for flow control.)
	// Calling it makes the subscriber emit an `OnRequest` event, which you can catch with `DoOnRequest`.
	Request(n int)
	// Cancel cancels the current subscriber.
	// The subscriber emits an `OnCancel` event, which you can catch with `DoOnCancel`.
	Cancel()
	// N returns the current N in the queue.
	N() int
}

Subscription represents a Subscriber's subscription.
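The flow-control role of Request can be illustrated with a small demand counter: the publisher may emit only while requested demand remains, and RequestInfinite lifts the limit. The sketch below is a single-goroutine stand-in, not the package's implementation. The `demand` type and the `take` and `emit` helpers are hypothetical, and reading N as "outstanding demand" is an assumption, since the documentation above does not define it further.

```go
package main

import (
	"fmt"
	"math"
)

// RequestInfinite mirrors the documented constant.
const RequestInfinite = math.MaxInt32

// Subscription mirrors the documented interface.
type Subscription interface {
	Request(n int)
	Cancel()
	N() int
}

// demand is a hypothetical, non-concurrent demand counter.
type demand struct {
	n         int
	cancelled bool
}

var _ Subscription = (*demand)(nil)

// Request adds n to the outstanding demand, saturating at RequestInfinite.
func (d *demand) Request(n int) {
	if d.cancelled || n <= 0 {
		return
	}
	if n >= RequestInfinite-d.n {
		d.n = RequestInfinite
		return
	}
	d.n += n
}

// Cancel drops all demand; later requests are ignored.
func (d *demand) Cancel() {
	d.cancelled = true
	d.n = 0
}

// N returns the outstanding demand (an assumed meaning, see above).
func (d *demand) N() int { return d.n }

// take consumes one unit of demand and reports whether emitting is allowed.
func (d *demand) take() bool {
	if d.cancelled || d.n == 0 {
		return false
	}
	if d.n != RequestInfinite {
		d.n--
	}
	return true
}

// emit delivers elements while demand remains and returns what was delivered.
func emit(d *demand, elems []string) []string {
	var out []string
	for _, e := range elems {
		if !d.take() {
			break
		}
		out = append(out, e)
	}
	return out
}

func main() {
	d := &demand{}
	d.Request(2)
	fmt.Println(emit(d, []string{"a", "b", "c"}), d.N())
	d.Request(RequestInfinite)
	fmt.Println(emit(d, []string{"c", "d"}), d.N() == RequestInfinite)
	d.Cancel()
	fmt.Println(emit(d, []string{"e"}))
}
```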
