longpoll

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 19, 2024 License: MIT Imports: 3 Imported by: 0

README

go-longpoll

Package longpoll supports batching e.g. receiving as many values as possible from a channel.

See the docs for example usage.

See also github.com/joeycumines/go-microbatch, for a higher-level implementation, with built-in concurrency control, and support for batched request/response patterns.

Documentation

Overview

Package longpoll supports batching e.g. receiving as many values as possible from a channel.

See also github.com/joeycumines/go-microbatch, for a higher-level implementation, with built-in concurrency control, and support for batched request/response patterns.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Channel

func Channel[T any](ctx context.Context, cfg *ChannelConfig, ch <-chan T, handler func(value T) error) error

Channel performs a blocking receive on the channel, returning as many values as possible, given the constraints. If ctx cancels, the error will be returned. The cfg parameter is optional, and may be nil, in which case the documented defaults will be used. Values will be received from ch, and passed to handler. Errors from handler will be returned, and cause the call to Channel to return.

If the channel is closed, and all buffered values are received, Channel will return io.EOF. In this scenario, the minimum size may not be reached.

Providing a nil ctx, ch, or handler will cause a panic.

Example
// we will receive incoming requests on this channel
ch := make(chan string, 32)

// in this scenario, we're performing batching of requests, by long-polling from ch
batch := func() {
	// no mutex is required to write to this - handler blocks longpoll.Channel
	var buffer []string

	// the behavior is configurable, but we'll use the defaults here (see the docs for longpoll.ChannelConfig)
	if err := longpoll.Channel(context.Background(), nil, ch, func(value string) error {
		buffer = append(buffer, value)
		return nil
	}); err != nil {
		panic(err)
	}

	// similarly, we can just read the result of the batch from buffer
	fmt.Printf("Hello to our new friends:\n%s\n", strings.Join(buffer, "\n"))
}

// test data
names := []string{
	"Olivia",
	"Liam",
	"Emma",
	"Noah",
	"Ava",
	"Oliver",
	"Sophia",
	"Elijah",
	"Isabella",
	"William",
	"Mia",
	"James",
	"Charlotte",
	"Benjamin",
	"Amelia",
	"Lucas",
	"Harper",
	"Henry",
	"Evelyn",
	"Alexander",
	"Grace",
	"Jack",
}

for i := 0; i < 18; i++ {
	ch <- names[0]
	names = names[1:]
}

fmt.Printf("Buffered %d names prior to batch (%d more than the default max size)\n", len(ch), len(ch)-16)

// the first batch will be immediately filled, up to the max size, which is less than the number of buffered names
batch()

// our second batch will immediately fulfil after it reaches the minimum size
// (waiting until it does, or PartialInterval is reached, which defaults to 50ms, and starts from the first receive)
const defaultMinSize = 4
numUntilMin := defaultMinSize - len(ch)
fmt.Printf("Buffering %d more names to reach the minimum size, while running the batch...\n", numUntilMin)
done := make(chan struct{})
go func() {
	defer close(done)
	batch()
}()
time.Sleep(time.Millisecond * 5)
for i := 0; i < numUntilMin; i++ {
	ch <- names[0]
	names = names[1:]
}
<-done

fmt.Printf("We have %d names buffered, and have a min size (%d, the default) - Channel won't start PartialInterval until the first receive\n", len(ch), defaultMinSize)

done = make(chan struct{})
go func() {
	defer close(done)
	batch()
}()

const defaultPartialInterval = 50 * time.Millisecond
time.Sleep(defaultPartialInterval * 2)
select {
case <-done:
	panic(`expected not done`)
default:
}
fmt.Printf("Slept for double the PartialInterval (%s is the default), and the batch is still blocking, as expected\n", defaultPartialInterval)

fmt.Println(`Sending two names, 10ms apart, which will cause the batch to complete, after PartialInterval, starting from the first send...`)
ch <- names[0]
names = names[1:]
time.Sleep(time.Millisecond * 10)
ch <- names[0]
names = names[1:]

<-done
Output:

Buffered 18 names prior to batch (2 more than the default max size)
Hello to our new friends:
Olivia
Liam
Emma
Noah
Ava
Oliver
Sophia
Elijah
Isabella
William
Mia
James
Charlotte
Benjamin
Amelia
Lucas
Buffering 2 more names to reach the minimum size, while running the batch...
Hello to our new friends:
Harper
Henry
Evelyn
Alexander
We have 0 names buffered, and have a min size (4, the default) - Channel won't start PartialInterval until the first receive
Slept for double the PartialInterval (50ms is the default), and the batch is still blocking, as expected
Sending two names, 10ms apart, which will cause the batch to complete, after PartialInterval, starting from the first send...
Hello to our new friends:
Grace
Jack

Types

type ChannelConfig

type ChannelConfig struct {
	// MaxSize is the absolute maximum number of values to receive. Setting
	// this to a value < 0 will disable the maximum size constraint.
	//
	// Defaults to 16, if 0.
	MaxSize int

	// MinSize is the (target) minimum number of values to receive. If
	// PartialTimeout is configured, the effective minimum size will be 1, if
	// the PartialTimeout is reached.
	//
	// Setting this to a value < 0 will cause the PartialTimeout to start from
	// the call to Channel, and will allow returning without receiving any
	// values. In this scenario, PartialTimeout will apply to the first value.
	//
	// Defaults to 4, if 0.
	MinSize int

	// PartialTimeout is the maximum time to wait for a partial response,
	// defined as a number of received values less than the MinSize. After/if
	// this timeout is reached, the effective minimum size will be reduced, see
	// MinSize for details.
	//
	// Defaults to 50ms, if 0.
	PartialTimeout time.Duration
}

ChannelConfig models optional configuration for the Channel function.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL