package stream

Documentation

Overview

Package stream provides the ability to safely read asynchronous, dynamic application state from Gio layout code.

It does so by providing several constructs:

  • Controllers, which connect the lifecycle of asynchronously generated data streams to the frame lifecycle of a Gio application window. Controllers handle invalidating the window when new data is available on active streams and also shut down streams when they are no longer in use.
  • Streams, which are bound to a particular controller and provide asynchronous state updates while they are in use by visible widgets. When not in use, their state updates shut down to conserve resources.
  • Transformations, which are functions that operate on stream channels to easily create data streams from disparate sources of asynchronous information.
  • Inputs, which provide data from the application GUI so that it can be safely read within a stream's asynchronous processing.

Additionally, stream provides constructs for supervising persistent or stateful asynchronous operations from your UI:

  • Mutators, which maintain a list of running stateful operations and ensure that they all shut down as cleanly as possible before your application exits.
  • MutationPools, which allow you to manage groups of related stateful operations and prevent duplicate operations from being created at the same time.
  • Mutations, which are a (streamable) handle onto a running stateful operation.

Controllers

Each window using streams needs its own Controller, which can be constructed with NewController. The controller is used to construct streams bound to that window, and has a method (Controller.Sweep) that must be invoked every frame in order to ensure that streams which are not in use go inert.

Typical use looks like this:

func loop(w *app.Window) error {
	// Make a context that lives as long as the window.
	windowCtx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Make a controller for this window.
	controller := stream.NewController(windowCtx, w.Invalidate)

	var ops op.Ops
	for event := range w.Events() {
		switch event := event.(type) {
		case system.DestroyEvent:
			return event.Err
		case system.FrameEvent:
			gtx := layout.NewContext(&ops, event)

			// Your layout here, passing the controller so that code can instantiate streams with it.
			layoutUI(gtx, controller)

			event.Frame(gtx.Ops)

			// Transition any active unread streams to be inactive.
			controller.Sweep()
		}
	}
	return nil
}

Providing the window's Invalidate function to NewController lets the controller trigger a window invalidation when new data arrives on a stream visible within the window, so the UI is redrawn automatically.

Note that Controller.Sweep is being invoked every frame.

Streams

A Stream is a restartable asynchronous computation which is automatically started and stopped based on whether it is actively used by the GUI. You can read from a stream to receive the most recent value created by the stream (if any is available yet).

Only the most recent value sent over a stream is important. Application programs should assume that any given element sent may be overridden by values sent after it. As such, you should not write streams to trickle updates over individual elements in a collection, but should instead send entire updated collections over the stream.

You can construct a Stream with New:

// Make a stream that will emit increasing integers every second.
myStream := stream.New(controller, func(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		ticks := 0
		for {
			select {
			case <-ticker.C:
				ticks++
				out <- ticks
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
})

The controller provided to New is responsible for starting and stopping the stream based on whether it is in use. The Provider function must return a receive-only channel of values that closes when the provided context is cancelled.

In most real applications, the Provider function passed to New will perform I/O or interface with another goroutine in order to provide correct updates over the stream. See the section on Transformations for more about how to construct real-world provider functions.

Reading a stream is accomplished through one of the Read* methods. Of these, *Stream.Read is the most fundamental, and will be discussed first:

ticks, status := myStream.Read(gtx)
if status == stream.Waiting {
	// We haven't received a value over the stream yet.
} else {
	// We have a value, so do something with ticks.
}

Reading from a stream never blocks, so it's quite common to read a stream before any value is available yet. In that case, the stream's returned status will be Waiting, and the returned value should be ignored.

If the status is Emitting, a new value is coming out of the stream this frame.

If the status is Cached, the returned value is not new, but has been emitted before.

Often your UI will not care about the status at all, but would rather simply work with a default value until the first data is available on the stream. To facilitate this, we have *Stream.ReadDefault:

ticks := myStream.ReadDefault(gtx, 0)

You can rely upon the returned value to either be your supplied default value or the latest value from the stream.

Finally, sometimes you want to synchronize an existing variable or struct field with the latest data from a stream. *Stream.ReadInto accomplishes this:

var ticks int // Assume we declared this elsewhere, perhaps as a field.
myStream.ReadInto(gtx, &ticks, 0)

As you can see, reading from a stream does not require a great deal of code unless your use-case demands special consideration of the status of the stream.

Results

The Result type provides an easy way to send both a value and error across a channel packed into a single type. It isn't meant to be a general-purpose type, but rather to make it easier to propagate errors from the providers of your streams to your UI when you need to. We provide a number of helper transformations and stream types to make working with streams of results easier.

ResultStream accepts a ProviderR instead of a Provider and allows you to read the error from a stream value ergonomically. We can rewrite the above tick stream example to use Result like so:

// Make a stream that will emit increasing integers (or an error) every second.
myResultStream := stream.NewR(controller, func(ctx context.Context) <-chan stream.Result[int] {
	out := make(chan stream.Result[int])
	go func() {
		defer close(out)
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		ticks := 0
		for {
			select {
			case <-ticker.C:
				ticks++
				if ticks%2 == 0 {
					out <- stream.ResultFrom(ticks, nil)
				} else {
					out <- stream.ResultFrom(0, fmt.Errorf("odd number"))
				}
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
})

We can then read from it with:

ticks, status, err := myResultStream.Read(gtx)
if status == stream.Waiting {
	// We haven't received a value over the stream yet.
} else if err != nil {
	// We have an error.
} else {
	// We have a value, so do something with ticks.
}

We also provide *ResultStream.ReadDefault and *ResultStream.ReadInto for use-cases where the status is less important.
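
For example, the result stream above could be read without inspecting its status (a minimal sketch; the surrounding layout code is assumed):

// Use a default of 0 until the stream emits; err is the latest stream error, if any.
ticks, err := myResultStream.ReadDefault(gtx, 0)

// Or keep an existing variable (perhaps a widget field) in sync with the stream.
var count int
err = myResultStream.ReadInto(gtx, &count, 0)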

Transformations

Most of the top-level functions in this package are transformations. They make it easier to construct channels of Ts or Result[T]s, combine those channels, and change their element type. They are intended to be used within the provider functions passed to New and NewR. Consider the following:

// Simple function to emit an increasing integer each time the provided duration elapses.
func tickerProvider(ctx context.Context, dur time.Duration) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		ticker := time.NewTicker(dur)
		defer ticker.Stop()
		ticks := 0
		for {
			select {
			case <-ticker.C:
				ticks++
				out <- ticks
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// Make a stream that will emit the product of two streams of integers as a floating-point
// value.
myStream := stream.New(controller, func(ctx context.Context) <-chan float64 {
	everySecond := tickerProvider(ctx, time.Second)
	everyFewSeconds := tickerProvider(ctx, time.Second*3)
	products := stream.Zip(everySecond, everyFewSeconds, func(a, b int) int {
		return a*b
	})
	asFloats := stream.Transform(products, func(a int) float64 {
		return float64(a)
	})
	return asFloats
})

As you can see, these functions make it relatively easy to leverage multiple [Provider]s and [ProviderR]s and combine their results. The above could have converted the output type to be a float64 within the Zip call, but uses the long way to demonstrate how to use Transform. Note that each transformation employed in constructing a stream will add some latency before the UI receives new values, so try not to let your transformation pipeline get too deep.

Transformations with an R suffix work with streams of Result elements and provide extra behaviors to make error handling easier.
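
For instance, TransformR converts a channel of results while giving the transformer access to both the value and the error (a minimal sketch; resultTicker is a hypothetical ProviderR[int]):

myLabels := stream.NewR(controller, func(ctx context.Context) <-chan stream.Result[string] {
	ints := resultTicker(ctx) // hypothetical: returns <-chan stream.Result[int]
	return stream.TransformR(ints, func(v int, err error) (string, error) {
		if err != nil {
			return "", err // propagate the error unchanged
		}
		return strconv.Itoa(v), nil
	})
})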

Inputs

If your user interface needs to perform a complex filter operation (too expensive to do directly within the UI), you can use an Input to push the filter criteria into your provider function. Construct an Input with its initial value using NewInput.

You can then reference that input within a Provider function for a stream:

// Make a stream that will emit the product of the counter input and a value that increments
// each second.
count := stream.NewInput(5)
myStream := stream.New(controller, func(ctx context.Context) <-chan int {
	everySecond := tickerProvider(ctx, time.Second)
	return stream.Zip(everySecond, count.Stream(ctx), func(a, b int) int {
		return a*b
	})
})

As the user interacts with your UI, you can update the value of your input with Input.Send:

count.Send(10)

This method will not block unless you are performing concurrent sends, and will enable your provider function to consume whatever value you send. Much like streams, only the most recently-sent value is guaranteed to be consumed by the stream.

Mutators

A Mutator is created at the start of an application to supervise stateful operations. Creating one looks like this:

// Make a context that will last the lifetime of your application.
appCtx, cancel := context.WithCancel(context.Background())
defer cancel()
mutator := stream.NewMutator(appCtx, time.Second*5)

// Run your application here, blocking until you want to end the app.

// Block until running mutations end.
mutator.Shutdown()

The mutator will wait up to the duration provided in its constructor for all running mutations to end. After that it will cancel any mutation contexts that are not already cancelled and wait up to the provided duration again. Then it will unblock and allow the application process to exit.

MutationPools

A MutationPool wraps a Mutator and provides the ability to launch stateful operations associated with a unique "key". This key mechanism can be used to ensure that two logically-identical mutations are not created at the same time for applications that need that.

A mutation pool is created like so:

// Assuming you already have a mutator.
var mutator *stream.Mutator
// Create a pool using ints as the type of the unique key and time.Time as the value type
// emitted by the supervised mutations.
tickerPool := stream.NewMutationPool[int, time.Time](mutator)

You can use any comparable type as a key, and any type as the mutation value type. Usually applications will want to encode identifiers for what a mutation is acting upon and how into the key. The value type should be whatever the result of the mutation is.
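
For example, a key might combine the identity of the record being mutated with the kind of operation being performed (a sketch; recordMutationKey is a hypothetical type, not part of this package):

// Two attempts to delete record 7 share one mutation; deleting record 7 and
// archiving record 7 use distinct keys and may run concurrently.
type recordMutationKey struct {
	RecordID int
	Action   string
}

recordPool := stream.NewMutationPool[recordMutationKey, bool](mutator)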

You can stream running mutations from the UI with:

// Assuming the MutationPool above and a stream controller.
runningTickersStream := stream.New(controller, tickerPool.Stream)

running, status := runningTickersStream.Read(gtx)

Mutations

A Mutation is a stateful process running asynchronously. You can use them for almost anything, from modifying a database to tracking the progress of a complex user flow through your application. The important difference between a mutation and a normal stream is that a mutation is *not* cancelled when the UI element that launched it stops being drawn. Mutations persist until they complete or the application shuts down (at which point the supervising Mutator will try to ensure that mutations have a chance to complete before killing them).

Mutations can take several forms:

  • Processes that make a change to application state (like a database query or API request) and then exit.
  • Processes that manage a multi-step sequence of operations (like guiding a user through a tour of an application across many different pages) and then exit when done.
  • Processes that run providing useful services for the lifetime of your application (a mutation can maintain a frequently-needed, frequently-updated value for easy access by your UI).

The only requirement for a mutation (encapsulated by the MutationProvider type) is that it eventually closes its output channel (signalling that it is complete). Mutations must terminate themselves as quickly as possible (closing their output channel) when their context is cancelled.

To launch a mutation, you need a MutationPool to host it, and the mutation must result in a sequence of values matching the MutationPool's value type. To reuse the pool from the previous example blocks, we need a mutation that produces a sequence of time.Time values.

key := 0
mutation, isNew := stream.Mutate(tickerPool, key, func(ctx context.Context) <-chan time.Time {
	out := make(chan time.Time)
	go func() {
		defer close(out)
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case t := <-ticker.C:
				out <- t
			}
		}
	}()
	return out
})

The above creates a mutation that emits times from a persistent ticker. That ticker will only stop when the mutation's context is cancelled. The returned mutation value is a handle on this running mutation. It can be used to stream values from the output channel of a mutation. The isNew return value tells the caller whether the mutation pool has returned an already-running mutation for the same key, or has created a new mutation for that key.

You can read the results of a mutation from the UI by creating a stream:

tickStream := stream.New(controller, mutation.Stream)

tick, status := tickStream.Read(gtx)

Like any other stream, you can apply transformation functions within a MutationProvider to modify the output of the mutation for *all* consumers or you can apply transformation functions in a Provider to modify the output for just the current stream.
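
For example, a single UI stream could reformat the ticker mutation's output without affecting other consumers (a minimal sketch reusing the mutation from the example above):

// Transform the mutation's times into display strings for this stream only.
labelStream := stream.New(controller, func(ctx context.Context) <-chan string {
	return stream.Transform(mutation.Stream(ctx), func(t time.Time) string {
		return t.Format(time.Kitchen)
	})
})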

Index

Constants

This section is empty.

Variables

View Source
var ErrNilController = fmt.Errorf("stream has nil controller")

ErrNilController indicates that an improperly-constructed stream was read. This always indicates a bug constructing the stream, usually making a stream from a struct literal instead of using the constructor function.

View Source
var ErrNilProvider = fmt.Errorf("stream has nil provider function")

ErrNilProvider indicates that an improperly-constructed stream was read. This always indicates a bug constructing the stream, usually making a stream from a struct literal instead of using the constructor function.

Functions

func Debounce

func Debounce[T any](input <-chan T, interval time.Duration) <-chan T

Debounce coalesces values emitted on the input channel so that at most one value is emitted per interval. If Debounce is not already waiting out an interval after emitting a recent value, it emits a new value immediately; values that arrive within the following interval are collected and only the final one is emitted.
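
A typical use is inside a provider function, for example (a sketch; ctx is the provider's context and tickerProvider is the helper from the overview):

// Emit at most one tick per second, even though the source ticks every 100ms.
calm := stream.Debounce(tickerProvider(ctx, time.Millisecond*100), time.Second)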

func DebounceUntilStable

func DebounceUntilStable[T any](input <-chan T, interval time.Duration) <-chan T

DebounceUntilStable emits a value one `interval` after the last received input. If the inputs are rapid, the timer will reset on each input until the inputs settle enough for interval to pass between them.

func Distinct

func Distinct[T comparable](input <-chan T) <-chan T

Distinct is DistinctFunc using "a == b" as the same func.

func DistinctFunc

func DistinctFunc[T any](input <-chan T, same func(a, b T) bool) <-chan T

DistinctFunc returns a channel that will emit only data elements that are not the same as the last emitted data element, with "sameness" defined by the provided function.

func DistinctFuncR

func DistinctFuncR[T any](input <-chan Result[T], same func(a, b T, aErr, bErr error) bool) <-chan Result[T]

DistinctFuncR returns a channel that will emit only data elements that are not the same as the last emitted data element, with "sameness" defined by the provided function.

func DistinctR

func DistinctR[T comparable](input <-chan Result[T]) <-chan Result[T]

DistinctR is DistinctFuncR using "a == b && errors.Is(aErr, bErr)" as the same func.

func Filter

func Filter[T, U any](in <-chan T, keep func(T) (U, bool)) <-chan U

Filter selectively drops elements from the input channel. If the keep function returns true, the returned U is emitted on the output; otherwise nothing is emitted for that element.
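
For example (a sketch; ints is assumed to be an existing <-chan int):

// Keep only even values, converting them to strings as they pass through.
evens := stream.Filter(ints, func(v int) (string, bool) {
	return strconv.Itoa(v), v%2 == 0
})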

func FilterR

func FilterR[T, U any](in <-chan Result[T], keep func(T, error) (U, error, bool)) <-chan Result[U]

FilterR implements Filter for Result channels, automatically unpacking the value/error of the input and repacking the U and error returned by keep into a Result.

func Latest

func Latest[T any](in <-chan T) <-chan T

Latest creates a buffered channel that will exhibit no backpressure. While waiting to send its next output element, it will always be ready to receive a new input, and it will replace its next outbound element with any new element. This is useful when you want to skip doing the work for intermediate values when multiple values are flowing through a stream pipeline.

func Multiplex

func Multiplex[In, Out, State any](
	input <-chan In,
	choose func(ctx context.Context, state State, val In) (<-chan Out, State),
) <-chan Out

Multiplex allows dynamically reconfiguring its output stream based on the last value of its input stream. The provided choose function is given the latest value from the input channel and is responsible for choosing whether or not to change the output stream. If choose returns a non-nil channel, the Multiplex output channel will emit values read from that channel until the next input value arrives. Note that choose must return a non-nil channel at least once in order for any output values to ever be emitted. The ctx passed to choose is a new context that can be used to construct a new stream channel from a provider function. When choose returns a non-nil channel, any previously-created channel will have its context cancelled, ensuring no goroutines are leaked. The State type allows the choose function to implement stateful logic, like ensuring that it isn't about to return a new copy of the same stream channel it returned on its previous invocation. The choose function accepts and returns a value of type State, and the returned state is always passed back to the next invocation of choose (even when choose returns a nil channel).
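
For example (a sketch; selectedIDs, Detail, and detailProvider are hypothetical):

// Switch the output stream whenever the selected ID changes, using the last
// ID as the State to avoid rebuilding an identical channel.
details := stream.Multiplex(selectedIDs, func(ctx context.Context, lastID int, id int) (<-chan Detail, int) {
	if id == lastID {
		return nil, lastID // keep the current output channel
	}
	return detailProvider(ctx, id), id // detailProvider: func(context.Context, int) <-chan Detail
})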

func MultiplexR

func MultiplexR[In, Out, State any](
	input <-chan Result[In],
	choose func(ctx context.Context, state State, val In, err error) (<-chan Result[Out], State),
) <-chan Result[Out]

MultiplexR allows dynamically reconfiguring its output stream based on the last value of its input stream. The provided choose function is given the latest value and error from the input channel and is responsible for choosing whether or not to change the output stream. If choose returns a non-nil channel, the MultiplexR output channel will emit values read from that channel until the next input value arrives. Note that choose must return a non-nil channel at least once in order for any output values to ever be emitted. The ctx passed to choose is a new context that can be used to construct a new stream channel from a provider function. When choose returns a non-nil channel, any previously-created channel will have its context cancelled, ensuring no goroutines are leaked. The State type allows the choose function to implement stateful logic, like ensuring that it isn't about to return a new copy of the same stream channel it returned on its previous invocation. The choose function accepts and returns a value of type State, and the returned state is always passed back to the next invocation of choose (even when choose returns a nil channel).

func PostProcess

func PostProcess[T any](input <-chan T, f func(t T) T) <-chan T

PostProcess immediately emits any value received from input, but asynchronously invokes f on any values received and emits the resulting data afterward. This can be used to perform a time-consuming post-processing step while also getting raw data through the entire stream pipeline with as little latency as possible. This operation only makes sense for data that can be used in its raw form as well as its processed form, like images.

func PostProcessR

func PostProcessR[T any](input <-chan Result[T], f func(t T, err error) (T, error)) <-chan Result[T]

PostProcessR immediately emits any value received from input, but asynchronously invokes f on any values received and emits the resulting data afterward. This can be used to perform a time-consuming post-processing step while also getting raw data through the entire stream pipeline with as little latency as possible. This operation only makes sense for data that can be used in its raw form as well as its processed form, like images.

func Transform

func Transform[T, U any](input <-chan T, transformer func(T) U) <-chan U

Transform returns a new channel which will emit every value sent over input transformed by the transformer function.

func TransformR

func TransformR[T, U any](input <-chan Result[T], transformer func(T, error) (U, error)) <-chan Result[U]

TransformR returns a new channel which will emit every value sent over input transformed by the transformer function.

func Zip

func Zip[T, U, Out any](a <-chan T, b <-chan U, zipFunc func(a T, b U) Out) <-chan Out

Zip combines two streams of types T and U into a stream of type Out. Zip works with the latest value from each stream (discarding old values as new ones arrive), and will not emit a value until each input stream has emitted at least one value. Each time the input streams emit values, the provided zipFunc will be invoked to synthesize an output value. The output channel will close when either input channel closes.

func ZipR

func ZipR[T, U, Out any](a <-chan Result[T], b <-chan Result[U], zipFunc func(a T, b U, aErr, bErr error) (Out, error)) <-chan Result[Out]

ZipR combines two streams of types T and U into a stream of type Out. ZipR works with the latest value from each stream (discarding old values as new ones arrive), and will not emit a value until each input stream has emitted at least one value. Each time the input streams emit values, the provided zipFunc will be invoked to synthesize an output value. The output channel will close when either input channel closes.

func ZipWrapR

func ZipWrapR[T, U, Out any](a <-chan Result[T], b <-chan Result[U], zipFunc func(a T, b U) (Out, error)) <-chan Result[Out]

ZipWrapR behaves as ZipR except that it handles combining errors from channels by wrapping them together and will not invoke the zipFunc if either channel errored (instead emitting the error result).

  • If a errored and b did not, set a's error as the output error
  • If b errored and a did not, set b's error as the output error
  • If both a and b errored, set an error wrapping both as the output error
  • If there is an output error, return the zero value of Out and that error.
  • Otherwise, run zipFunc and return its results.
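
Applying these rules, a typical use looks like this (a sketch; Record, Permissions, and View are hypothetical types, and records and permissions are existing Result channels):

// zipFunc only runs when neither input carried an error; otherwise the error
// (or an error wrapping both) is emitted as the Result's error.
combined := stream.ZipWrapR(records, permissions, func(r Record, p Permissions) (View, error) {
	return View{Record: r, CanEdit: p.CanEdit}, nil
})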

Types

type ARCPool

type ARCPool[V PoolKeyer[K], K comparable, T any] struct {
	// contains filtered or unexported fields
}

ARCPool manages a pool of [PersistentStream]s that are created and destroyed automatically by refcounting the number of consumers of each stream.

func NewARCPool

func NewARCPool[V PoolKeyer[K], K comparable, T any](mutator *Mutator, provider func(ctx context.Context, v V) <-chan T) *ARCPool[V, K, T]

NewARCPool defines a new ARCPool with a provider function that is responsible for actually doing the work of the PersistentStream used for each key. The provider func should be safe to cancel at any time, so it should ideally have no side effects on other application state.

func (*ARCPool[V, K, T]) Stream

func (a *ARCPool[V, K, T]) Stream(ctx context.Context, v V) <-chan T

Stream either starts streaming for a given V or returns the existing channel streaming its output. When all active streams for a given V are cancelled, the underlying mutation is also cancelled.
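
For example (a sketch; User and fetchUser are hypothetical, and controller is a per-window stream controller):

// One shared fetch per user ID, refcounted across however many UI streams ask for it.
userPool := stream.NewARCPool[stream.ComparableKeyer[int], int, User](mutator,
	func(ctx context.Context, key stream.ComparableKeyer[int]) <-chan User {
		return fetchUser(ctx, key.MutationPoolKey())
	})

// Each consumer streams the shared data through its own window's controller.
userStream := stream.New(controller, func(ctx context.Context) <-chan User {
	return userPool.Stream(ctx, stream.Comparable(42))
})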

type ComparableKeyer

type ComparableKeyer[K comparable] struct {
	// contains filtered or unexported fields
}

ComparableKeyer wraps simple comparable types so that they implement PoolKeyer.

func Comparable

func Comparable[K comparable](k K) ComparableKeyer[K]

Comparable wraps the given k as a ComparableKeyer.

func (ComparableKeyer[K]) MutationPoolKey

func (c ComparableKeyer[K]) MutationPoolKey() K

MutationPoolKey returns the comparable key for this ComparableKeyer.

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller manages the lifecycle of asynchronous streams of data, connecting them to the frame event loop of a given application window.

func NewController

func NewController(ctx context.Context, invalidator func()) *Controller

NewController constructs a controller bound to the window lifecycle of a single application window. The provided invalidator func must trigger an invalidation of that window, and the Sweep() method must be invoked during the processing of each frame for that window. A controller can be shared among all async loading for a single window.

func (*Controller) Done

func (s *Controller) Done()

Done cleans up all streams. It should be invoked when an application window is closed in order to ensure that all associated processing shuts down with the window.

func (*Controller) Sweep

func (s *Controller) Sweep() (active, swept int)

Sweep cleans up inactive streams. It must be invoked once per frame by the event loop for the window that the stream is bound to. It returns the number of active streams after the sweep, as well as the number of streams that were swept away by the call (making them inert). Note that this is not the same as the number of inert streams.

type Input

type Input[T any] struct {
	// contains filtered or unexported fields
}

Input provides data to a stream in a threadsafe way and without blocking (unless you perform concurrent sends, then it may block). It can be used to feed data into a stream data transformation pipeline from the UI event loop.

func NewDistinctInput

func NewDistinctInput[T comparable](initialValue T) Input[T]

NewDistinctInput constructs an input like NewInput, but the input will not emit values that are the same as the most-recently-emitted value. In this case, sameness is defined as having equal values (==).

func NewInput

func NewInput[T any](initialValue T) Input[T]

NewInput constructs an input, setting it to emit initialValue the first time it is read.

func NewInputEmpty

func NewInputEmpty[T any](filter func(T, T) bool) Input[T]

NewInputEmpty creates a new input with no initial value. Applications may supply a filter function; an element is dropped rather than sent when the filter returns true.

func (*Input[T]) Send

func (s *Input[T]) Send(t T)

Send emits t on the input, replacing any previously-emitted value that has not already been consumed.

func (*Input[T]) Stream

func (s *Input[T]) Stream(ctx context.Context) <-chan T

Stream returns a channel upon which the Input's data is available. The channel will close when ctx is cancelled. It is safe to call Stream many times on the same input, allowing it to feed into multiple processing pipelines.

type Mutation

type Mutation[T any] struct {
	// contains filtered or unexported fields
}

Mutation is a handle on the results of an asynchronous application state change.

func Mutate

func Mutate[K comparable, T any](pool *MutationPool[K, T], key K, mutationProv MutationProvider[T]) (mut *Mutation[T], isNew bool)

Mutate attempts to start a mutation using the mutationProv and bound to the unique key. If key is already registered to a running mutation, that mutation instance will be returned instead.

If the returned mutation is nil, no mutations can be started because the mutator is shut down. If the returned mutation is non-nil, the isNew return value indicates whether it is a newly-spawned mutation, or a reference to an already-running mutation for the same key.

func MutateKeyed

func MutateKeyed[V PoolKeyer[K], K comparable, T any](pool *MutationPool[K, T], v V, mutationProv MutationProvider[T]) (mut *Mutation[T], isNew bool)

MutateKeyed is identical to Mutate except that it automatically derives the mutation key from a provided PoolKeyer instead of requiring the key to be passed explicitly. This can sometimes be more ergonomic.

func MutateTimeout

func MutateTimeout[K comparable, T any](pool *MutationPool[K, T], key K, timeout time.Duration, mutationProv MutationProvider[T]) (mut *Mutation[T], isNew bool)

MutateTimeout does the same thing as Mutate, but sets a timeout on the mutation's context.

func PersistentStream

func PersistentStream[K comparable, T any](pool *MutationPool[K, T], key K, prov Provider[T]) (mut *Mutation[T], isNew bool)

PersistentStream attempts to start a stream with its lifecycle bound to a mutation pool instead of the UI. This is useful when many UI streams want to consume the same data, as they can all consume the output of a single persistent stream (via the returned mutation's Stream method) instead of each independently querying or generating the source data of interest. If key is already registered to a running mutation, that mutation instance will be returned instead.

NOTE(whereswaldon): the existence of this method points to mutations being misnamed. They are really a kind of stream that has a different lifecycle and are allowed to have side effects within the application. This method provides a way to access the different lifecycle while committing to not have side effects. It's unclear what a better name would be though.

If the returned mutation is nil, no mutations can be started because the mutator is shut down. If the returned mutation is non-nil, the isNew return value indicates whether it is a newly-spawned mutation, or a reference to an already-running mutation for the same key.

func (*Mutation[T]) Cancel

func (m *Mutation[T]) Cancel()

Cancel terminates the mutation. It is safe to call Cancel on a nil mutation, in which case it does nothing.

func (*Mutation[T]) Stream

func (m *Mutation[T]) Stream(ctx context.Context) <-chan T

Stream is a Provider that emits values from the mutation's output channel. It is safe to call Stream on a nil mutation. It will return a channel that will close when the provided context is cancelled.

type MutationPool

type MutationPool[K comparable, T any] struct {
	// contains filtered or unexported fields
}

MutationPool supervises the lifecycle of a group of related mutations. Specifically, it prevents two mutations with the same key from coexisting, and it offers a method to stream all running mutations.

func NewMutationPool

func NewMutationPool[K comparable, T any](mutator *Mutator) *MutationPool[K, T]

NewMutationPool creates a MutationPool powered by the provided mutator.

func (*MutationPool[K, T]) Stream

func (m *MutationPool[K, T]) Stream(ctx context.Context) <-chan map[K]*Mutation[T]

Stream returns a channel on which all running mutations will be emitted until the ctx is cancelled.

type MutationProvider

type MutationProvider[T any] func(ctx context.Context) (values <-chan T)

MutationProvider starts an asynchronous application state change and returns a channel of results from that process. The provided context is not bound to the lifecycle of a specific stream, but to the mutation itself. If the context is cancelled, the mutation provider should stop the mutation and emit values on its output channel indicating where it stopped. When the mutation is complete (whether a result of running to completion or cancellation), the returned channel must be closed.

type Mutator

type Mutator struct {
	// contains filtered or unexported fields
}

Mutator manages the lifecycle of asynchronous application state mutations.

func NewMutator

func NewMutator(ctx context.Context, timeout time.Duration) *Mutator

NewMutator creates a new mutator which will spawn all mutations using the provided context. The provided timeout configures how long Mutator.Shutdown will wait before giving up on a clean shutdown, see the docs on that method for details.

func (*Mutator) Shutdown

func (m *Mutator) Shutdown() error

Shutdown stops the creation of new mutations (causing Mutate to return a nil mutation) and blocks until all existing mutations complete. If the timeout provided to NewMutator elapses without all mutations shutting down, all mutation contexts will be cancelled. Then Shutdown will again wait up to the configured timeout for all mutations to end cleanly. If some mutations still have not shut down after the second timeout, Shutdown will return an error. Otherwise (in the case of a clean shutdown) it will return nil.

type PoolKeyer

type PoolKeyer[K comparable] interface {
	// MutationPoolKey returns a key that identifies the work performed by a mutation for deduplication
	// purposes. If two mutations have the same key, [MutationPool]s will deduplicate them so that
	// only one can execute at a time.
	MutationPoolKey() K
}

PoolKeyer describes a type that can generate a unique mutation pool key for itself.

type Provider

type Provider[T any] func(context.Context) <-chan T

Provider is a function that returns a channel of values which will be closed when the provided context is cancelled. Provider functions are usually used to provide data as input to a stream's asynchronous processing. Implementations may close the output channel early to indicate that the most recently returned value is the final value for the stream.

type ProviderR

type ProviderR[T any] func(context.Context) <-chan Result[T]

ProviderR is a function that returns a channel of [Result]s which will be closed when the provided context is cancelled. ProviderR functions are usually used to provide data and errors as input to a stream's asynchronous processing. Implementations may close the output channel early to indicate that the most recently returned value is the final value for the stream.

type Result

type Result[T any] struct {
	Err error
	V   T
}

Result is a convenience type for bundling data and an error together so they can be sent over a channel.

func ResultFrom

func ResultFrom[T any](t T, e error) Result[T]

ResultFrom constructs a Result by bundling the given t and e together.

func (Result[T]) Split

func (r Result[T]) Split() (T, error)

Split unpacks the result's value and error, returning them separately.

type ResultStream

type ResultStream[T any] Stream[Result[T]]

ResultStream provides Result[T]s from asynchronous logic. It can be read safely without blocking, and must be read each frame in order to stay active. If it goes inactive, reading it again will reactivate it automatically.

func NewR

func NewR[T any](controller *Controller, provider ProviderR[T]) *ResultStream[T]

NewR creates a stream using the provided controller and provider. The provider will only be invoked when activating or reactivating the stream, not each time the stream is read. The channel returned from the provider must not close until the context is cancelled.

func (*ResultStream[T]) Read

func (s *ResultStream[T]) Read(gtx layout.Context) (value T, status Status, err error)

Read a value from the stream, if any. The returned status indicates whether the returned value and err have a meaningful value.

func (*ResultStream[T]) ReadDefault

func (s *ResultStream[T]) ReadDefault(gtx layout.Context, t T) (T, error)

ReadDefault reads from the stream, returning the provided default value if no value is available yet on the stream.

func (*ResultStream[T]) ReadInto

func (s *ResultStream[T]) ReadInto(gtx layout.Context, t *T, def T) error

ReadInto reads from the stream and (if any value is available) assigns the latest value to t. If no value is available, def is assigned to t. The returned error is the error result of the latest stream value or nil.

func (*ResultStream[T]) ReadNew

func (s *ResultStream[T]) ReadNew(gtx layout.Context) (value T, isNew bool, err error)

ReadNew returns the current value available from the stream (which may be the zero value if no value has ever been received) and a boolean indicating whether that value is newly emitted during the current frame. This function serves to shorten the common idiom:

if value, status, err := s.Read(gtx); status == stream.Emitting {
	// Do logic that should only occur when a value is emitted from the stream.
}

It can be written as:

if value, ok, err := s.ReadNew(gtx); ok {
	// Do logic that should only occur when a value is emitted from the stream.
}

type Source

type Source[S, T any] struct {
	// contains filtered or unexported fields
}

Source helps construct [Provider]s that emit a single, shared value. It may not be suitable for all applications. Streams created using a Source will receive the latest available value, but may not receive every value emitted by the valuer if new values arrive quickly. S is an internal state type, protected by the source's lock, that is used to track the current state of the source. T is the result type emitted on streams from this source. If constructed with NewSourceCtx, a Source can be "closed" much like a channel by cancelling its input context.

func NewSource

func NewSource[S, T any](valuer func(S) (T, bool)) *Source[S, T]

NewSource constructs a source using the provided valuer function to transform its current state (S) into a T. valuer must be idempotent. The boolean return value from valuer indicates whether the T should be emitted over the stream or discarded. The valuer should deep copy all data it uses in T to ensure that other invocations of valuer do not reference the same memory.
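
For example (a sketch; the state is a slice of log lines and the emitted value is a copy of that slice):

logs := stream.NewSource(func(lines []string) ([]string, bool) {
	// Deep copy so later updates cannot mutate what consumers already received.
	out := make([]string, len(lines))
	copy(out, lines)
	return out, len(out) > 0
})

// Elsewhere: append a line; streams created from logs.Stream receive the copy.
logs.Update(func(old []string) []string {
	return append(old, "something happened")
})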

func NewSourceCtx

func NewSourceCtx[S, T any](ctx context.Context, valuer func(S) (T, bool)) *Source[S, T]

NewSourceCtx constructs a new source that will close all output streams when the provided context is cancelled. Calls to *Source.Update after the provided context is cancelled will have no effect.

func (*Source[S, T]) Stream

func (s *Source[S, T]) Stream(ctx context.Context) <-chan T

Stream is a Provider function. The returned channel will close when the provided context is cancelled or (if the Source was constructed with NewSourceCtx) when the source's context is cancelled, and will emit any values set by Update() for which the valuer function provided at construction returns true. If Update() is invoked quickly, only the final value is guaranteed to be emitted on the channel returned by Stream.

func (*Source[S, T]) Update

func (s *Source[S, T]) Update(fn func(oldState S) S)

Update runs fn with the source's lock held, passing the current state to fn and setting the state to the return value of fn. If this Source was constructed with NewSourceCtx and the context has been cancelled, calls to Update will have no effect.

func (*Source[S, T]) UpdateIf

func (s *Source[S, T]) UpdateIf(fn func(oldState S) (S, bool))

UpdateIf runs fn with the source's lock held, passing the current state to fn and setting the state to the value returned by fn only if fn returns true. If fn returns false, the returned value is discarded and no value will be sent on the stream's output. If this Source was constructed with NewSourceCtx and the context has been cancelled, calls to UpdateIf will have no effect.

type Status

type Status uint8

Status describes the state of the data read from the stream.

const (
	// Waiting indicates that the stream has never received a value.
	Waiting Status = iota
	// Emitting indicates that the stream is emitting a new value.
	Emitting
	// Cached indicates that the stream is emitting a cached copy of the most
	// recently received value.
	Cached
	// Complete indicates that the stream provider closed its output channel after
	// emitting at least one value and without being cancelled. This indicates that
	// the work for this stream is complete and the stream need not be restarted.
	// The last value received over the channel will always be returned with this
	// status.
	Complete
	// Incomplete indicates that the stream provider closed its output channel without
	// ever emitting a value. This is usually a bug in the stream provider. Any stream
	// value received with this status should be ignored.
	Incomplete
	// Uninitialized means that the stream has never been constructed. This status is
	// only returned if a nil stream is read.
	Uninitialized
)

func (Status) String

func (s Status) String() string

type Stream

type Stream[T any] struct {
	// contains filtered or unexported fields
}

Stream provides Ts from asynchronous logic. It can be read safely without blocking, and must be read each frame in order to stay active. If it goes inactive, reading it again will reactivate it automatically. A nil stream can be read from safely, but it is invalid to construct a stream literal without invoking a constructor function. Such streams will panic when used.

func New

func New[T any](controller *Controller, provider Provider[T]) *Stream[T]

New creates a stream using the provided controller and provider. The provider will only be invoked when activating or reactivating the stream, not each time the stream is read. The channel returned from the provider must not close until the context is cancelled.

func Once

func Once[T any](controller *Controller, do func(ctx context.Context) T) *Stream[T]

Once creates a stream that will only emit a single value, and will not restart itself if it completes successfully. This is useful for one-shot async computations.
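
For example (a sketch; Config and loadConfig are hypothetical):

// Load configuration once per window; the stream caches the result thereafter.
configStream := stream.Once(controller, func(ctx context.Context) Config {
	return loadConfig(ctx)
})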

func (*Stream[T]) Read

func (s *Stream[T]) Read(gtx layout.Context) (value T, status Status)

Read a value from the stream, if any. The returned status indicates whether the returned value is meaningful.

func (*Stream[T]) ReadDefault

func (s *Stream[T]) ReadDefault(gtx layout.Context, t T) T

ReadDefault reads from the stream, returning the provided default value if no value is available yet on the stream.

func (*Stream[T]) ReadInto

func (s *Stream[T]) ReadInto(gtx layout.Context, t *T, def T)

ReadInto reads from the stream and (if any value is available) assigns the latest value to t. If no value is available, def is assigned to t.

func (*Stream[T]) ReadNew

func (s *Stream[T]) ReadNew(gtx layout.Context) (value T, isEmitting bool)

ReadNew returns the current value available from the stream (which may be the zero value if no value has ever been received) and a boolean indicating whether that value is newly emitted during the current frame. This function serves to shorten the common idiom:

if value, status := s.Read(gtx); status == stream.Emitting {
    // Do logic that should only occur when a value is emitted from the stream.
}

It can be written as:

if value, ok := s.ReadNew(gtx); ok {
    // Do logic that should only occur when a value is emitted from the stream.
}

Many streams do not need special handling for when events are emitted, and should not use this method.
