Documentation
¶
Index ¶
Constants ¶
const ErrChanBatchSize chanError = "Batch size for thp.Chan can't be lower than 1"
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Chan ¶
type Chan[T any] struct { // contains filtered or unexported fields }
Chan represents a concurrent channel with batching capability. It allows efficient batched communication between producers and consumers, reducing the overhead of individual item transfers.
The channel operates in a concurrent manner, but each producer and consumer should be exclusively used by a single goroutine to ensure thread safety, so create separate Producer[T any] or Consumer[T any] for every goroutine that sends or receives messages. The producer is responsible for adding items to the channel's buffer and flushing them when the batch size is reached. The consumer retrieves items from the channel's buffer and processes them sequentially.
The channel's batch size determines the number of items accumulated in the buffer before a flush operation is triggered. Adjusting the batch size can impact the trade-off between throughput and latency. Smaller batch sizes result in more frequent flushes and lower latency, while larger batch sizes increase throughput at the cost of higher latency. You can also manually trigger flushes.
Context cancellation is supported via separate methods, allowing graceful termination of producers and consumers.
The channel internally manages a sync.Pool to reuse batch buffers and avoid unnecessary allocations. This optimization improves performance by reducing memory allocations during batch creation and disposal.
func NewChan ¶
NewChan creates a new concurrent channel. batchSize specifies the number of elements to batch together before sending them. It returns a pointer to Chan[T] and a cleanup function to close the channel.
func (*Chan[T]) Close ¶
func (ch *Chan[T]) Close()
Close closes the concurrent channel. Close panics on attempted close of already close Chan.
func (*Chan[T]) Consumer ¶
Consumer creates a consumer for the concurrent channel with the given context. The consumer is responsible for retrieving items from the channel's buffer and processing them sequentially.
Note: This method should be called by the same goroutine that will use the consumer.
Example usage:
consumer := channel.Consumer() for { item, ok := consumer.Poll() if !ok { break } // Process the item }
Returns:
- consumer: The created consumer instance.
func (*Chan[T]) Producer ¶
Producer creates a producer for the concurrent channel. The producer is responsible for adding items to the channel's buffer and flushing them when the batch size is reached.
Note: flush method should be called by the same goroutine that will use the producer.
Example usage:
producer, flush := channel.Producer() defer flush() // Ensure sending items through the channel producer.Put(item1) producer.Put(item2)
Methods with provided `ctx` allows for graceful termination of the producer. If the context is canceled, the producer stops accepting new items, any remaining items stay in the buffer. WARNING: do not use returned flush method if you want context aware operations, use FlushCtx instead.
Example of ctx aware operations usage:
producer, _ := channel.Producer() defer producer.FlushCtx(ctx) // Ensure sending items through the channel err := producer.PutCtx(ctx, item) if err != nil { return err }
Returns:
- producer: The created producer instance.
- flush: A function to send any remaining items.
type Consumer ¶
type Consumer[T any] struct { // contains filtered or unexported fields }
Consumer represents a consumer for the concurrent channel. It retrieves items from the channel's buffer and processes them sequentially.
The consumer operates in a concurrent manner, but it should be exclusively used by a single goroutine to ensure thread safety. Create separate Producer instance for every goroutine that sends messages.
The consumer retrieves items by calling the Poll method, which returns the next item from the buffer. If the buffer is empty, the consumer will prefetch the next batch of items from the internal channel to ensure a continuous supply.
The consumer supports both blocking and non-blocking operations.
Context cancellation is supported via separate method, allowing graceful termination of the consumer. When the consumer's context is canceled, it stops fetching new batches from the internal channel and signals the end of consumption.
func (*Consumer[T]) NonBlockingPoll ¶
NonBlockingPoll retrieves the next item from the consumer's buffer in a non-blocking manner. It returns the item, a boolean indicating whether the retrieval was successful, and a boolean indicating whether the internal channel is still open for further consumption.
Note: This method is intended to be used exclusively by a goroutine that owns this Consumer.
func (*Consumer[T]) Poll ¶
Poll retrieves the next item from the consumer's buffer. It returns the item and true if successful, or a zero value and false if there are no more items. Note: This method is intended to be used exclusively by a goroutine that owns this Consumer.
func (*Consumer[T]) PollCtx ¶ added in v0.2.0
PollCtx retrieves the next item from the consumer's buffer. It returns the item and true if successful, or a (zero value, false, nil) if there are no more items or a (zero value, false, error) if context is canceled. Note: This method is intended to be used exclusively by a goroutine that owns this Consumer.
type Counter ¶ added in v0.3.0
type Counter struct {
// contains filtered or unexported fields
}
Counter is a concurrent counter implementation with striping, designed to enhance performance in write-heavy and contended workloads. It reduces contention by distributing the workload across multiple internal counters. Compared to the atomic.Int64 type, this counter may use more memory and have a slower Load operation. However, its Add operations scales better under high load and contention. To balance scalability and memory overhead, you can adjust the level of striping by using the NewCounterWithWideness function and specifying your desired wideness.
NOTE: zero value of Counter is NOT valid, please create new counters using methods provided below.
func NewCounter ¶ added in v0.3.0
func NewCounter() *Counter
NewCounter create new instance of Counter optimised for maximum scalability of write operations.
func NewCounterWithWideness ¶ added in v0.3.0
NewCounterWithWideness creates new instance of Counter with specified wideness. Using this method you can balance scalability and memory overhead.
type Producer ¶
type Producer[T any] struct { // contains filtered or unexported fields }
Producer represents a producer for the concurrent channel. Each producer should be exclusively used by a single goroutine to ensure thread safety. Create separate Producer instance for every goroutine that sends messages.
func (*Producer[T]) Flush ¶
func (p *Producer[T]) Flush()
Flush flushes the items in the buffer to the channel, blocking if necessary. If the channel is full, it blocks until there is space available. Note: This method is intended to be used exclusively by a goroutine that owns this Producer.
func (*Producer[T]) FlushCtx ¶ added in v0.2.0
FlushCtx flushes the items in the buffer to the channel, blocking if necessary. If the channel is full, it blocks until there is space available. It returns error if context gets canceled during flush operation. If the provided context is canceled, the remaining items stay in the buffer. Note: This method is intended to be used exclusively by a goroutine that owns this Producer.
func (*Producer[T]) NonBlockingFlush ¶
NonBlockingFlush attempts to flush the items in the buffer to the channel without blocking. It returns true if the flush was successful, or false if the channel is full. In most cases you should use regular flush method provided to you upon Producer creation. Note: This method is intended to be used exclusively by a goroutine that owns this Producer.
func (*Producer[T]) NonBlockingPut ¶
NonBlockingPut adds an item to the producer's buffer without blocking. If the buffer reaches the batchSize, it attempts a non-blocking flush. It returns true if the item was successfully added, or false if the channel is full. Note: This method is intended to be used exclusively by a goroutine that owns this Producer.
func (*Producer[T]) Put ¶
func (p *Producer[T]) Put(v T)
Put adds an item to the producer's buffer. If the buffer reaches the batchSize, it triggers a flush to the channel. Note: This method is intended to be used exclusively by a goroutine that owns this Producer.
func (*Producer[T]) PutCtx ¶ added in v0.2.0
PutCtx adds an item to the producer's buffer. If the buffer reaches the batchSize, it triggers a flush to the channel. It returns error if context gets canceled during flush operation. Note: This method is intended to be used exclusively by a goroutine that owns this Producer.