mono

package
v0.7.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 17, 2020 License: Apache-2.0 Imports: 11 Imported by: 17

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateProcessorOneshot added in v0.7.0

func CreateProcessorOneshot() (Mono, Sink)

func IsSubscribeAsync added in v0.7.0

func IsSubscribeAsync(m Mono) bool

IsSubscribeAsync returns true if target Mono will be subscribed async.

Types

type DelayBuilder added in v0.7.1

type DelayBuilder time.Duration

func Delay added in v0.7.1

func Delay(delay time.Duration) DelayBuilder

func (DelayBuilder) ToMono added in v0.7.1

func (d DelayBuilder) ToMono(transform func() (payload.Payload, error)) Mono

type Mono

type Mono interface {
	rx.Publisher
	// Filter evaluate each source value against the given Predicate.
	// If the predicate test succeeds, the value is emitted.
	Filter(rx.FnPredicate) Mono
	// Map transform the item emitted by this Mono by applying a synchronous function to another.
	Map(rx.FnTransform) Mono
	// FlatMap Transform the item emitted by this Mono asynchronously, returning the value emitted by another Mono.
	FlatMap(func(payload.Payload) Mono) Mono
	// DoFinally adds behavior (side-effect) triggered after the Mono terminates for any reason, including cancellation.
	DoFinally(rx.FnFinally) Mono
	// DoOnError adds behavior (side-effect) triggered when the Mono completes with an error.
	DoOnError(rx.FnOnError) Mono
	// DoOnSuccess adds behavior (side-effect) triggered when the Mono completes with an success.
	DoOnSuccess(rx.FnOnNext) Mono
	// DoOnCancel add behavior (side-effect) triggered when the Mono is cancelled.
	DoOnCancel(rx.FnOnCancel) Mono
	// DoOnSubscribe add behavior (side-effect) triggered when the Mono is done being subscribed.
	DoOnSubscribe(rx.FnOnSubscribe) Mono
	// SubscribeOn customize a Scheduler running Subscribe, OnSubscribe and Request.
	SubscribeOn(scheduler.Scheduler) Mono
	// SubscribeWithChan subscribe to this Mono and puts item/error into channels.
	SubscribeWithChan(ctx context.Context, valueChan chan<- payload.Payload, errChan chan<- error)
	// BlockUnsafe blocks Mono and returns data and error.
	// Payload could be pooled sometimes, so make sure calling ReleaseFunc when you no longer need Payload, or it will cause leak problem.
	BlockUnsafe(context.Context) (payload.Payload, ReleaseFunc, error)
	// Block blocks Mono and returns a cloned payload.
	// It's different from BlockUnsafe, you don't need release it manually.
	Block(context.Context) (payload.Payload, error)
	// SwitchIfEmpty switch to an alternative Publisher if this Mono is completed without any data.
	SwitchIfEmpty(alternative Mono) Mono
	// SwitchIfError switch to an alternative Publisher if this Mono is end with an error.
	SwitchIfError(alternative func(error) Mono) Mono
	// SwitchValueIfError switch to an alternative Payload if this Mono is end with an error.
	SwitchValueIfError(alternative payload.Payload) Mono
	// Raw returns low-level reactor.Mono which defined in reactor-go library.
	Raw() mono.Mono
	// ToChan subscribe Mono and puts items into a chan.
	// It also puts errors into another chan.
	ToChan(ctx context.Context) (c <-chan payload.Payload, e <-chan error)
	// Timeout sets the timeout value.
	Timeout(timeout time.Duration) Mono
}

Mono is a Reactive Streams Publisher with basic rx operators that completes successfully by emitting an element, or with an error.

func Create

func Create(gen func(context.Context, Sink)) Mono

Create wraps a generator function to a Mono.

func CreateFromChannel added in v0.3.1

func CreateFromChannel(payloads <-chan payload.Payload, err <-chan error) Mono

CreateFromChannel creates a Mono from channels.

func CreateOneshot added in v0.7.0

func CreateOneshot(gen func(context.Context, Sink)) Mono

CreateOneshot wraps a generator function to an oneshot Mono.

func Empty

func Empty() Mono

Empty returns an empty Mono.

func Error added in v0.3.1

func Error(err error) Mono

Error wraps an error to a Mono.

func ErrorOneshot added in v0.7.0

func ErrorOneshot(err error) Mono

ErrorOneshot wraps an error to an oneshot Mono.

func FromFunc added in v0.7.1

func FromFunc(gen func(context.Context) (payload.Payload, error)) Mono

func Just

func Just(input payload.Payload) Mono

Just wrap an exist Payload to a Mono.

func JustOneshot added in v0.7.0

func JustOneshot(input payload.Payload) Mono

JustOneshot wraps an existing Payload to an oneshot Mono.

func JustOrEmpty

func JustOrEmpty(input payload.Payload) Mono

JustOrEmpty wraps an existing Payload to a Mono. Payload could be nil here.

func Raw

func Raw(input mono.Mono) Mono

Raw wrap a low-level Mono.

type Processor

type Processor interface {
	Sink
	Mono
}

Processor combine Sink and Mono.

func CreateProcessor

func CreateProcessor() Processor

CreateProcessor creates a Processor.

type ReleaseFunc added in v0.7.0

type ReleaseFunc func()

ReleaseFunc can be used to release resources.

type Sink

type Sink interface {
	// Success emits a single value then complete current Sink.
	Success(payload.Payload)
	// Error emits an error then complete current Sink.
	Error(error)
}

Sink is a wrapper API around an actual downstream Subscriber for emitting nothing, a single value or an error (mutually exclusive).

type ZipBuilder added in v0.7.1

type ZipBuilder []mono.Mono

func Zip added in v0.7.1

func Zip(first Mono, second Mono, others ...Mono) ZipBuilder

func ZipAll added in v0.7.1

func ZipAll(sources ...Mono) ZipBuilder

func (ZipBuilder) ToMono added in v0.7.1

func (z ZipBuilder) ToMono(transform func(rx.Tuple) (payload.Payload, error)) Mono

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL