streams

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 28, 2024 License: MIT Imports: 8 Imported by: 2

Documentation

Overview

Package streams provides a set of Source and Sink semantics to plug together object streams. Streams are closer to NodeJS object stream semantics as opposed to bufio byte level semantics. Although nothing stops one from putting together a byte stream.

Index

Constants

This section is empty.

Variables

View Source
var Done = errors.New("stream done")

Done represents a writable stream which has been closed. A Sink may return this in the time between being instructed to close and still draining all elements from their buffer

View Source
var End = errors.New("stream end")

End indicates a source has read all elements available on the stream.

View Source
var Full = errors.New("stream full")

Full represents a writable stream whose internal buffers which have been filled. A Full error provides a clear signal for backpressure on the emitting source.

View Source
var Overflow = errors.New("stream overflow")

Overflow indicates the stream would overflow with the given item.

View Source
var UnderRun = errors.New("stream under run")

Functions

This section is empty.

Types

type Buffer

type Buffer[T any] struct {
	Output []T
	// contains filtered or unexported fields
}

Buffer will hold up to a limit of elements before placing back pressure on a writer. Usable as a source also.

func NewBuffer

func NewBuffer[T any](maxCount int, opts ...BufferOpt[T]) *Buffer[T]

NewBuffer creates a new Buffer with the specified maxCount as the limit

func (*Buffer[T]) Finish

func (s *Buffer[T]) Finish(ctx context.Context) error

Finish will transition the buffer into FinalFlush mode until all elements are read, at which point the buffer will the transition into Finished.

func (*Buffer[T]) Pause

func (s *Buffer[T]) Pause(ctx context.Context) error

func (*Buffer[T]) ReadSlice

func (s *Buffer[T]) ReadSlice(parent context.Context, to []T) (int, error)

func (*Buffer[T]) Resume

func (s *Buffer[T]) Resume(ctx context.Context) error

func (*Buffer[T]) SinkEvents

func (s *Buffer[T]) SinkEvents() *SinkEvents[T]

func (*Buffer[T]) SourceEvents

func (s *Buffer[T]) SourceEvents() *SourceEvents[T]

func (*Buffer[T]) Write

func (s *Buffer[T]) Write(parent context.Context, value T) error

type BufferOpt

type BufferOpt[T any] func(b *Buffer[T])

func WithBufferTracePrefix

func WithBufferTracePrefix[T any](prefix string) BufferOpt[T]

type ChannelPort

type ChannelPort[T any] struct {
	Input    *ChannelSink[T]
	Feedback chan channelPortFeedback
	Output   *ChannelSource[T]
}

func NewChannelPort

func NewChannelPort[T any](size int) *ChannelPort[T]

type ChannelSink

type ChannelSink[T any] struct {
	Push func(ctx context.Context) error
	// contains filtered or unexported fields
}

ChannelSink bridges a Golang `chan` into the bridge interface. Due to the nature of channels not having feedback mechanisms some events will not fire as expected without additional aid.

func NewChannelSink

func NewChannelSink[T any](target chan<- T) *ChannelSink[T]

func (*ChannelSink[T]) ConsumeEvent

func (c *ChannelSink[T]) ConsumeEvent(parent context.Context, e channelPortFeedback) error

func (*ChannelSink[T]) Finish

func (c *ChannelSink[T]) Finish(ctx context.Context) error

func (*ChannelSink[T]) Resume

func (c *ChannelSink[T]) Resume(ctx context.Context) error

func (*ChannelSink[T]) SinkEvents

func (c *ChannelSink[T]) SinkEvents() *SinkEvents[T]

func (*ChannelSink[T]) Write

func (c *ChannelSink[T]) Write(parent context.Context, v T) error

type ChannelSource

type ChannelSource[T any] struct {
	// contains filtered or unexported fields
}

func NewChannelSource

func NewChannelSource[T any](pipe <-chan T) *ChannelSource[T]

func (*ChannelSource[T]) Pause

func (c *ChannelSource[T]) Pause(ctx context.Context) error

func (*ChannelSource[T]) PumpTick

func (c *ChannelSource[T]) PumpTick(parent context.Context) (count int, err error)

func (*ChannelSource[T]) ReadSlice

func (c *ChannelSource[T]) ReadSlice(parent context.Context, to []T) (i int, err error)

func (*ChannelSource[T]) Resume

func (c *ChannelSource[T]) Resume(parent context.Context) error

func (*ChannelSource[T]) SourceEvents

func (c *ChannelSource[T]) SourceEvents() *SourceEvents[T]

func (*ChannelSource[T]) WaitOnEvent

func (c *ChannelSource[T]) WaitOnEvent(ctx context.Context) error

type ConnectedPipe

type ConnectedPipe[E any] struct {
	// contains filtered or unexported fields
}

func Connect

func Connect[E any](ctx context.Context, from Source[E], to Sink[E], opts ...ConnectedPipeOption) (*ConnectedPipe[E], error)

Connect allows events to flow from a source to a sink.

func (*ConnectedPipe[E]) Close

func (c *ConnectedPipe[E]) Close(ctx context.Context) error

type ConnectedPipeOption

type ConnectedPipeOption func(c *connectedPipeConfig) error

func WithSuppressEnd

func WithSuppressEnd() ConnectedPipeOption

func WithTraceAttributes

func WithTraceAttributes(attrs ...attribute.KeyValue) ConnectedPipeOption

func WithTracePrefix

func WithTracePrefix(tracePrefix string) ConnectedPipeOption

type FanOutSink

type FanOutSink[T any] struct {
	// contains filtered or unexported fields
}

func NewFanOutSink

func NewFanOutSink[T any]() *FanOutSink[T]

func (*FanOutSink[T]) Add

func (f *FanOutSink[T]) Add(target Sink[T])

func (*FanOutSink[T]) Finish

func (f *FanOutSink[T]) Finish(ctx context.Context) error

func (*FanOutSink[T]) Resume

func (f *FanOutSink[T]) Resume(ctx context.Context) error

func (*FanOutSink[T]) SinkEvents

func (f *FanOutSink[T]) SinkEvents() *SinkEvents[T]

func (*FanOutSink[T]) Write

func (f *FanOutSink[T]) Write(ctx context.Context, v T) error

type Sink

type Sink[T any] interface {
	//Write offers a given value v to the Sink.  Standard errors are as follows:
	// * Done - If the stream has finished.  No further writes will be consumed
	// * Full - Written element has filled the output buffers and further writes will result in Overflow
	// * Overflow - Element has been rejected as the sink has too many pending elements to be written
	// * Done - Element has been rejected as the sink is draining
	Write(ctx context.Context, v T) error

	// Finish instructs the sink to no longer accept writes, wait for internal buffers to drain, and finish.
	// Source must emit at least the following events:
	// * Finishing - Indicates the sink is no longer accepting new writes
	// * Finished - Indicates the sink has completed flushing the internal buffer
	Finish(ctx context.Context) error

	// SinkEvents provides access to the possible events to be emitted by the sink
	SinkEvents() *SinkEvents[T]

	// Resume instructs the Sink to resume draining internal buffers and accept writes again.
	// Standard errors:
	// * Done - The stream is finishing or finishing and is no longer accepting writes.
	Resume(ctx context.Context) error
}

Sink represents a way to write elements to an abstract destination. Could be a memory buffer, through a channel, a database, etc. As a key counterpart to Source, a Sink provides back pressure semantics to allow buffers to be primed.

type SinkAvailableEvent

type SinkAvailableEvent[T any] struct {
	// Space indicates how many elements can be written before Overflow is provided.  This is advisory as other handlers
	// may have already written data.  Sink.Write is the authoritative source of data for this
	Space int
	// Sink is the originating entity for this event.
	Sink Sink[T]
}

type SinkEvents

type SinkEvents[T any] struct {
	// Writable is dispatched to indicate space is available in a Sink.  This is typically done at the end of any
	// logical grouping or batch which would the sink is drained to.  This allows more efficient pushing.
	//
	// todo: consider merging with drained based on how clients will react.
	Available emitter.Dispatcher[SinkAvailableEvent[T]]

	// Drained is dispatched when the sink is depleted with no additional elements available.
	// todo: consider merging with available based on how clients will react.
	Drained emitter.Dispatcher[Sink[T]]

	// Full is dispatched when the sink is no longer accepting elements.
	Full emitter.Dispatcher[Sink[T]]

	// Finishing is emitted when the stream is waiting for all elements to be flushed but not yet finished.  No further
	// elements will be accepted by the stream
	Finishing emitter.Dispatcher[Sink[T]]

	// Finished indicates the stream has completed flushing its internal buffer and has no further work as a sink to
	// complete.
	Finished emitter.Dispatcher[Sink[T]]
}

