thp

package module
v0.3.0
Published: Jun 4, 2023 License: MIT Imports: 5 Imported by: 0

README

thp - High throughput primitives library


thp.Chan[T any]

Chan represents a concurrent channel with batching capability. It allows efficient batched communication between producers and consumers, reducing the overhead of individual item transfers.

The channel itself is safe for concurrent use, but each producer and consumer must be used exclusively by a single goroutine to ensure thread safety, so create a separate Producer[T any] or Consumer[T any] for every goroutine that sends or receives messages. The producer is responsible for adding items to the channel's buffer and flushing them when the batch size is reached. The consumer retrieves items from the channel's buffer and processes them sequentially.

The channel's batch size determines the number of items accumulated in the buffer before a flush operation is triggered. Adjusting the batch size can impact the trade-off between throughput and latency. Smaller batch sizes result in more frequent flushes and lower latency, while larger batch sizes increase throughput at the cost of higher latency. You can also manually trigger flushes.

The channel internally manages a sync.Pool to reuse batch buffers and avoid unnecessary allocations. This optimization improves performance by reducing memory allocations during batch creation and disposal.

Example with comparison to built-in channel:
Built-in channel:

ch := make(chan int, 1024)
producersWg := &sync.WaitGroup{}
producersCount := 16
itemsPerProducer := 1_000_000
producersWg.Add(producersCount)

for i := 0; i < producersCount; i++ {
  go func() {
    defer producersWg.Done()
    for j := 0; j < itemsPerProducer; j++ {
      ch <- 1
    }
  }()
}

consumersCount := 16
consumersWg := &sync.WaitGroup{}
consumersWg.Add(consumersCount)
counter := &atomic.Int64{}
for i := 0; i < consumersCount; i++ {
  go func() {
    defer consumersWg.Done()
    result := 0
    for item := range ch {
      result += item
    }
    counter.Add(int64(result))
  }()
}

producersWg.Wait()
close(ch)
consumersWg.Wait()

expectedResult := int64(producersCount * itemsPerProducer)
if counter.Load() != expectedResult {
  t.Errorf(
    "result is not as expected: %v != %v",
    counter.Load(), expectedResult,
  )
}

thp.Chan:

ch, chCloser := thp.NewChan[int](1024)
producersWg := &sync.WaitGroup{}
producersCount := 16
itemsPerProducer := 1_000_000
producersWg.Add(producersCount)

for i := 0; i < producersCount; i++ {
  go func() {
    defer producersWg.Done()
    producer, flush := ch.Producer()
    defer flush()
    for j := 0; j < itemsPerProducer; j++ {
      producer.Put(1)
    }
  }()
}

consumersCount := 16
consumersWg := &sync.WaitGroup{}
consumersWg.Add(consumersCount)
counter := &atomic.Int64{}
for i := 0; i < consumersCount; i++ {
  go func() {
    defer consumersWg.Done()
    consumer := ch.Consumer()
    result := 0
    item, ok := consumer.Poll()
    for ; ok; item, ok = consumer.Poll() {
      result += item
    }
    counter.Add(int64(result))
  }()
}

producersWg.Wait()
chCloser()
consumersWg.Wait()

expectedResult := int64(producersCount * itemsPerProducer)
if counter.Load() != expectedResult {
  t.Errorf(
    "result is not as expected: %v != %v",
    counter.Load(), expectedResult,
  )
}
Performance

Run make chanbench to get results on your machine.

Benchmark results

thp.Counter

Counter is a concurrent counter implementation with striping, designed to enhance performance in write-heavy and contended workloads.

It reduces contention by distributing the workload across multiple internal counters. Compared to the atomic.Int64 type, this counter may use more memory and have a slower Load operation.

However, its Add operation scales better under high load and contention. To balance scalability and memory overhead, you can adjust the level of striping by using the NewCounterWithWideness function and specifying your desired wideness.

NOTE: the zero value of Counter is NOT valid; create new counters using the constructors provided below.

Example:
counter := thp.NewCounter()
incsPerGoroutine := 1_000_000
wg := &sync.WaitGroup{}
wg.Add(runtime.NumCPU())
for i := 0; i < runtime.NumCPU(); i++ {
    go func() {
        defer wg.Done()
        for j := 0; j < incsPerGoroutine; j++ {
            counter.Add(1)
        }
    }()
}
wg.Wait()
expectedResult := int64(runtime.NumCPU() * incsPerGoroutine)
if counter.Load() != expectedResult {
    t.Errorf("result is not as expected: %v != %v", counter.Load(), expectedResult)
}
Performance

Run make cntbench to get results on your machine.

Counter benchmark results

Documentation

Index

Constants

const ErrChanBatchSize chanError = "Batch size for thp.Chan can't be lower than 1"

Variables

This section is empty.

Functions

This section is empty.

Types

type Chan

type Chan[T any] struct {
	// contains filtered or unexported fields
}

Chan represents a concurrent channel with batching capability. It allows efficient batched communication between producers and consumers, reducing the overhead of individual item transfers.

The channel itself is safe for concurrent use, but each producer and consumer must be used exclusively by a single goroutine to ensure thread safety, so create a separate Producer[T any] or Consumer[T any] for every goroutine that sends or receives messages. The producer is responsible for adding items to the channel's buffer and flushing them when the batch size is reached. The consumer retrieves items from the channel's buffer and processes them sequentially.

The channel's batch size determines the number of items accumulated in the buffer before a flush operation is triggered. Adjusting the batch size can impact the trade-off between throughput and latency. Smaller batch sizes result in more frequent flushes and lower latency, while larger batch sizes increase throughput at the cost of higher latency. You can also manually trigger flushes.

Context cancellation is supported via separate methods, allowing graceful termination of producers and consumers.

The channel internally manages a sync.Pool to reuse batch buffers and avoid unnecessary allocations. This optimization improves performance by reducing memory allocations during batch creation and disposal.

func NewChan

func NewChan[T any](batchSize int) (*Chan[T], func())

NewChan creates a new concurrent channel. batchSize specifies the number of elements to batch together before sending them. It returns a pointer to Chan[T] and a cleanup function to close the channel.
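
A minimal creation sketch, mirroring the README example above (1024 is just an example batch size):

ch, chCloser := thp.NewChan[int](1024)
// ... start producer and consumer goroutines ...
chCloser() // call exactly once, after all producers are done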

func (*Chan[T]) Close

func (ch *Chan[T]) Close()

Close closes the concurrent channel. Close panics if called on an already closed Chan.

func (*Chan[T]) Consumer

func (ch *Chan[T]) Consumer() *Consumer[T]

Consumer creates a consumer for the concurrent channel. The consumer is responsible for retrieving items from the channel's buffer and processing them sequentially.

Note: This method should be called by the same goroutine that will use the consumer.

Example usage:

consumer := channel.Consumer()
for {
    item, ok := consumer.Poll()
    if !ok {
        break
    }
    // Process the item
}

Returns:

  • consumer: The created consumer instance.

func (*Chan[T]) Producer

func (ch *Chan[T]) Producer() (*Producer[T], func())

Producer creates a producer for the concurrent channel. The producer is responsible for adding items to the channel's buffer and flushing them when the batch size is reached.

Note: the returned flush function should be called by the same goroutine that uses the producer.

Example usage:

producer, flush := channel.Producer()
defer flush() // Ensure buffered items are sent through the channel
producer.Put(item1)
producer.Put(item2)

Methods that take a `ctx` allow for graceful termination of the producer. If the context is canceled, the producer stops accepting new items and any remaining items stay in the buffer. WARNING: do not use the returned flush function if you want context-aware operations; use FlushCtx instead.

Example of ctx aware operations usage:

producer, _ := channel.Producer()
defer producer.FlushCtx(ctx) // Ensure buffered items are sent; note the deferred call discards the error
err := producer.PutCtx(ctx, item)
if err != nil {
	return err
}

Returns:

  • producer: The created producer instance.
  • flush: A function to send any remaining items.

type Consumer

type Consumer[T any] struct {
	// contains filtered or unexported fields
}

Consumer represents a consumer for the concurrent channel. It retrieves items from the channel's buffer and processes them sequentially.

The consumer must be used exclusively by a single goroutine to ensure thread safety. Create a separate Consumer instance for every goroutine that receives messages.

The consumer retrieves items by calling the Poll method, which returns the next item from the buffer. If the buffer is empty, the consumer will prefetch the next batch of items from the internal channel to ensure a continuous supply.

The consumer supports both blocking and non-blocking operations.

Context cancellation is supported via separate method, allowing graceful termination of the consumer. When the consumer's context is canceled, it stops fetching new batches from the internal channel and signals the end of consumption.

func (*Consumer[T]) NonBlockingPoll

func (c *Consumer[T]) NonBlockingPoll() (value T, readSuccess bool, channelIsOpen bool)

NonBlockingPoll retrieves the next item from the consumer's buffer in a non-blocking manner. It returns the item, a boolean indicating whether the retrieval was successful, and a boolean indicating whether the internal channel is still open for further consumption.

Note: This method is intended to be used exclusively by a goroutine that owns this Consumer.
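
A hedged sketch of a non-blocking consume loop; handleItem is a hypothetical callback and the back-off strategy is up to the caller:

consumer := ch.Consumer()
for {
    item, ok, open := consumer.NonBlockingPoll()
    if ok {
        handleItem(item) // hypothetical handler
        continue
    }
    if !open {
        break // no item and the channel is closed
    }
    runtime.Gosched() // nothing available right now; yield before retrying
}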

func (*Consumer[T]) Poll

func (c *Consumer[T]) Poll() (T, bool)

Poll retrieves the next item from the consumer's buffer. It returns the item and true if successful, or a zero value and false if there are no more items. Note: This method is intended to be used exclusively by a goroutine that owns this Consumer.

func (*Consumer[T]) PollCtx added in v0.2.0

func (c *Consumer[T]) PollCtx(ctx context.Context) (T, bool, error)

PollCtx retrieves the next item from the consumer's buffer. It returns (item, true, nil) on success, (zero value, false, nil) if there are no more items, or (zero value, false, error) if the context is canceled. Note: This method is intended to be used exclusively by a goroutine that owns this Consumer.
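
A context-aware variant of the Poll loop shown earlier; a sketch assuming the error is returned only on context cancellation:

consumer := ch.Consumer()
for {
    item, ok, err := consumer.PollCtx(ctx)
    if err != nil {
        return err // context canceled
    }
    if !ok {
        break // channel closed and drained
    }
    _ = item // process the item
}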

type Counter added in v0.3.0

type Counter struct {
	// contains filtered or unexported fields
}

Counter is a concurrent counter implementation with striping, designed to enhance performance in write-heavy and contended workloads. It reduces contention by distributing the workload across multiple internal counters. Compared to the atomic.Int64 type, this counter may use more memory and have a slower Load operation. However, its Add operation scales better under high load and contention. To balance scalability and memory overhead, you can adjust the level of striping by using the NewCounterWithWideness function and specifying your desired wideness.

NOTE: the zero value of Counter is NOT valid; create new counters using the constructors provided below.
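
A minimal sketch mirroring the README example above:

counter := thp.NewCounter()
counter.Add(5)
counter.Add(-2)
total := counter.Load() // 3, assuming no concurrent writers at this point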

func NewCounter added in v0.3.0

func NewCounter() *Counter

NewCounter creates a new instance of Counter optimised for maximum scalability of write operations.

func NewCounterWithWideness added in v0.3.0

func NewCounterWithWideness(wideness int) *Counter

NewCounterWithWideness creates a new instance of Counter with the specified wideness. Using this function you can balance scalability and memory overhead.
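
A short sketch of the trade-off; treating wideness as the number of internal stripes is an assumption based on the description above:

// Lower wideness: less memory, more contention under heavy concurrent writes.
compact := thp.NewCounterWithWideness(2)

// Higher wideness, e.g. one stripe per CPU: better Add scalability, more memory.
wide := thp.NewCounterWithWideness(runtime.NumCPU())

compact.Add(1)
wide.Add(1)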

func (*Counter) Add added in v0.3.0

func (c *Counter) Add(x int64)

Add atomically adds x to the current Counter value.

func (*Counter) Clear added in v0.3.0

func (c *Counter) Clear()

Clear sets counter to 0.

func (*Counter) Load added in v0.3.0

func (c *Counter) Load() int64

Load calculates the current Counter value, but may miss concurrent updates that happen while Load is running.

type Producer

type Producer[T any] struct {
	// contains filtered or unexported fields
}

Producer represents a producer for the concurrent channel. Each producer should be used exclusively by a single goroutine to ensure thread safety. Create a separate Producer instance for every goroutine that sends messages.

func (*Producer[T]) Flush

func (p *Producer[T]) Flush()

Flush flushes the items in the buffer to the channel, blocking if necessary. If the channel is full, it blocks until there is space available. Note: This method is intended to be used exclusively by a goroutine that owns this Producer.

func (*Producer[T]) FlushCtx added in v0.2.0

func (p *Producer[T]) FlushCtx(ctx context.Context) error

FlushCtx flushes the items in the buffer to the channel, blocking if necessary. If the channel is full, it blocks until there is space available. It returns an error if the context is canceled during the flush operation; in that case, the remaining items stay in the buffer. Note: This method is intended to be used exclusively by a goroutine that owns this Producer.
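
If you need to observe the flush error instead of deferring the call as in the Producer example above, a sketch:

producer, _ := ch.Producer()
if err := producer.PutCtx(ctx, item); err != nil {
    return err
}
if err := producer.FlushCtx(ctx); err != nil {
    return err // context canceled; unsent items remain in the buffer
}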

func (*Producer[T]) NonBlockingFlush

func (p *Producer[T]) NonBlockingFlush() bool

NonBlockingFlush attempts to flush the items in the buffer to the channel without blocking. It returns true if the flush was successful, or false if the channel is full. In most cases you should use the regular flush function provided to you upon Producer creation. Note: This method is intended to be used exclusively by a goroutine that owns this Producer.

func (*Producer[T]) NonBlockingPut

func (p *Producer[T]) NonBlockingPut(v T) bool

NonBlockingPut adds an item to the producer's buffer without blocking. If the buffer reaches the batchSize, it attempts a non-blocking flush. It returns true if the item was successfully added, or false if the channel is full. Note: This method is intended to be used exclusively by a goroutine that owns this Producer.
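
A hedged sketch of using NonBlockingPut with a blocking fallback when the channel is full:

producer, flush := ch.Producer()
defer flush()
if !producer.NonBlockingPut(v) {
    // Channel is currently full; fall back to a blocking Put
    // (alternatively, drop the item or retry later).
    producer.Put(v)
}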

func (*Producer[T]) Put

func (p *Producer[T]) Put(v T)

Put adds an item to the producer's buffer. If the buffer reaches the batchSize, it triggers a flush to the channel. Note: This method is intended to be used exclusively by a goroutine that owns this Producer.

func (*Producer[T]) PutCtx added in v0.2.0

func (p *Producer[T]) PutCtx(ctx context.Context, v T) error

PutCtx adds an item to the producer's buffer. If the buffer reaches the batchSize, it triggers a flush to the channel. It returns an error if the context is canceled during the flush operation. Note: This method is intended to be used exclusively by a goroutine that owns this Producer.

Directories

Path Synopsis
internal
