Documentation
¶
Overview ¶
Package batcher provides a generic request-coalescing framework.
It collects incoming requests, groups them by a caller-defined key, waits for a configurable idle/max timeout, then dispatches each group to a caller-defined executor. Callers block on a per-request response channel until the batch fires.
Type parameters:
- RequestPayload: the original request payload type (e.g., an AKS machine body for creation)
- ResponsePayload: the response type returned to each request (e.g., a poller for async operations, or struct{} if unused)
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Batch ¶
type Batch[RequestPayload, ResponsePayload any] struct { ID string Key string Requests []*BatchedRequest[RequestPayload, ResponsePayload] }
Batch is a group of requests with the same key.
type BatchedRequest ¶
type BatchedRequest[RequestPayload, ResponsePayload any] struct { Key string // Grouping key, set by the batcher after calling DetermineBatchKey ResponseChan chan *Response[ResponsePayload] // Caller waits on this channel for the response after batch execution Payload RequestPayload // The original request payload (e.g., an AKS machine body for creation) }
BatchedRequest is a single request (w/ payload) being batched with others.
type Batcher ¶
type Batcher[RequestPayload, ResponsePayload any] struct { // contains filtered or unexported fields }
Batcher collects requests, groups them by key, and dispatches batches after a configurable idle/max timeout window.
func New ¶
func New[RequestPayload, ResponsePayload any]( ctx context.Context, determineBatchKeyFunc DetermineBatchKey[RequestPayload], executeBatchFunc ExecuteBatch[RequestPayload, ResponsePayload], opts Options, ) *Batcher[RequestPayload, ResponsePayload]
New creates a Batcher with configured behavior. Call Start() to begin processing loop.
func (*Batcher[RequestPayload, ResponsePayload]) Enqueue ¶
func (b *Batcher[RequestPayload, ResponsePayload]) Enqueue(payload RequestPayload) (chan *Response[ResponsePayload], error)
Enqueue adds a request to the appropriate batch and returns a response channel. The caller should select on the channel and ctx.Done().
type DetermineBatchKey ¶
DetermineBatchKey computes a grouping key from a payload that will be batched from. Payloads with the same key land in the same batch. The caller module must provide this.
type ExecuteBatch ¶
type ExecuteBatch[RequestPayload, ResponsePayload any] func(ctx context.Context, batch *Batch[RequestPayload, ResponsePayload])
ExecuteBatch is called when a batch fires by the batcher. It receives the batch and must send a response to every request's ResponseChan. The caller module must provide this.
type Options ¶
type Options struct {
// IdleTimeout is how long to wait with no new requests before firing the batch.
IdleTimeout time.Duration
// MaxTimeout is the maximum time a batch can remain open regardless of activity.
MaxTimeout time.Duration
// MaxBatchSize causes a batch to fire immediately once it reaches this many requests.
MaxBatchSize int
}
Options configures the batching behavior.
Small timeouts = lower latency, more API calls. Large timeouts = better batching, higher latency.
type Response ¶
type Response[ResponsePayload any] struct { Payload ResponsePayload // The response payload to send back to the caller (e.g., a poller for async operations and API errors) Err error // Operational error (e.g., batch execution failure) to send back to the caller }
Response is used by ExecuteBatch to send a response to each original request.