SinkEvents are the available events to interact with the sink

type SliceAccumulator

type SliceAccumulator[T any] struct {
	Output []T
	Done   bool
	Events *SinkEvents[T]
}

SliceAccumulator is a Sink for storing T in Output. Once Finish is called Finished event will be emitted.

func NewSliceAccumulator

func NewSliceAccumulator[T any]() *SliceAccumulator[T]

func (*SliceAccumulator[T]) Finish

func (s *SliceAccumulator[T]) Finish(ctx context.Context) error

func (*SliceAccumulator[T]) Resume

func (s *SliceAccumulator[T]) Resume(ctx context.Context) error

func (*SliceAccumulator[T]) SinkEvents

func (s *SliceAccumulator[T]) SinkEvents() *SinkEvents[T]

func (*SliceAccumulator[T]) Write

func (s *SliceAccumulator[T]) Write(ctx context.Context, value T) error

type Source

type Source[T any] interface {
	SourceEvents() *SourceEvents[T]
	// ReadSlice reads up to len(to) elements from the source, returning the count of elements read into the buffer and
	// any problems encountered while reading.  In the case 0 elements are available the source should return
	// UnderRun as the error.
	ReadSlice(ctx context.Context, to []T) (count int, problem error)
	// Resume begins emitting events of T until pushback is received from an event.
	Resume(ctx context.Context) error
	Pause(ctx context.Context) error
}

func FromSlice

func FromSlice[T any](values []T) Source[T]

type SourceEvents

type SourceEvents[T any] struct {
	Data emitter.Dispatcher[T]
	End  emitter.Dispatcher[Source[T]]
}

SourceEvents are the set of events which may be emitted from a source to signal various conditions and states.

type TransformFunc

type TransformFunc[I any, O any] func(ctx context.Context, in I) (O, error)

type Transformer

type Transformer[I any, O any] struct {
	// contains filtered or unexported fields
}

func NewTransform

func NewTransform[I any, O any](fn TransformerFunc[I, O]) *Transformer[I, O]

func (*Transformer[I, O]) Finish

func (t *Transformer[I, O]) Finish(ctx context.Context) error

func (*Transformer[I, O]) Pause

func (t *Transformer[I, O]) Pause(ctx context.Context) error

func (*Transformer[I, O]) Pump deprecated

func (t *Transformer[I, O]) Pump(ctx context.Context, source Source[I], target Sink[O]) error

Pump draws from source, transforms, then pushes to target.

Deprecated: Use Connect as it has better test coverage and less specific

func (*Transformer[I, O]) ReadSlice

func (t *Transformer[I, O]) ReadSlice(ctx context.Context, to []O) (int, error)

func (*Transformer[I, O]) Resume

func (t *Transformer[I, O]) Resume(ctx context.Context) error

func (*Transformer[I, O]) SinkEvents

func (t *Transformer[I, O]) SinkEvents() *SinkEvents[I]

func (*Transformer[I, O]) SourceEvents

func (t *Transformer[I, O]) SourceEvents() *SourceEvents[O]

func (*Transformer[I, O]) Write

func (t *Transformer[I, O]) Write(ctx context.Context, v I) error

type TransformerFunc

type TransformerFunc[I any, O any] func(ctx context.Context, input I) (output O, err error)

type TransformerState

type TransformerState uint8
const (
	TransformerPaused TransformerState = iota
	TransformerFlowing
	TransformerFinished
)

type TransformingSink

type TransformingSink[I any, O any] struct {
	// contains filtered or unexported fields
}

func WrapTransformingSink

func WrapTransformingSink[I any, O any](wrap Sink[O], transformer TransformFunc[I, O]) *TransformingSink[I, O]

func (*TransformingSink[I, O]) Finish

func (t *TransformingSink[I, O]) Finish(ctx context.Context) error

func (*TransformingSink[I, O]) Resume

func (t *TransformingSink[I, O]) Resume(ctx context.Context) error

func (*TransformingSink[I, O]) SinkEvents

func (t *TransformingSink[I, O]) SinkEvents() *SinkEvents[I]

func (*TransformingSink[I, O]) Write

func (t *TransformingSink[I, O]) Write(ctx context.Context, in I) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL