flux

package
v0.5.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2022 License: MIT Imports: 15 Imported by: 3

Documentation

Overview

Example
// gen pushes the integers 0 through 9 into the sink and then calls Complete.
gen := func(ctx context.Context, sink flux.Sink) {
	for i := 0; i < 10; i++ {
		v := i
		sink.Next(v)
	}
	sink.Complete()
}
// done is closed by the OnComplete callback so the code below can wait for the stream to end.
done := make(chan struct{})

// su stores the subscription received in OnSubscribe so that OnNext can call Request on it.
var su reactor.Subscription
flux.Create(gen).
	// Keep only the even integers.
	Filter(func(i Any) bool {
		return i.(int)%2 == 0
	}).
	// Turn each remaining int into a string of the form "#HELLO_" plus the value zero-padded to 4 digits.
	Map(func(i interface{}) (Any, error) {
		return fmt.Sprintf("#HELLO_%04d", i.(int)), nil
	}).
	// Use the scheduler returned by scheduler.Elastic() for the subscription.
	SubscribeOn(scheduler.Elastic()).
	Subscribe(context.Background(),
		// Remember the subscription and issue the initial Request(1).
		reactor.OnSubscribe(func(ctx context.Context, s reactor.Subscription) {
			su = s
			s.Request(1)
		}),
		// Print each value, then call Request(1) again (presumably pulling items one at a time — confirm against reactor docs).
		reactor.OnNext(func(v Any) error {
			fmt.Println("next:", v)
			su.Request(1)
			return nil
		}),
		// Signal the waiting receive below by closing done.
		reactor.OnComplete(func() {
			close(done)
		}),
	)
// Block until OnComplete has closed done.
<-done
Output:

Index

Examples

Constants

View Source
// Buffer size constants.
const (
	// BuffSizeXS is 32 (presumably the "extra small" preset, judging by the name — confirm).
	BuffSizeXS = 32
	// BuffSizeSM is 256, the same value as the default documented for InitBuffSize.
	BuffSizeSM = 256
)

Variables

This section is empty.

Functions

func InitBuffSize added in v0.2.0

func InitBuffSize(size int)

InitBuffSize initializes the buffer size (default: 256).

Types

type Any added in v0.2.0

// Any is an alias for reactor.Any.
type Any = reactor.Any

type CreateOption added in v0.0.11

// CreateOption is a function that receives a *fluxCreate; Create accepts a
// variadic list of these, and WithOverflowStrategy returns one.
type CreateOption func(*fluxCreate)

func WithOverflowStrategy added in v0.0.11

func WithOverflowStrategy(o OverflowStrategy) CreateOption

type Flux

// Flux is a reactor.Publisher extended with chainable methods (each returning
// another Flux) plus methods that consume the stream via channels or by
// returning values directly.
type Flux interface {
	reactor.Publisher
	// Filter, Map and Take accept a predicate, a transformer and a count respectively and return a Flux.
	Filter(reactor.Predicate) Flux
	Map(reactor.Transformer) Flux
	Take(n int) Flux
	// The DoOn* methods and DoFinally each accept a callback of the matching reactor.FnOn* type and return a Flux.
	DoOnDiscard(reactor.FnOnDiscard) Flux
	DoOnNext(reactor.FnOnNext) Flux
	DoOnComplete(reactor.FnOnComplete) Flux
	DoOnError(reactor.FnOnError) Flux
	DoOnCancel(reactor.FnOnCancel) Flux
	DoOnRequest(reactor.FnOnRequest) Flux
	DoOnSubscribe(reactor.FnOnSubscribe) Flux
	DoFinally(reactor.FnOnFinally) Flux
	// SwitchOnFirst accepts a FnSwitchOnFirst, i.e. a func(Signal, Flux) Flux.
	SwitchOnFirst(FnSwitchOnFirst) Flux
	DelayElement(delay time.Duration) Flux
	SubscribeOn(scheduler.Scheduler) Flux
	// SubscribeWithChan takes a value channel typed as interface{} and a send-only error channel.
	// NOTE(review): the accepted concrete channel types for valueChan are not visible here — confirm.
	SubscribeWithChan(ctx context.Context, valueChan interface{}, errChan chan<- error)
	// BlockFirst and BlockLast return a single value or an error (presumably the first/last element — confirm).
	BlockFirst(context.Context) (Any, error)
	BlockLast(context.Context) (Any, error)
	// BlockToSlice takes slicePtr as interface{} (presumably a pointer to the slice to fill — confirm) and returns an error.
	BlockToSlice(ctx context.Context, slicePtr interface{}) error
}

func Create

func Create(c func(ctx context.Context, sink Sink), options ...CreateOption) Flux

func Empty added in v0.0.2

func Empty() Flux

func Error added in v0.0.5

func Error(e error) Flux

func Interval added in v0.0.2

func Interval(period time.Duration) Flux

func Just

func Just(values ...Any) Flux

func Range added in v0.0.5

func Range(start, count int) Flux

type FnSwitchOnFirst added in v0.0.5

// FnSwitchOnFirst is the callback type accepted by Flux.SwitchOnFirst: it
// receives a Signal and a Flux and returns a Flux.
type FnSwitchOnFirst func(s Signal, f Flux) Flux

type OverflowStrategy

// OverflowStrategy is an int8-based enumeration; WithOverflowStrategy takes one
// and returns a CreateOption.
type OverflowStrategy int8
// Overflow strategy values, numbered 0 through 4 via iota.
// NOTE(review): the exact behavior of each strategy is not visible here — confirm against the implementation.
const (
	OverflowBuffer OverflowStrategy = iota
	OverflowIgnore
	OverflowError
	OverflowDrop
	OverflowLatest
)

type Processor added in v0.0.5

// Processor embeds both Flux and Sink, so a value of this type exposes the
// methods of both interfaces. NewUnicastProcessor returns one.
type Processor interface {
	Flux
	Sink
}

func NewUnicastProcessor added in v0.0.5

func NewUnicastProcessor() Processor

type Signal added in v0.0.5

// Signal is the first argument passed to a FnSwitchOnFirst callback.
type Signal interface {
	// Value returns an Any and a bool (presumably the bool reports whether a value is present — confirm).
	Value() (Any, bool)
	// Type returns the reactor.SignalType of this signal.
	Type() reactor.SignalType
}

type Sink

// Sink is the interface handed to the generator function passed to Create.
// In the package example, the generator calls Next once per value and then
// calls Complete.
type Sink interface {
	// Complete takes no arguments; the example calls it after the last Next.
	Complete()
	// Error accepts an error (presumably terminating the stream with it — confirm).
	Error(error)
	// Next accepts a single value of type Any.
	Next(Any)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL