bigbuff

package module
v1.15.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2022 License: Apache-2.0 Imports: 8 Imported by: 1

README

go-bigbuff

test coverage: 97.1%

Package bigbuff implements many useful concurrency primitives and utilities. It was originally created around bigbuff.Buffer, a one-to-many unbounded FIFO queue with a built in retention policy mechanism, and two interfaces bigbuff.Consumer, and bigbuff.Producer, which generalise the pattern in a way that supports complex use cases such as at-least-once semantics. Over time this package has collected a range of different, often highly specialised implementations, that deal with different aspects of the complicated challenge of concurrent programming.

Godoc here: github.com/joeycumines/go-bigbuff

Specialised tools should be carefully considered against possible use cases, and those provided by this package are no different. A good rule of thumb is to keep them peripheral and replaceable, where possible. Choose wisely ;).

Highlights

  • bigbuff.Buffer is the most mature implementation in this package, and is battle tested one-many producer / consumer implementation. It operates as an unbounded FIFO queue by default, reaping messages after they are read by all (and at least one) consumer. Custom behavior may be implemented using the bigbuff.Cleaner type. The buffer's behavior may be modified to enforce (soft) bounding of the size of the queue, via the bigbuff.FixedBufferCleaner function. Benchmarks... someday.
  • Any readable channel may be used as a bigbuff.Consumer, using bigbuff.NewChannel
  • bigbuff.Notifier uses reflect.Select to provide synchronous fan-out pub-sub (keyed, sending to any supported channel that is subscribed at time of publish). This implementation is far more compact than bigbuff.Buffer, but at this time it is not actively being used in a production environment. It is also far slower, and suitable for a much smaller number of concurrent consumers (per key / buffer instance). That said it is much easier to use, and solves my original problem case of dynamically wiring up existing message processing that makes heavy use of channels very well. The learning curve for this implementation should be trivial, as it's behavior is identical to direct interaction with multiple channels + context, using select.
  • bigbuff.Exclusive is another battle-tested implementation, providing debouncing of keyed operations, either as part of a background process, or in the foreground, potentially blocking for a return value (which may be shared between multiple, debounced callers)
  • Useful for limiting concurrent executions, bigbuff.Workers is compatible with bigbuff.Exclusive. It's a simple on-demand background worker orchestrator, which deliberately uses a compatible function signature with relevant methods from bigbuff.Exclusive. bigbuff.MinDuration is provided in the same vein.

Caveats

  • All implementations were learning experiences, and in some, particularly older cases, the API has been left unchanged solely for backwards compatibility
  • Unbounded queues tend to require additional management, especially if there is any chance that the consumer side may fall behind the producer side (cascading failures == bad)
  • The use cases of most of these tools are complicated enough to deserve extensive examples and discussion

Documentation

Overview

Package bigbuff implements many useful concurrency primitives and utilities.

Index

Examples

Constants

View Source
const (
	// DefaultCleanerCooldown is how long the cleaner will wait between checks of the Buffer by default.
	DefaultCleanerCooldown = time.Millisecond * 10

	// DefaultChannelPollRate is how frequently each waiting Channel.Get should try to receive from the channel (from
	// the first failure/non-receive).
	DefaultChannelPollRate = time.Millisecond
)

Variables

This section is empty.

Functions

func Call added in v1.15.1

func Call(caller Callable, options ...CallOption) error

Call will call a Callable with any provided options, available as functions provided by this package that are prefixed with "Call", see also MustCall

Example (Rpc)

ExampleCall_rpc provides a (contrived) example of how Call may be used as part of an RPC implementation

var (
	methods = map[string]Callable{
		`add`: NewCallable(func(a, b int) int { return a + b }),
		`sum`: NewCallable(func(values ...int) (r int) {
			for _, v := range values {
				r += v
			}
			return
		}),
		`bounds`: NewCallable(func(values ...int) (min, max int, ok bool) {
			for _, value := range values {
				if !ok {
					min, max, ok = value, value, true
				} else {
					if value < min {
						min = value
					}
					if value > max {
						max = value
					}
				}
			}
			return
		}),
	}
	call = func(name string, args ...interface{}) (results []interface{}) {
		MustCall(
			methods[name],
			CallArgs(args...),
			CallResultsSlice(&results),
		)
		return
	}
	p = func(name string, args ...interface{}) {
		results := func() (v interface{}) {
			defer func() {
				if v == nil {
					v = recover()
				}
			}()
			v = call(name, args...)
			return
		}()
		fmt.Printf("%s %v -> %v\n", name, args, results)
	}
)

fmt.Println(`success:`)
p(`add`, 1, 2)
p(`bounds`, -123884, 4737, 9, 0, -99999992, 4, 6, 8324884383, -3)
p(`bounds`)
p(`bounds`, 2)
p(`sum`, -123884, 4737, 9, 0, -99999992, 4, 6, 8324884383, -3)

fmt.Println(`failure:`)
p(`add`, 1, 2, 3)
p(`add`, 1, 2.0)
p(`bounds`, 2.0)
Output:

success:
add [1 2] -> [3]
bounds [-123884 4737 9 0 -99999992 4 6 8324884383 -3] -> [-99999992 8324884383 true]
bounds [] -> [0 0 false]
bounds [2] -> [2 2 true]
sum [-123884 4737 9 0 -99999992 4 6 8324884383 -3] -> [8224765260]
failure:
add [1 2 3] -> bigbuff.CallArgs args error: invalid length: mandatory=2 variadic=false len=3
add [1 2] -> bigbuff.CallArgs args[1] error: float64 not assignable to int
bounds [2] -> bigbuff.CallArgs args[0] error: float64 not assignable to int

func CombineContext

func CombineContext(ctx context.Context, others ...context.Context) context.Context

CombineContext returns a context based on the ctx (first param), that will cancel when ANY of the other provided context values cancel CAUTION this spawns one or more blocking goroutines, if you call this with contexts that don't cancel in the reasonable lifetime of your application you will have a leak

func DefaultCleaner

func DefaultCleaner(size int, offsets []int) int

DefaultCleaner is the Buffer's default cleaner, if there is at least one "active" consumer it returns the lowest offset, defaulting to 0, effectively removing values from the buffer that all consumers have read, note the return value is limited to >= 0 and <= size, active consumers are defined as those registered with offsets >= 0.

func ExponentialRetry added in v1.6.0

func ExponentialRetry(ctx context.Context, rate time.Duration, value func() (interface{}, error)) func() (interface{}, error)

ExponentialRetry implements a simple exponential back off and retry, via closure wrapper, as described on Wikipedia (https://en.wikipedia.org/wiki/Exponential_backoff) supporting context canceling (while waiting / before starting), configurable base rate / slot time which will default to 300ms if rate is <= 0 (SUBJECT TO CHANGE), and the ability to support fatal errors via use of the FatalError error wrapper function provided by this package. Notes: 1. This function will panic if value is nil, but NOT if ctx is nil (the latter is not recommended but the existing implementations have this behavior already). 2. The exit case triggered via use of the FatalError wrapper will include any accompanying result, as well as the unpacked error value (which will always be non-nil since FatalError will panic otherwise). 3. Before each call to value the context error will be checked, and if non-nil will be propagated as-is with a nil result. 4. This implementation uses the math/rand package.

func FatalError added in v1.6.0

func FatalError(err error) error

FatalError wraps a given error to indicate to functions or methods that receive a closure that they should no longer continue to operate (applies to: ExponentialRetry), note that the error type will be transparently and recursively unpacked, for any return values, from said methods or functions, so DO NOT attempt to chain such calls without explicit handling at the top level for each fatal-able operation NOTE calls to this function with a nil err will trigger a panic

func LinearAttempt added in v1.11.0

func LinearAttempt(ctx context.Context, rate time.Duration, count int) <-chan time.Time

LinearAttempt returns a new channel that will be published, at most, every rate, for a maximum total of count messages, and will be closed after either reaching count or context cancel, whichever comes first. Note that it is buffered and will start with a single value, and behaves identically to time.NewTicker in that it will attempt to keep a constant rate, but compensates for slow consumers, and in that the value received will be the time at which the last attempt was scheduled (the offset from the current time being equivalent to the conflation rate). Either rate or count being <= 0 or ctx being nil will trigger a panic. Note that the initial publish will happen inline, and context errors are guarded, meaning if the context returns an error when first checked then the returned channel will always be closed, with no values sent.

This implementation is designed to be iterated over, by using range, with resource freeing via context cancel. It is very useful when implementing something that will attempt to perform an action at a maximum rate, for a maximum amount of times, e.g. for linear retry logic.

Example (AtMostOneTickAfterCancel)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
i := 0
for range LinearAttempt(contextNeverDone{ctx}, time.Millisecond*50, 10) {
	i++
	fmt.Printf("iteration #%d\n", i)
	if i < 5 {
		continue
	}
	cancel()
	time.Sleep(time.Millisecond * 100)
	fmt.Println(`canceled...`)
}
fmt.Printf("%d iterations\n", i)
Output:

iteration #1
iteration #2
iteration #3
iteration #4
iteration #5
canceled...
5 iterations
Example (Full)
defer func() func() {
	start := runtime.NumGoroutine()
	return func() {
		finish := runtime.NumGoroutine()
		if start < finish {
			panic(fmt.Sprint(`started with`, start, `goroutines but finished with`, finish))
		}
	}
}()()
start := time.Now()
for range LinearAttempt(context.Background(), time.Millisecond*200, 5) {
	fmt.Println(int64((time.Now().Sub(start) + (time.Millisecond * 100)) / (time.Millisecond * 200)))
}
time.Sleep(time.Millisecond * 300)
Output:

0
1
2
3
4
Example (SlowConsumer)
defer func() func() {
	start := runtime.NumGoroutine()
	return func() {
		finish := runtime.NumGoroutine()
		if start < finish {
			panic(fmt.Sprint(`started with`, start, `goroutines but finished with`, finish))
		}
	}
}()()
start := time.Now()
r, c := func() (func(), func()) {
	c := LinearAttempt(context.Background(), time.Millisecond*200, 7)
	return func() {
			ts := <-c
			fmt.Println(int64((time.Now().Sub(start)+(time.Millisecond*100))/(time.Millisecond*200)), int64((ts.Sub(start)+(time.Millisecond*100))/(time.Millisecond*200)))
		}, func() {
			r := reflect.ValueOf(c)
			if v, ok := r.TryRecv(); ok || v.Type() != reflect.TypeOf(time.Time{}) {
				panic(c)
			}
		}
}()
defer c()
defer time.Sleep(time.Millisecond * 300)
time.Sleep(time.Millisecond * 325)
r()
r()
r()
r()
time.Sleep(time.Millisecond * 525)
r()
r()
r()
Output:

2 0
2 2
3 3
4 4
7 5
7 7
8 8

func MinDuration added in v1.10.0

func MinDuration(d time.Duration, fn func() (interface{}, error)) func() (interface{}, error)

MinDuration is a simple wrapper for the function signature used by Exclusive and Workers which adds a sleep for any remainder of a given duration, is intended to make it trivial to build a debounced rate limited partitioned worker implementation. NOTE a panic will occur if the duration d is not greater than 0, or if the function fn is nil.

func MustCall added in v1.15.1

func MustCall(caller Callable, options ...CallOption)

MustCall is equivalent to Call but will panic on error

func Range

func Range(ctx context.Context, consumer Consumer, fn func(index int, value interface{}) bool) (err error)

Range iterates over the consumer, encapsulating automatic commits and rollbacks, including rollbacks caused by panics, note that the index will be the index in THIS range, starting at 0, and incrementing by one with each call to fn. NOTE: the ctx value will be passed into the consumer.Get as-is.

func WaitCond

func WaitCond(ctx context.Context, cond *sync.Cond, fn func() bool) error

WaitCond performs a conditional wait against a *sync.Cond, waiting until fn returns true, with a inbuilt escape hatch for context cancel. Note that the relevant locker must be locked before this is called. It should also be noted that cond.L.Lock will before a context triggered broadcast, in order to avoid a race condition (i.e. if context is cancelled while fn is being evaluated).

Types

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

Buffer is the core implementation, implementing the Producer interface, and providing auxiliary methods for configuration, as well as `NewConsumer`, to instance a new consumer, note that though it is safe to instance via new(bigbuff.Buffer), it must not be copied after first use. It's behavior regarding message retention may be configured via SetCleanerConfig, by default it will un-buffer only messages that have been read by all "active and valid" consumers, given at least one exists, otherwise it will retain messages indefinitely. NOTE: the buffer itself will not be cleared even after close, so the data can still be accessed.

func (*Buffer) CleanerConfig

func (b *Buffer) CleanerConfig() CleanerConfig

CleanerConfig returns the current cleaner config (which has defaults)

func (*Buffer) Close

func (b *Buffer) Close() (err error)

func (*Buffer) Diff added in v1.1.0

func (b *Buffer) Diff(c Consumer) (int, bool)

Diff is provided to facilitate ranging over a buffer via a consumer, and returns the items remaining in the buffer (includes uncommitted), be aware that the value CAN be negative, in the event the consumer fell behind (the cleaner cleared item(s) from the buffer that the consumer hadn't read yet, which by default will never happen, as the default mode is a unbounded buffer). Note it will return (0, false) for any invalid consumers or any not registered on the receiver buffer.

func (*Buffer) Done

func (b *Buffer) Done() <-chan struct{}

func (*Buffer) NewConsumer

func (b *Buffer) NewConsumer() (Consumer, error)

NewConsumer constructs a new consumer instance.

func (*Buffer) Put

func (b *Buffer) Put(ctx context.Context, values ...interface{}) error

func (*Buffer) Range added in v1.1.0

func (b *Buffer) Range(ctx context.Context, c Consumer, fn func(index int, value interface{}) bool) error

Range provides a way to iterate from the start to the end of the buffer, note that it will exit as soon as it reaches the end of the buffer (unlike ranging on a channel), it simply utilizes the package Range + Buffer.Diff.

func (*Buffer) SetCleanerConfig

func (b *Buffer) SetCleanerConfig(config CleanerConfig) error

SetCleanerConfig updates the cleaner config, returning an error if the config was invalid.

func (*Buffer) Size

func (b *Buffer) Size() int

Size returns the length of the buffer

func (*Buffer) Slice

func (b *Buffer) Slice() []interface{}

Slice returns a copy of the internal message buffer (all currently stored in memory). NOTE: this will read-lock the buffer itself, and is accessible even after the buffer is closed.

type CallOption added in v1.15.1

type CallOption func(config *callConfig) error

CallOption models a configuration option for the Call function provided by this package, see also the Call-prefixed functions provided by this package

func CallArgs added in v1.15.1

func CallArgs(args ...interface{}) CallOption

CallArgs returns a CallOption that will pass the provided args to a Callable that is called via the Call function

Example (FuncResults)
MustCall(
	NewCallable(fmt.Println),
	CallArgs(func() (int, string, bool) { return 3, `multiple return values -> varargs`, true }()),
)
Output:

3 multiple return values -> varargs true

func CallArgsRaw added in v1.15.1

func CallArgsRaw(args interface{}) CallOption

CallArgsRaw returns a CallOption that will pass args to Callable.Call without modification or validation

func CallResults added in v1.15.1

func CallResults(results ...interface{}) CallOption

CallResults returns a CallOption that will assign the call's results to the values pointed at by results

func CallResultsRaw added in v1.15.1

func CallResultsRaw(results interface{}) CallOption

CallResultsRaw returns a CallOption that will pass results to Callable.Call without modification or validation

func CallResultsSlice added in v1.15.1

func CallResultsSlice(target interface{}) CallOption

CallResultsSlice returns a CallOption that will append the call's results to a slice pointed at by target

type Callable added in v1.15.1

type Callable interface {
	// Type must return a valid type of kind func, corresponding to the type of the Callable
	Type() reflect.Type
	// Call accepts an args function and passes results into a results function, to be utilised in a manner
	// generally equivalent to `results(callable(args()))`, treating nil (interface values) as omitting any
	// args, or not handling any return value. Note that Call will return an error if args or results are not
	// compatible with the underlying types, indicated by Callable.Type. See also bigbuff.Call and CallOption.
	Call(args, results interface{}) error
}

Callable models a function, and is used by this package to provide a higher-level mechanism (than the reflect package) for calling arbitrary functions, in a generic way, see also the NewCallable factory function

func NewCallable added in v1.15.1

func NewCallable(fn interface{}) Callable

NewCallable initialises a new Callable from fn, which must be a non-nil function, but is otherwise unconstrained, note a panic will occur if fn is not a function, or is a function but isn't non-nil

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel implements Consumer based on data from a channel, note that because it uses reflection and polling internally, it is actually safe to close the input channel without invalid zero value reads (though the Channel itself still needs to be closed, and any Get calls will still be blocked until then).

func NewChannel

func NewChannel(ctx context.Context, pollRate time.Duration, source interface{}) (*Channel, error)

NewChannel constructs a new consumer that implements Consumer, but receives it's data from a channel, which uses reflection to support any readable channel, note that a poll date of zero will use the default, and < 0 is an error.

func (*Channel) Buffer

func (c *Channel) Buffer() []interface{}

Buffer returns any values that were drained from the source but not committed yet, in a new copy of the internal buffer, note that if you are trying to ensure no messages get lost in the void, block until Channel.Done before calling this.

func (*Channel) Close

func (c *Channel) Close() error

Close closes the consumer NOTE that it doesn't close the source channel.

func (*Channel) Commit

func (c *Channel) Commit() error

Commit resets read parts of the buffer, or returns an error, note it will always error after context cancel.

func (*Channel) Done

func (c *Channel) Done() <-chan struct{}

func (*Channel) Get

func (c *Channel) Get(ctx context.Context) (value interface{}, err error)

func (*Channel) Rollback

func (c *Channel) Rollback() error

Rollback will cause following Get calls to read from the start of the buffer, or it will return an error if there is nothing to rollback (there is nothing pending).

type Cleaner

type Cleaner func(size int, offsets []int) int

Cleaner is a callback used to manage the size of a bigbuff.Buffer instance, it will be called when relevant to do so, with the size of the buffer, and the consumer offsets (relative to the buffer), and should return the number of elements from the buffer that should be attempted to be shifted from the buffer.

func FixedBufferCleaner

func FixedBufferCleaner(
	max int,
	target int,
	callback func(notification FixedBufferCleanerNotification),
) Cleaner

FixedBufferCleaner builds a cleaner that will give a buffer a fixed threshold size, which will trigger forced reduction back to a fixed target size, note that if callback is supplied it will be called with the details of the cleanup, in the event that it forces cleanup past the default. This has the effect of causing any consumers that were running behind the target size (in terms of their read position in the buffer) to fail on any further Get calls.

type CleanerConfig

type CleanerConfig struct {
	// Cleaner is used to determine if items are removed from the buffer
	Cleaner Cleaner

	// Cooldown is the minimum time between cleanup cycles
	Cooldown time.Duration
}

CleanerConfig is a configuration for a bigbuff.Buffer, that defines how the size is managed

type Consumer

type Consumer interface {
	io.Closer

	// Done should return a channel that will be closed after internal resources have been freed, after a `Close`
	// call, which may not be explicit. This *may* mean that it blocks on any pending changes, and it *may* also
	// be possible that the consumer will be closed due to external reasons, e.g. connection closing.
	Done() <-chan struct{}

	// Get will get a message from the message buffer, at the current offset, blocking if none are available, or
	// an error if it fails.
	Get(ctx context.Context) (interface{}, error)

	// Commit will save any offset changes, and will return an error if it fails, or if the offset saved is the
	// latest.
	Commit() error

	// Rollback will undo any offset changes, and will return an error if it fails, or if the offset saved is the
	// latest.
	Rollback() error
}

Consumer models a consumer in a producer-consumer pattern, where the resource will be closed at most once.

type Exclusive added in v1.2.1

type Exclusive struct {
	// contains filtered or unexported fields
}

Exclusive provides synchronous de-bouncing of operations that may also return a result or error, with consistent or controlled input via provided closures also supported, and the use of any comparable keys to match on, it provides a guarantee that the actual call that returns a given value will be started AFTER the Call method, so keep that in mind when implementing something using it. You may also use the CallAfter method to delay execution after initialising the key, e.g. to allow the first of many costly operations on a given key a grace period to be grouped with the remaining keys.

func (*Exclusive) Call added in v1.2.1

func (e *Exclusive) Call(key interface{}, value func() (interface{}, error)) (interface{}, error)

Call uses a given key to ensure that the operation that the value callback represents will not be performed concurrently, and in the event that one or more operations are attempted while a given operation is still being performed, these operations will be grouped such that they are debounced to a single call, sharing the output.

Note that this method will panic if the receiver is nil, or the value is nil, but a nil key is allowed.

func (*Exclusive) CallAfter added in v1.2.1

func (e *Exclusive) CallAfter(key interface{}, value func() (interface{}, error), wait time.Duration) (interface{}, error)

CallAfter performs exactly the same operation as the Exclusive.Call method, but with an added wait to allow operations sent through in close succession to be grouped together, note that if wait is <= 0 it will be ignored.

func (*Exclusive) CallAfterAsync added in v1.3.0

func (e *Exclusive) CallAfterAsync(key interface{}, value func() (interface{}, error), wait time.Duration) <-chan *ExclusiveOutcome

CallAfterAsync behaves exactly the same as CallAfter but guarantees order (the value func) for synchronous calls.

Note that the return value will always be closed after being sent the result, and will therefore any additional reads will always receive nil.

func (*Exclusive) CallAsync added in v1.3.0

func (e *Exclusive) CallAsync(key interface{}, value func() (interface{}, error)) <-chan *ExclusiveOutcome

CallAsync behaves exactly the same as Call but guarantees order (the value func) for synchronous calls.

Note that the return value will always be closed after being sent the result, and will therefore any additional reads will always receive nil.

func (*Exclusive) CallWithOptions added in v1.15.1

func (e *Exclusive) CallWithOptions(options ...ExclusiveOption) <-chan *ExclusiveOutcome

CallWithOptions consolidates the various different ways to use Exclusive into a single method, to improve maintainability w/o breaking API compatibility. Other methods such as Call and Start will continue to be supported.

func (*Exclusive) Start added in v1.9.0

func (e *Exclusive) Start(key interface{}, value func() (interface{}, error))

Start is synonymous with a CallAsync that avoids spawning a unnecessary goroutines to wait for results

func (*Exclusive) StartAfter added in v1.9.0

func (e *Exclusive) StartAfter(key interface{}, value func() (interface{}, error), wait time.Duration)

StartAfter is synonymous with a CallAfterAsync that avoids spawning a unnecessary goroutines to wait for results

type ExclusiveOption added in v1.15.1

type ExclusiveOption func(c *exclusiveConfig)

ExclusiveOption passes configuration into Exclusive.CallWithOptions, see also package functions prefixed with Exclusive, such as ExclusiveKey and ExclusiveWork.

func ExclusiveKey added in v1.15.1

func ExclusiveKey(value interface{}) ExclusiveOption

ExclusiveKey configures a comparable value for grouping calls, for debouncing, limiting, etc.

func ExclusiveRateLimit added in v1.15.1

func ExclusiveRateLimit(ctx context.Context, minDuration time.Duration) ExclusiveOption

ExclusiveRateLimit is typically a drop-in replacement for MinDuration that works properly with non-start calls, and returns a ExclusiveWrapper option. Note that the context is for cleaning up the resources required to apply the rate limit (the rate limit itself), and will also be used to guard the actual work, for safety reasons.

func ExclusiveStart added in v1.15.1

func ExclusiveStart(value bool) ExclusiveOption

ExclusiveStart configures "start" behavior for the call, which, if true, will avoid the overhead required to propagate results, which will also cause the return value (outcome channel) to be nil. See also Exclusive.Start.

func ExclusiveValue added in v1.15.1

func ExclusiveValue(value func() (interface{}, error)) ExclusiveOption

ExclusiveValue implements a simpler style of ExclusiveWork (that was originally the only supported behavior).

func ExclusiveWait added in v1.15.1

func ExclusiveWait(value time.Duration) ExclusiveOption

ExclusiveWait configures the duration (since the start of the call) that should be waited, before actually calling the work function, see also Exclusive.CallAfter.

func ExclusiveWork added in v1.15.1

func ExclusiveWork(value WorkFunc) ExclusiveOption

ExclusiveWork configures the work function (what will actually get called).

func ExclusiveWrapper added in v1.15.1

func ExclusiveWrapper(value func(value WorkFunc) WorkFunc) ExclusiveOption

ExclusiveWrapper facilitates programmatic building of work, note that the ExclusiveWork or ExclusiveValue option must still be provided, but may be provided in any order (after or before this option). If there are multiple wrappers, they will be applied sequentially (left -> right is inner -> outer).

type ExclusiveOutcome added in v1.3.0

type ExclusiveOutcome struct {
	Result interface{}
	Error  error
}

ExclusiveOutcome is the return value from an async bigbuff.Exclusive call

type FixedBufferCleanerNotification

type FixedBufferCleanerNotification struct {
	Max     int   // Max size before forced cleanup is triggered.
	Target  int   // Target size when force cleanup is triggered.
	Size    int   // Size when cleanup was triggered.
	Offsets []int // Offsets when cleanup was triggered.
	Trim    int   // Trim number returned.
}

FixedBufferCleanerNotification is the context provided to the optional callback provided to the FixedBufferCleaner function.

type Notifier added in v1.7.0

type Notifier struct {
	// contains filtered or unexported fields
}

Notifier is a tool which may be used to facilitate event handling using a fan out pattern, modeling a pattern that is better described as publish-subscribe rather than produce-consume, and tries to be semantically equivalent to implementations using channels guarded via context cancels using select statements.

It sits between the Exclusive and Buffer implementations in terms of behavior, yet it's use case is still distinct. Where Buffer shines when providing multiplexing or fanning out of serializable streams of messages, and Exclusive is explicitly designed to be attached to existing expensive tasks which need to occur as a result of multiple triggers, Notifier targets reactive behavior based on asynchronous operations. It provides basic event handling without any buffering or queuing between the producer and subscriber present in the layer before the actual target channels.

Note that it uses reflect internally, to avoid clients needing to rely on generic interface values.

Example (ContextCancelSubscribe)
var (
	nf          Notifier
	k           = 0
	c           = make(chan string)
	d           = make(chan struct{})
	ctx, cancel = context.WithCancel(context.Background())
)
defer cancel()

nf.SubscribeContext(ctx, k, c)

fmt.Println(`starting blocking publish then waiting a bit...`)
go func() {
	defer close(d)
	fmt.Println(`publish start`)
	nf.Publish(k, `one`)
	fmt.Println(`publish finish`)
}()
time.Sleep(time.Millisecond * 100)

fmt.Println(`canceling context then blocking for publish exit...`)
cancel()
<-d

fmt.Println(`closing publish channel...`)
close(c)
time.Sleep(time.Millisecond * 50)

fmt.Println(`success!`)
Output:

starting blocking publish then waiting a bit...
publish start
canceling context then blocking for publish exit...
publish finish
closing publish channel...
success!
Example (PubSubKeys)
var (
	k1 = `some-key`
	k2 = 100
	c1 = make(chan string)
	c2 = make(chan string)
	c3 = make(chan string)
	wg sync.WaitGroup
	nf Notifier
)
wg.Add(3)
go func() {
	defer wg.Done()
	for v := range c1 {
		fmt.Println("c1 recv:", v)
	}
}()
go func() {
	defer wg.Done()
	for v := range c2 {
		fmt.Println("c2 recv:", v)
	}
}()
go func() {
	defer wg.Done()
	for v := range c3 {
		time.Sleep(time.Millisecond * 100)
		fmt.Println("c3 recv:", v)
	}
}()
nf.Subscribe(k1, c1)
nf.Subscribe(k2, c2)
nf.Subscribe(k1, c3)
nf.Subscribe(k2, c3)

nf.Publish(k1, `one`)
time.Sleep(time.Millisecond * 200)
nf.Publish(k2, `two`)

close(c1)
close(c2)
close(c3)
wg.Wait()
Output:

c1 recv: one
c3 recv: one
c2 recv: two
c3 recv: two

func (*Notifier) Publish added in v1.7.0

func (n *Notifier) Publish(key interface{}, value interface{})

Publish is equivalent of PublishContext(nil, key, value)

func (*Notifier) PublishContext added in v1.7.0

func (n *Notifier) PublishContext(ctx context.Context, key interface{}, value interface{})

PublishContext will send value to the targets of all active subscribers for a given key for which value is assignable, blocking until ctx is canceled (if non-nil), or each relevant subscriber is either sent value or cancels it's context

func (*Notifier) Subscribe added in v1.7.0

func (n *Notifier) Subscribe(key interface{}, target interface{})

Subscribe is equivalent of SubscribeContext(nil, key, target)

func (*Notifier) SubscribeCancel added in v1.8.0

func (n *Notifier) SubscribeCancel(ctx context.Context, key interface{}, target interface{}) context.CancelFunc

SubscribeCancel wraps SubscribeContext and Unsubscribe as well as the initialisation of a sub context, for defer statements using the result as a one-liner, and is the most fool-proof way to implement a subscriber, at the cost of less direct management of resources (including some which are potentially unnecessary, as it uses a sub-context and the returned cancel obeys the contract of context.CancelFunc and does not perform Unsubscribe inline)

Example
defer func() func() {
	startGoroutines := runtime.NumGoroutine()
	return func() {
		time.Sleep(time.Millisecond * 200)
		endGoroutines := runtime.NumGoroutine()
		if endGoroutines <= startGoroutines {
			fmt.Println(`our resources were freed`)
		}
	}
}()()
var (
	nf          Notifier
	ping        = make(chan float64)
	pong        = make(chan float64)
	ctx, cancel = context.WithCancel(context.Background())
)
defer cancel()
defer nf.SubscribeCancel(nil, `ping`, ping)()
defer nf.SubscribeCancel(nil, `pong`, pong)()
go func() {
	// worker which will receive all values and respond with that value x2
	// but first... sleep, to demonstrate it's not racey
	time.Sleep(time.Millisecond * 100)
	for {
		select {
		case <-ctx.Done():
			fmt.Println(`worker exiting`)
			return
		case value := <-ping:
			nf.PublishContext(ctx, `pong`, value*2)
		}
	}
}()
fmt.Println(`PING 5 x 2 = ...`)
nf.PublishContext(ctx, `ping`, 5.0)
fmt.Println(`PONG`, <-pong)
fmt.Println(`PING -23 x 2 = ...`)
nf.PublishContext(ctx, `ping`, -23.0)
fmt.Println(`PONG`, <-pong)
Output:

PING 5 x 2 = ...
PONG 10
PING -23 x 2 = ...
PONG -46
worker exiting
our resources were freed

func (*Notifier) SubscribeContext added in v1.7.0

func (n *Notifier) SubscribeContext(ctx context.Context, key interface{}, target interface{})

SubscribeContext registers a given target channel as a subscriber for a given key, which will block any attempts to publish to the key unless it is received from appropriately, or until context cancel (if a non-nil context was provided), be sure to unsubscribe exactly once to free references to ctx and target. A panic will occur if target is not a channel to which the notifier can send, or if there already exists a subscription for the given key and target combination. The key may be any comparable value.

func (*Notifier) Unsubscribe added in v1.7.0

func (n *Notifier) Unsubscribe(key interface{}, target interface{})

Unsubscribe deregisters a given key and target from the notifier, an action that may be performed exactly once after each subscription (for the combination of key and target), preventing further messages from being published to the target, and allowing freeing of associated resources WARNING subscribe context should always be canceled before calling this, or it may deadlock (especially under load)

type Producer

type Producer interface {
	io.Closer

	// Done should return a channel that will be closed after internal resources have been freed, after a `Close`
	// call, which may not be explicit.  This *may* mean that it blocks on any pending changes, and it *may* also
	// be possible that the consumer will be closed due to external reasons, e.g. connection closing.
	Done() <-chan struct{}

	// Put will send the provided values in-order to the message buffer, or return an error.
	// It MUST NOT block in such a way that it will be possible to cause a deadlock locally.
	Put(ctx context.Context, values ...interface{}) error
}

Producer models a producer in a producer-consumer pattern, where the resource will be closed at most once.

type WorkFunc added in v1.15.1

type WorkFunc func(resolve func(result interface{}, err error))

WorkFunc is a work function, as used by Exclusive.

type Worker added in v1.15.1

type Worker struct {
	// contains filtered or unexported fields
}

Worker implements a background worker pattern, providing synchronisation around running at most a single worker, for n number of callers (of it's Do method).

Note that although the sync.WaitGroup is used internally, sync.Once is still preferable for simpler cases.

func (*Worker) Do added in v1.15.1

func (x *Worker) Do(fn func(stop <-chan struct{})) (done func())

Do will call fn in a new goroutine, if the receiver is not already running, and will always return a done func which must be called, to indicate when the worker is no longer in use. Once no callers are using a worker, that worker will be stopped. Stopping involves closing the stop channel, causing further calls to Do to block, until the worker finishes. A panic will occur if either the receiver or fn are nil.

type Workers added in v1.4.0

type Workers struct {
	// contains filtered or unexported fields
}

Workers represents a dynamically resizable pool of workers, for when you want to have up to x number of operations happening at any given point in time, it can work directly with the Exclusive implementation.

func (*Workers) Call added in v1.4.0

func (w *Workers) Call(count int, value func() (interface{}, error)) (interface{}, error)

Call will call value synchronously, with up to count concurrency (with other concurrent calls), note that it will panic if the receiver is nil, the count is <= 0, or the value is nil.

func (*Workers) Count added in v1.5.0

func (w *Workers) Count() int

Count will return the number of workers currently running

func (*Workers) Wait added in v1.5.0

func (w *Workers) Wait()

Wait will unblock when all workers are complete

func (*Workers) Wrap added in v1.4.0

func (w *Workers) Wrap(count int, value func() (interface{}, error)) func() (interface{}, error)

Wrap encapsulates the provided value as a worker call, note that it will panic if the receiver is nil, the count is <= 0, or the value is nil.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL