queue

v0.0.0-...-62f47ba
Published: Nov 26, 2024 License: MIT Imports: 4 Imported by: 0

README

go-lockfree-queue

Purpose

This repository contains an educational implementation of Michael and Scott's non-blocking concurrent queue algorithm in Go. The primary goal is to provide a clear, well-documented example of how to implement a lock-free data structure using Go's concurrency primitives.

Overview

The non-blocking concurrent queue algorithm, introduced by Maged M. Michael and Michael L. Scott in their 1996 paper, allows multiple threads to enqueue and dequeue items simultaneously without using locks. As in the original algorithm, both enqueue and dequeue are lock-free rather than wait-free: some operation always completes in a finite number of steps, although an individual operation may have to retry its CAS under contention.

Key Features
  1. Lock-Free Operations: Both enqueue and dequeue operations are implemented without using locks, allowing for high concurrency.
  2. ABA Problem Mitigation: Instead of using counters as suggested in the original paper, this implementation uses hazard pointers as a workaround for the ABA problem.
Example Usage

The following example shows how to create a queue with the default configuration and with a custom reclaim interval:

// Default configuration.
q1 := queue.New[string]()

// Custom configuration.
q2 := queue.New[string](
    queue.WithReclaimInterval[string](100 * time.Millisecond), // More frequent reclamation; T cannot be inferred here
)
Available Options
  1. WithReclaimInterval(d time.Duration): Sets how often the queue attempts to reclaim nodes.
    • Default: 5 seconds
    • Use when:
      • High throughput: Decrease interval to reclaim nodes more frequently
      • Low throughput: Increase interval to reduce CPU overhead
    • Example: queue.WithReclaimInterval[string](100 * time.Millisecond) for high-throughput scenarios (the type argument is explicit because it cannot be inferred from the duration argument)
Key Operations
Enqueue Operation
  1. Get a node from the free list and set its value.
  2. Use Compare-And-Swap (CAS) to append the new node to the tail of the queue.
  3. If necessary, update the tail pointer to the newly added node.
Dequeue Operation
  1. Check if the queue is empty (the dummy node at head has a nil next pointer).
  2. Read the value from the first non-dummy node (head's next).
  3. Use CAS to advance head to that node, which becomes the new dummy node.
  4. Return the value and return the old dummy node to the free list.
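
The enqueue and dequeue steps above can be sketched as follows. This is a simplified illustration, not the package's implementation: the names node, msQueue, enqueue and dequeue are hypothetical, and nodes are allocated with Go's garbage collector instead of being taken from a pre-allocated free list, which sidesteps the ABA problem without hazard pointers.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// node is a hypothetical queue node; next is updated only through atomics.
type node[T any] struct {
	value T
	next  atomic.Pointer[node[T]]
}

// msQueue is an illustrative Michael-Scott queue with a dummy head node.
type msQueue[T any] struct {
	head atomic.Pointer[node[T]]
	tail atomic.Pointer[node[T]]
}

func newMSQueue[T any]() *msQueue[T] {
	q := &msQueue[T]{}
	dummy := &node[T]{}
	q.head.Store(dummy)
	q.tail.Store(dummy)
	return q
}

func (q *msQueue[T]) enqueue(v T) {
	n := &node[T]{value: v}
	for {
		tail := q.tail.Load()
		next := tail.next.Load()
		if tail != q.tail.Load() {
			continue // tail moved while we were reading; retry
		}
		if next != nil {
			// Tail is lagging behind the last node; help advance it.
			q.tail.CompareAndSwap(tail, next)
			continue
		}
		// Link the new node after the last node.
		if tail.next.CompareAndSwap(nil, n) {
			// Swing tail to the new node; failure means another goroutine helped.
			q.tail.CompareAndSwap(tail, n)
			return
		}
	}
}

func (q *msQueue[T]) dequeue() (T, bool) {
	var zero T
	for {
		head := q.head.Load()
		tail := q.tail.Load()
		next := head.next.Load()
		if head != q.head.Load() {
			continue // head moved while we were reading; retry
		}
		if next == nil {
			return zero, false // only the dummy node is left: queue is empty
		}
		if head == tail {
			// A node is linked but tail has not caught up; help advance it.
			q.tail.CompareAndSwap(tail, next)
			continue
		}
		v := next.value // read before the CAS, as in the original algorithm
		if q.head.CompareAndSwap(head, next) {
			return v, true // next is now the dummy node
		}
	}
}

func main() {
	q := newMSQueue[string]()
	q.enqueue("first")
	q.enqueue("second")
	v, ok := q.dequeue()
	fmt.Println(v, ok) // first true
}
```

Every retry in these loops is caused by another goroutine making progress, which is what makes both operations lock-free.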
Memory Management
  • Pre-allocates a fixed number of nodes to avoid dynamic allocation during queue operations.
  • Manages a free list of nodes for reuse, reducing garbage collection pressure.
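
As a rough illustration of a pre-allocated pool with a lock-free free list, the sketch below keeps all nodes in one slice and links free nodes by index. Everything here (poolNode, nodePool, get, put) is hypothetical. Note that it guards the free-list head with a version counter, the technique from the original paper, rather than the hazard pointers this implementation uses, and that get reports exhaustion with a boolean instead of panicking.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// nilIndex marks the end of the free list.
const nilIndex = ^uint32(0)

// poolNode is a hypothetical pre-allocated node addressed by slice index.
type poolNode[T any] struct {
	value T
	next  atomic.Uint32 // index of the next free node, or nilIndex
}

// nodePool owns a fixed slice of nodes and a free list threaded through it.
type nodePool[T any] struct {
	nodes []poolNode[T]
	head  atomic.Uint64 // high 32 bits: version tag, low 32 bits: first free index
}

func pack(tag, idx uint32) uint64       { return uint64(tag)<<32 | uint64(idx) }
func unpack(w uint64) (tag, idx uint32) { return uint32(w >> 32), uint32(w) }

func newNodePool[T any](n int) *nodePool[T] {
	p := &nodePool[T]{nodes: make([]poolNode[T], n)}
	for i := range p.nodes {
		next := nilIndex
		if i+1 < n {
			next = uint32(i + 1)
		}
		p.nodes[i].next.Store(next)
	}
	first := nilIndex
	if n > 0 {
		first = 0
	}
	p.head.Store(pack(0, first))
	return p
}

// get pops a free node index; ok is false when the pool is exhausted.
func (p *nodePool[T]) get() (idx uint32, ok bool) {
	for {
		old := p.head.Load()
		tag, first := unpack(old)
		if first == nilIndex {
			return 0, false
		}
		next := p.nodes[first].next.Load()
		// Bumping the tag makes a stale head value fail the CAS even if
		// the same index has been popped and pushed back in the meantime.
		if p.head.CompareAndSwap(old, pack(tag+1, next)) {
			return first, true
		}
	}
}

// put pushes a node index back onto the free list for reuse.
func (p *nodePool[T]) put(idx uint32) {
	for {
		old := p.head.Load()
		tag, first := unpack(old)
		p.nodes[idx].next.Store(first)
		if p.head.CompareAndSwap(old, pack(tag+1, idx)) {
			return
		}
	}
}

func main() {
	p := newNodePool[string](2)
	a, _ := p.get()
	b, _ := p.get()
	_, ok := p.get()
	fmt.Println(a, b, ok) // 0 1 false
	p.put(a)
	c, _ := p.get()
	fmt.Println(c) // 0
}
```

The 32-bit tag can wrap around after about four billion updates, which is one reason a real implementation might prefer hazard pointers.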

Limitations

  • Fixed maximum capacity due to pre-allocated node pool.
  • Intended for educational purposes and may not be suitable for production use without further testing and optimization.

Documentation

Overview

Package queue implements a lock-free concurrent FIFO queue using pre-allocated nodes. The queue is designed for high-performance concurrent access without locks, making it suitable for multi-producer, multi-consumer scenarios.

The queue pre-allocates a fixed number of nodes (65536 by default) for better memory locality and reduced allocation overhead. When the queue is full, attempting to enqueue will panic.
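
Because a full queue panics, a caller that prefers an error-style result could wrap the call. The helper below is a hypothetical sketch, not part of the package: tryEnqueue takes any enqueue function (for example the method value q.Enqueue) and converts a panic into a false return. Be aware that it also swallows panics unrelated to capacity.

```go
package main

import "fmt"

// tryEnqueue calls enqueue(v) and reports false if the call panicked.
// It is a hypothetical helper; the package itself does not provide it.
func tryEnqueue[T any](enqueue func(T), v T) (ok bool) {
	defer func() {
		if r := recover(); r != nil {
			ok = false
		}
	}()
	enqueue(v)
	return true
}

func main() {
	// fullQueue stands in for the Enqueue method of a queue with no free nodes.
	fullQueue := func(string) { panic("queue: no free nodes") }
	fmt.Println(tryEnqueue(fullQueue, "x")) // false
}
```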

Example usage:

func Example() {
	// Create a queue with default options
	q1 := queue.New[string]()
	defer q1.Close()

	// Create a queue with custom options
	q2 := queue.New[string](
		queue.WithMaxNodes(1<<20),                      // Set max nodes to 1<<20 (1,048,576)
		queue.WithReclaimInterval[string](time.Second), // Reclaim nodes every second
	)
	defer q2.Close()

	// Basic operations remain the same
	q2.Enqueue("first")
	q2.Enqueue("second")

	if val, ok := q2.Dequeue(); ok {
		fmt.Println(val) // Prints: first
	}

	if !q2.Empty() {
		val, _ := q2.Dequeue()
		fmt.Println(val) // Prints: second
	}
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Node

type Node[T any] struct {
	// contains filtered or unexported fields
}

Node represents a node in the queue.

type Queue

type Queue[T any] struct {
	// contains filtered or unexported fields
}

Queue represents a concurrent FIFO queue with pre-allocated nodes.

func New

func New[T any](opts ...QueueOption[T]) *Queue[T]

New creates a new empty queue with pre-allocated nodes.

func (*Queue[T]) Dequeue

func (q *Queue[T]) Dequeue() (T, bool)

Dequeue removes and returns a value from the head of the queue.

func (*Queue[T]) Empty

func (q *Queue[T]) Empty() bool

Empty returns true if the queue is empty.

func (*Queue[T]) Enqueue

func (q *Queue[T]) Enqueue(value T)

Enqueue adds a value to the tail of the queue.

type QueueOption

type QueueOption[T any] func(*Queue[T])

QueueOption is a functional option for configuring a Queue.

func WithReclaimInterval

func WithReclaimInterval[T any](interval time.Duration) QueueOption[T]

WithReclaimInterval sets the interval between reclamation runs.
