reactors

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 28, 2024 License: MIT Imports: 8 Imported by: 2

Documentation

Overview

Package reactors provides elements to work with event reactors of various styles. An event reactor is a de-multiplexer ensuring single threaded semantics within the domain of reactor. These are useful in complex interactions between asynchronous components and simplifies message passing.

Index

Constants

View Source
const ContextKey = "meschbach.junk.reactor"

Variables

This section is empty.

Functions

func InvokeOp

func InvokeOp(underlying context.Context, reactor Boundary[any], op TickEventFunc) error

InvokeOp invokes a given op within the context of the reactor.

func InvokeStateOp

func InvokeStateOp[S any](underlying context.Context, reactor Boundary[S], state S, op TickEventStateFunc[S]) error

func ScheduleFunc

func ScheduleFunc(ctx context.Context, op TickEventFunc)

ScheduleFunc schedules an op on the contextual reactor Boundary.

func StreamBetween

func StreamBetween[E any, I any, O any](ctx context.Context, inputSide Boundary[I], outputSide Boundary[O], opts ...StreamBetweenOpt) (streams.Source[E], streams.Sink[E], error)

StreamBetween allows a stream to traverse between two boundaries in a synchronized manner.

Seems a bit strange to have this generated outside of the source boundary since the stream must be passed in. In practice this should generally be invoked by the coordinating builder common between both sides.

func Submit

func Submit[I any, O any, R any](ctx context.Context, replyTo Boundary[I], target Boundary[O], apply func(boundaryContext context.Context, state O) (R, error)) *task.Promise[R]

func VerifyWithinBoundary

func VerifyWithinBoundary[S any](ctx context.Context, boundary Boundary[S])

VerifyWithinBoundary will panic if the given context is invoked with the wrong boundary when using the `sane` build tag. Otherwise this ia no-op

func WithReactor

func WithReactor[S any](underlying context.Context, reactor Boundary[S]) context.Context

WithReactor creates a child context with the specified reactor for contextual use

Types

type Boundary

type Boundary[S any] interface {
	// ScheduleFunc schedules the given operation to be executed within the reactor upon the next tick of the given
	// reactor.
	ScheduleFunc(ctx context.Context, operation TickEventFunc)
	ScheduleStateFunc(ctx context.Context, operation TickEventStateFunc[S])
}

Boundary will de-multiplex multiple execution requests into a single serialized stream.

func For

func For[S any](ctx context.Context) Boundary[S]

For retrieves a reactor Boundary from the given context

func Maybe

func Maybe[S any](ctx context.Context) (boundary Boundary[S], has bool)

type Channel

type Channel[S any] struct {
	// contains filtered or unexported fields
}

Channel is a Boundary implementation utilizing a chan as a work queue for execution. When the driving event loop is ready it should receive from the work queue and dispatch using the Tick method.

func NewChannel

func NewChannel[S any](queueSize int) (*Channel[S], <-chan ChannelEvent[S])

NewChannel creates a new reactor with the specified queueSize. It is recommended for queueSize to be greater than 1 to avoid synchronous hand-offs between goroutines or deadlocking.

func RunChannelActor

func RunChannelActor[E any](ctx context.Context, state E) *Channel[E]

RunChannelActor will run a new reactor with the given state until the given context is complete.

func (*Channel[S]) ConsumeAll

func (c *Channel[S]) ConsumeAll(ctx context.Context, state S) (int, error)

ConsumeAll will consume all pending messages from the work queue until empty then return. If the invoking context is canceled prior to completion then routine will exit early.

func (*Channel[S]) Done

func (c *Channel[S]) Done()

func (*Channel[S]) ScheduleFunc

func (c *Channel[S]) ScheduleFunc(ctx context.Context, operation TickEventFunc)

func (*Channel[S]) ScheduleStateFunc

func (c *Channel[S]) ScheduleStateFunc(ctx context.Context, operation TickEventStateFunc[S])

func (*Channel[S]) Tick

func (c *Channel[S]) Tick(ctx context.Context, event ChannelEvent[S], state S) error

Tick will dispatch the requested event within the context of this reactor.

type ChannelEvent

type ChannelEvent[S any] struct {
	// contains filtered or unexported fields
}

ChannelEvent is an opaque handle for relaying events back through the reactor from a channel receive.

type StreamBetweenOpt

type StreamBetweenOpt func(s *streamBetweenConfig)

func WithStreamBetweenName

func WithStreamBetweenName(name string) StreamBetweenOpt

type TickEventFunc

type TickEventFunc func(ctx context.Context) error

TickEventFunc is a handler for a tick event within a reactor. Provides no reactor state, so it must be passed in or extracted via other means

type TickEventStateFunc

type TickEventStateFunc[S any] func(ctx context.Context, state S) error

TickEventStateFunc handles a tick event within a reactor given the reactor state S.

type Ticked

type Ticked[S any] struct {
	// contains filtered or unexported fields
}

Ticked is a Boundary externally driven when calling the Tick method. Events will be queued until it is manually ticked.

func (*Ticked[S]) ScheduleFunc

func (t *Ticked[S]) ScheduleFunc(ctx context.Context, operation TickEventFunc)

func (*Ticked[S]) ScheduleStateFunc

func (t *Ticked[S]) ScheduleStateFunc(ctx context.Context, operation TickEventStateFunc[S])

func (*Ticked[S]) Tick

func (t *Ticked[S]) Tick(ctx context.Context, maximum int, state S) (hasMore bool, err error)

Tick executes up to the maximum number of event reductions within the reactor.

Directories

Path Synopsis
Package futures provides a promise mechanism against reactors to resolve when event occur in the future.
Package futures provides a promise mechanism against reactors to resolve when event occur in the future.
Package stitch wraps suture to provide a channel based futures
Package stitch wraps suture to provide a channel based futures

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL