Documentation ¶
Overview ¶
Package reactors provides elements for working with event reactors of various styles. An event reactor is a de-multiplexer that ensures single-threaded semantics within the domain of the reactor. Reactors are useful for complex interactions between asynchronous components and simplify message passing.
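To make the single-threaded guarantee concrete, here is a minimal sketch of the general reactor idea. It does not use this package: `op`, `runReactor`, and `countWithReactor` are hypothetical names chosen for illustration. Several goroutines submit closures to one queue, and a single goroutine executes them, so the shared counter needs no mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// op is a hypothetical unit of work executed inside the reactor.
type op func(counter *int)

// runReactor executes queued ops one at a time on the calling goroutine,
// so counter is only ever touched by a single goroutine.
func runReactor(queue <-chan op) int {
	counter := 0
	for o := range queue {
		o(&counter)
	}
	return counter
}

// countWithReactor starts several producers that each submit increments,
// then returns the total observed by the reactor.
func countWithReactor(producers, perProducer int) int {
	queue := make(chan op, 16)
	var wg sync.WaitGroup
	for p := 0; p < producers; p++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perProducer; i++ {
				queue <- func(c *int) { *c++ }
			}
		}()
	}
	// Close the queue once every producer has finished submitting.
	go func() {
		wg.Wait()
		close(queue)
	}()
	return runReactor(queue)
}

func main() {
	fmt.Println(countWithReactor(4, 100)) // prints 400
}
```

The increments arrive from four goroutines, yet none of them races, because only the reactor goroutine dereferences the counter.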
Index ¶
- Constants
- func InvokeOp(underlying context.Context, reactor Boundary[any], op TickEventFunc) error
- func InvokeStateOp[S any](underlying context.Context, reactor Boundary[S], state S, ...) error
- func ScheduleFunc(ctx context.Context, op TickEventFunc)
- func StreamBetween[E any, I any, O any](ctx context.Context, inputSide Boundary[I], outputSide Boundary[O], ...) (streams.Source[E], streams.Sink[E], error)
- func Submit[I any, O any, R any](ctx context.Context, replyTo Boundary[I], target Boundary[O], ...) *task.Promise[R]
- func VerifyWithinBoundary[S any](ctx context.Context, boundary Boundary[S])
- func WithReactor[S any](underlying context.Context, reactor Boundary[S]) context.Context
- type Boundary
- type Channel
- func (c *Channel[S]) ConsumeAll(ctx context.Context, state S) (int, error)
- func (c *Channel[S]) Done()
- func (c *Channel[S]) ScheduleFunc(ctx context.Context, operation TickEventFunc)
- func (c *Channel[S]) ScheduleStateFunc(ctx context.Context, operation TickEventStateFunc[S])
- func (c *Channel[S]) Tick(ctx context.Context, event ChannelEvent[S], state S) error
- type ChannelEvent
- type StreamBetweenOpt
- type TickEventFunc
- type TickEventStateFunc
- type Ticked
Constants ¶
const ContextKey = "meschbach.junk.reactor"
Variables ¶
This section is empty.
Functions ¶
func InvokeStateOp ¶
func ScheduleFunc ¶
func ScheduleFunc(ctx context.Context, op TickEventFunc)
ScheduleFunc schedules an op on the contextual reactor Boundary.
func StreamBetween ¶
func StreamBetween[E any, I any, O any](ctx context.Context, inputSide Boundary[I], outputSide Boundary[O], opts ...StreamBetweenOpt) (streams.Source[E], streams.Sink[E], error)
StreamBetween allows a stream to traverse between two boundaries in a synchronized manner.
It may seem strange that the stream is created outside of the source boundary, since the stream must then be passed in. In practice this should generally be invoked by the coordinating builder common to both sides.
func VerifyWithinBoundary ¶
VerifyWithinBoundary will panic if the given context is invoked with the wrong boundary when using the `sane` build tag. Otherwise this is a no-op.
Types ¶
type Boundary ¶
type Boundary[S any] interface {
	// ScheduleFunc schedules the given operation to be executed within the reactor upon the next tick of the given
	// reactor.
	ScheduleFunc(ctx context.Context, operation TickEventFunc)
	ScheduleStateFunc(ctx context.Context, operation TickEventStateFunc[S])
}
Boundary will de-multiplex multiple execution requests into a single serialized stream.
type Channel ¶
type Channel[S any] struct {
	// contains filtered or unexported fields
}
Channel is a Boundary implementation utilizing a chan as a work queue for execution. When the driving event loop is ready it should receive from the work queue and dispatch using the Tick method.
func NewChannel ¶
func NewChannel[S any](queueSize int) (*Channel[S], <-chan ChannelEvent[S])
NewChannel creates a new reactor with the specified queueSize. It is recommended for queueSize to be greater than 1 to avoid synchronous hand-offs between goroutines or deadlocking.
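The relationship between the work queue and the driving loop can be sketched as follows. This is a self-contained approximation, not the package's code: `event`, `chanReactor`, `newChanReactor`, `run`, and `journal` are hypothetical names, and the operation signature is assumed. The constructor returns both the reactor and the receive side of its queue, and the loop receives events and dispatches each one through `tick`.

```go
package main

import (
	"context"
	"fmt"
)

// journal is example reactor state.
type journal struct{ entries []string }

// event is a hypothetical opaque handle wrapping a scheduled operation.
type event[S any] struct {
	op func(ctx context.Context, state S) error
}

// chanReactor is a hypothetical stand-in for a channel-backed boundary.
type chanReactor[S any] struct{ queue chan event[S] }

// newChanReactor returns the reactor and the receive side of its work queue.
func newChanReactor[S any](queueSize int) (*chanReactor[S], <-chan event[S]) {
	q := make(chan event[S], queueSize)
	return &chanReactor[S]{queue: q}, q
}

// schedule enqueues op, giving up if ctx is cancelled while the queue is full.
func (r *chanReactor[S]) schedule(ctx context.Context, op func(context.Context, S) error) {
	select {
	case r.queue <- event[S]{op: op}:
	case <-ctx.Done():
	}
}

// tick executes a received event against the reactor state.
func (r *chanReactor[S]) tick(ctx context.Context, e event[S], state S) error {
	return e.op(ctx, state)
}

// run is the driving loop: receive from the queue, dispatch via tick, stop on cancel.
func run[S any](ctx context.Context, r *chanReactor[S], events <-chan event[S], state S) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case e := <-events:
			if err := r.tick(ctx, e, state); err != nil {
				return err
			}
		}
	}
}

// demo schedules two operations, the second of which stops the loop.
func demo() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	r, events := newChanReactor[*journal](4)
	state := &journal{}
	r.schedule(ctx, func(_ context.Context, j *journal) error {
		j.entries = append(j.entries, "first")
		return nil
	})
	r.schedule(ctx, func(_ context.Context, j *journal) error {
		j.entries = append(j.entries, "second")
		cancel()
		return nil
	})
	_ = run(ctx, r, events, state)
	return state.entries
}

func main() {
	fmt.Println(demo()) // prints [first second]
}
```

With a queue size of 0, the two `schedule` calls in `demo` would block forever because nothing is receiving yet, which illustrates the kind of hand-off problem the queueSize recommendation is meant to avoid.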
func RunChannelActor ¶
RunChannelActor will run a new reactor with the given state until the given context is complete.
func (*Channel[S]) ConsumeAll ¶
ConsumeAll will consume all pending messages from the work queue until it is empty, then return. If the invoking context is canceled prior to completion, the routine will exit early.
func (*Channel[S]) ScheduleFunc ¶
func (c *Channel[S]) ScheduleFunc(ctx context.Context, operation TickEventFunc)
func (*Channel[S]) ScheduleStateFunc ¶
func (c *Channel[S]) ScheduleStateFunc(ctx context.Context, operation TickEventStateFunc[S])
type ChannelEvent ¶
type ChannelEvent[S any] struct {
	// contains filtered or unexported fields
}
ChannelEvent is an opaque handle for relaying events back through the reactor from a channel receive.
type StreamBetweenOpt ¶
type StreamBetweenOpt func(s *streamBetweenConfig)
func WithStreamBetweenName ¶
func WithStreamBetweenName(name string) StreamBetweenOpt
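StreamBetweenOpt follows the functional options pattern. The fields of streamBetweenConfig are unexported and not shown, so the sketch below uses hypothetical stand-ins (`config`, `option`, `withName`, `build`) and an invented default name purely to show how such options are typically applied.

```go
package main

import "fmt"

// config is a hypothetical stand-in for an unexported configuration struct.
type config struct{ name string }

// option mutates the configuration, mirroring the shape of StreamBetweenOpt.
type option func(c *config)

// withName returns an option that sets the name.
func withName(name string) option {
	return func(c *config) { c.name = name }
}

// build starts from defaults and applies each option in order.
// The "unnamed" default is an assumption made for this sketch.
func build(opts ...option) config {
	c := config{name: "unnamed"}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	fmt.Println(build().name)                   // prints unnamed
	fmt.Println(build(withName("ingest")).name) // prints ingest
}
```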
type TickEventFunc ¶
TickEventFunc is a handler for a tick event within a reactor. It receives no reactor state, so any state must be passed in or extracted via other means.
type TickEventStateFunc ¶
TickEventStateFunc handles a tick event within a reactor given the reactor state S.
type Ticked ¶
type Ticked[S any] struct {
	// contains filtered or unexported fields
}
Ticked is a Boundary driven externally by calling the Tick method. Events are queued until it is manually ticked.
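The queue-until-ticked behavior can be illustrated with a small stand-alone sketch. `manualReactor`, `Schedule`, and `tally` are hypothetical names, and the decision to defer operations scheduled during a tick until the next tick is an assumption, not documented behavior of Ticked.

```go
package main

import (
	"fmt"
	"sync"
)

// tally is example reactor state.
type tally struct{ total int }

// manualReactor is a hypothetical stand-in for a manually ticked boundary.
type manualReactor[S any] struct {
	mu      sync.Mutex
	pending []func(state S)
}

// Schedule queues op; it may be called from any goroutine.
func (m *manualReactor[S]) Schedule(op func(state S)) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.pending = append(m.pending, op)
}

// Tick runs everything queued before the call and returns how many ops ran.
// Ops scheduled while ticking wait for the next Tick.
func (m *manualReactor[S]) Tick(state S) int {
	m.mu.Lock()
	batch := m.pending
	m.pending = nil
	m.mu.Unlock()
	for _, op := range batch {
		op(state)
	}
	return len(batch)
}

func main() {
	r := &manualReactor[*tally]{}
	sum := &tally{}
	r.Schedule(func(s *tally) { s.total += 2 })
	r.Schedule(func(s *tally) {
		s.total += 3
		// Scheduled from inside a tick, so it runs on the next one.
		r.Schedule(func(s *tally) { s.total *= 10 })
	})
	n := r.Tick(sum)
	fmt.Println(n, sum.total) // prints 2 5
	n = r.Tick(sum)
	fmt.Println(n, sum.total) // prints 1 50
}
```

The lock is released before the batch runs, which lets an operation schedule follow-up work without deadlocking.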
func (*Ticked[S]) ScheduleFunc ¶
func (t *Ticked[S]) ScheduleFunc(ctx context.Context, operation TickEventFunc)
func (*Ticked[S]) ScheduleStateFunc ¶
func (t *Ticked[S]) ScheduleStateFunc(ctx context.Context, operation TickEventStateFunc[S])
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
futures | Package futures provides a promise mechanism against reactors to resolve when events occur in the future. |
stitch | Package stitch wraps suture to provide channel-based futures. |