xsync

package module
v2.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 26, 2023 License: MIT Imports: 10 Imported by: 64

README

GoDoc reference GoReport codecov

xsync

Concurrent data structures for Go. Aims to provide more scalable alternatives for some of the data structures from the standard sync package, but not only.

Covered with tests following the approach described here.

Benchmarks

Benchmark results may be found here. I'd like to thank @felixge who kindly run the benchmarks on a beefy multicore machine.

Also, a non-scientific, unfair benchmark comparing Java's j.u.c.ConcurrentHashMap and xsync.MapOf is available here.

Usage

The latest xsync major version is v2, so /v2 suffix should be used when importing the library:

import (
	"github.com/puzpuzpuz/xsync/v2"
)

Note for v1 users: v1 support is discontinued, so please upgrade to v2. While the API has some breaking changes, the migration should be trivial.

Counter

A Counter is a striped int64 counter inspired by the j.u.c.a.LongAdder class from Java standard library.

c := xsync.NewCounter()
// increment and decrement the counter
c.Inc()
c.Dec()
// read the current value 
v := c.Value()

Works better in comparison with a single atomically updated int64 counter in high contention scenarios.

Map

A Map is like a concurrent hash table based map. It follows the interface of sync.Map with a number of valuable extensions like Compute or Size.

m := xsync.NewMap()
m.Store("foo", "bar")
v, ok := m.Load("foo")
s := m.Size()

Map uses a modified version of Cache-Line Hash Table (CLHT) data structure: https://github.com/LPD-EPFL/CLHT

CLHT is built around idea to organize the hash table in cache-line-sized buckets, so that on all modern CPUs update operations complete with minimal cache-line transfer. Also, Get operations are obstruction-free and involve no writes to shared memory, hence no mutexes or any other sort of locks. Due to this design, in all considered scenarios Map outperforms sync.Map.

One important difference with sync.Map is that only string keys are supported. That's because Golang standard library does not expose the built-in hash functions for interface{} values.

MapOf[K, V] is an implementation with parametrized value type. It is available for Go 1.18 or later. While it's still a CLHT-inspired hash map, MapOf's design is quite different from Map. As a result, less GC pressure and less atomic operations on reads.

m := xsync.NewMapOf[string]()
m.Store("foo", "bar")
v, ok := m.Load("foo")

One important difference with Map is that MapOf supports arbitrary comparable key types:

type Point struct {
	x int32
	y int32
}
m := NewTypedMapOf[Point, int](func(seed maphash.Seed, p Point) uint64 {
	// provide a hash function when creating the MapOf;
	// we recommend using the hash/maphash package for the function
	var h maphash.Hash
	h.SetSeed(seed)
	binary.Write(&h, binary.LittleEndian, p.x)
	hash := h.Sum64()
	h.Reset()
	binary.Write(&h, binary.LittleEndian, p.y)
	return 31*hash + h.Sum64()
})
m.Store(Point{42, 42}, 42)
v, ok := m.Load(point{42, 42})
MPMCQueue

A MPMCQueue is a bounded multi-producer multi-consumer concurrent queue.

q := xsync.NewMPMCQueue(1024)
// producer inserts an item into the queue
q.Enqueue("foo")
// optimistic insertion attempt; doesn't block
inserted := q.TryEnqueue("bar")
// consumer obtains an item from the queue
item := q.Dequeue() // interface{} pointing to a string
// optimistic obtain attempt; doesn't block
item, ok := q.TryDequeue()

MPMCQueueOf[I] is an implementation with parametrized item type. It is available for Go 1.19 or later.

q := xsync.NewMPMCQueueOf[string](1024)
q.Enqueue("foo")
item := q.Dequeue() // string

The queue is based on the algorithm from the MPMCQueue C++ library which in its turn references D.Vyukov's MPMC queue. According to the following classification, the queue is array-based, fails on overflow, provides causal FIFO, has blocking producers and consumers.

The idea of the algorithm is to allow parallelism for concurrent producers and consumers by introducing the notion of tickets, i.e. values of two counters, one per producers/consumers. An atomic increment of one of those counters is the only noticeable contention point in queue operations. The rest of the operation avoids contention on writes thanks to the turn-based read/write access for each of the queue items.

In essence, MPMCQueue is a specialized queue for scenarios where there are multiple concurrent producers and consumers of a single queue running on a large multicore machine.

To get the optimal performance, you may want to set the queue size to be large enough, say, an order of magnitude greater than the number of producers/consumers, to allow producers and consumers to progress with their queue operations in parallel most of the time.

RBMutex

A RBMutex is a reader biased reader/writer mutual exclusion lock. The lock can be held by an many readers or a single writer.

mu := xsync.NewRBMutex()
// reader lock calls return a token
t := mu.RLock()
// the token must be later used to unlock the mutex
mu.RUnlock(t)
// writer locks are the same as in sync.RWMutex
mu.Lock()
mu.Unlock()

RBMutex is based on a modified version of BRAVO (Biased Locking for Reader-Writer Locks) algorithm: https://arxiv.org/pdf/1810.01553.pdf

The idea of the algorithm is to build on top of an existing reader-writer mutex and introduce a fast path for readers. On the fast path, reader lock attempts are sharded over an internal array based on the reader identity (a token in case of Golang). This means that readers do not contend over a single atomic counter like it's done in, say, sync.RWMutex allowing for better scalability in terms of cores.

Hence, by the design RBMutex is a specialized mutex for scenarios, such as caches, where the vast majority of locks are acquired by readers and write lock acquire attempts are infrequent. In such scenarios, RBMutex should perform better than the sync.RWMutex on large multicore machines.

RBMutex extends sync.RWMutex internally and uses it as the "reader bias disabled" fallback, so the same semantics apply. The only noticeable difference is in the reader tokens returned from the RLock/RUnlock methods.

License

Licensed under MIT.

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Counter

type Counter struct {
	// contains filtered or unexported fields
}

A Counter is a striped int64 counter.

Should be preferred over a single atomically updated int64 counter in high contention scenarios.

A Counter must not be copied after first use.

func NewCounter added in v2.2.0

func NewCounter() *Counter

NewCounter creates a new Counter instance.

func (*Counter) Add

func (c *Counter) Add(delta int64)

Add adds the delta to the counter.

func (*Counter) Dec

func (c *Counter) Dec()

Dec decrements the counter by 1.

func (*Counter) Inc

func (c *Counter) Inc()

Inc increments the counter by 1.

func (*Counter) Reset

func (c *Counter) Reset()

Reset resets the counter to zero. This method should only be used when it is known that there are no concurrent modifications of the counter.

func (*Counter) Value

func (c *Counter) Value() int64

Value returns the current counter value. The returned value may not include all of the latest operations in presence of concurrent modifications of the counter.

type IntegerConstraint

type IntegerConstraint interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 | ~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr
}

IntegerConstraint represents any integer type.

type MPMCQueue

type MPMCQueue struct {
	// contains filtered or unexported fields
}

A MPMCQueue is a bounded multi-producer multi-consumer concurrent queue.

MPMCQueue instances must be created with NewMPMCQueue function. A MPMCQueue must not be copied after first use.

Based on the data structure from the following C++ library: https://github.com/rigtorp/MPMCQueue

func NewMPMCQueue

func NewMPMCQueue(capacity int) *MPMCQueue

NewMPMCQueue creates a new MPMCQueue instance with the given capacity.

func (*MPMCQueue) Dequeue

func (q *MPMCQueue) Dequeue() interface{}

Dequeue retrieves and removes the item from the head of the queue. Blocks, if the queue is empty.

func (*MPMCQueue) Enqueue

func (q *MPMCQueue) Enqueue(item interface{})

Enqueue inserts the given item into the queue. Blocks, if the queue is full.

func (*MPMCQueue) TryDequeue

func (q *MPMCQueue) TryDequeue() (item interface{}, ok bool)

TryDequeue retrieves and removes the item from the head of the queue. Does not block and returns immediately. The ok result indicates that the queue isn't empty and an item was retrieved.

func (*MPMCQueue) TryEnqueue

func (q *MPMCQueue) TryEnqueue(item interface{}) bool

TryEnqueue inserts the given item into the queue. Does not block and returns immediately. The result indicates that the queue isn't full and the item was inserted.

type MPMCQueueOf added in v2.5.0

type MPMCQueueOf[I any] struct {
	// contains filtered or unexported fields
}

A MPMCQueueOf is a bounded multi-producer multi-consumer concurrent queue. It's a generic version of MPMCQueue.

MPMCQueue instances must be created with NewMPMCQueueOf function. A MPMCQueueOf must not be copied after first use.

Based on the data structure from the following C++ library: https://github.com/rigtorp/MPMCQueue

func NewMPMCQueueOf added in v2.5.0

func NewMPMCQueueOf[I any](capacity int) *MPMCQueueOf[I]

NewMPMCQueueOf creates a new MPMCQueueOf instance with the given capacity.

func (*MPMCQueueOf[I]) Dequeue added in v2.5.0

func (q *MPMCQueueOf[I]) Dequeue() I

Dequeue retrieves and removes the item from the head of the queue. Blocks, if the queue is empty.

func (*MPMCQueueOf[I]) Enqueue added in v2.5.0

func (q *MPMCQueueOf[I]) Enqueue(item I)

Enqueue inserts the given item into the queue. Blocks, if the queue is full.

func (*MPMCQueueOf[I]) TryDequeue added in v2.5.0

func (q *MPMCQueueOf[I]) TryDequeue() (item I, ok bool)

TryDequeue retrieves and removes the item from the head of the queue. Does not block and returns immediately. The ok result indicates that the queue isn't empty and an item was retrieved.

func (*MPMCQueueOf[I]) TryEnqueue added in v2.5.0

func (q *MPMCQueueOf[I]) TryEnqueue(item I) bool

TryEnqueue inserts the given item into the queue. Does not block and returns immediately. The result indicates that the queue isn't full and the item was inserted.

type Map

type Map struct {
	// contains filtered or unexported fields
}

Map is like a Go map[string]interface{} but is safe for concurrent use by multiple goroutines without additional locking or coordination. It follows the interface of sync.Map with a number of valuable extensions like Compute or Size.

A Map must not be copied after first use.

Map uses a modified version of Cache-Line Hash Table (CLHT) data structure: https://github.com/LPD-EPFL/CLHT

CLHT is built around idea to organize the hash table in cache-line-sized buckets, so that on all modern CPUs update operations complete with at most one cache-line transfer. Also, Get operations involve no write to memory, as well as no mutexes or any other sort of locks. Due to this design, in all considered scenarios Map outperforms sync.Map.

One important difference with sync.Map is that only string keys are supported. That's because Golang standard library does not expose the built-in hash functions for interface{} values.

func NewMap

func NewMap() *Map

NewMap creates a new Map instance.

func NewMapPresized added in v2.4.0

func NewMapPresized(sizeHint int) *Map

NewMapPresized creates a new Map instance with capacity enough to hold sizeHint entries. If sizeHint is zero or negative, the value is ignored.

func (*Map) Clear added in v2.1.0

func (m *Map) Clear()

Clear deletes all keys and values currently stored in the map.

func (*Map) Compute added in v2.1.0

func (m *Map) Compute(
	key string,
	valueFn func(oldValue interface{}, loaded bool) (newValue interface{}, delete bool),
) (actual interface{}, ok bool)

Compute either sets the computed new value for the key or deletes the value for the key. When the delete result of the valueFn function is set to true, the value will be deleted, if it exists. When delete is set to false, the value is updated to the newValue. The ok result indicates whether value was computed and stored, thus, is present in the map. The actual result contains the new value in cases where the value was computed and stored. See the example for a few use cases.

func (*Map) Delete

func (m *Map) Delete(key string)

Delete deletes the value for a key.

func (*Map) Load

func (m *Map) Load(key string) (value interface{}, ok bool)

Load returns the value stored in the map for a key, or nil if no value is present. The ok result indicates whether value was found in the map.

func (*Map) LoadAndDelete

func (m *Map) LoadAndDelete(key string) (value interface{}, loaded bool)

LoadAndDelete deletes the value for a key, returning the previous value if any. The loaded result reports whether the key was present.

func (*Map) LoadAndStore

func (m *Map) LoadAndStore(key string, value interface{}) (actual interface{}, loaded bool)

LoadAndStore returns the existing value for the key if present, while setting the new value for the key. It stores the new value and returns the existing one, if present. The loaded result is true if the existing value was loaded, false otherwise.

func (*Map) LoadOrCompute

func (m *Map) LoadOrCompute(key string, valueFn func() interface{}) (actual interface{}, loaded bool)

LoadOrCompute returns the existing value for the key if present. Otherwise, it computes the value using the provided function and returns the computed value. The loaded result is true if the value was loaded, false if stored.

func (*Map) LoadOrStore

func (m *Map) LoadOrStore(key string, value interface{}) (actual interface{}, loaded bool)

LoadOrStore returns the existing value for the key if present. Otherwise, it stores and returns the given value. The loaded result is true if the value was loaded, false if stored.

func (*Map) Range

func (m *Map) Range(f func(key string, value interface{}) bool)

Range calls f sequentially for each key and value present in the map. If f returns false, range stops the iteration.

Range does not necessarily correspond to any consistent snapshot of the Map's contents: no key will be visited more than once, but if the value for any key is stored or deleted concurrently, Range may reflect any mapping for that key from any point during the Range call.

It is safe to modify the map while iterating it. However, the concurrent modification rule apply, i.e. the changes may be not reflected in the subsequently iterated entries.

func (*Map) Size

func (m *Map) Size() int

Size returns current size of the map.

func (*Map) Store

func (m *Map) Store(key string, value interface{})

Store sets the value for a key.

type MapOf

type MapOf[K comparable, V any] struct {
	// contains filtered or unexported fields
}

MapOf is like a Go map[string]V but is safe for concurrent use by multiple goroutines without additional locking or coordination. It follows the interface of sync.Map with a number of valuable extensions like Compute or Size.

A MapOf must not be copied after first use.

MapOf uses a modified version of Cache-Line Hash Table (CLHT) data structure: https://github.com/LPD-EPFL/CLHT

CLHT is built around idea to organize the hash table in cache-line-sized buckets, so that on all modern CPUs update operations complete with at most one cache-line transfer. Also, Get operations involve no write to memory, as well as no mutexes or any other sort of locks. Due to this design, in all considered scenarios MapOf outperforms sync.Map.

func NewIntegerMapOf

func NewIntegerMapOf[K IntegerConstraint, V any]() *MapOf[K, V]

NewIntegerMapOf creates a new MapOf instance with integer typed keys.

func NewIntegerMapOfPresized added in v2.4.0

func NewIntegerMapOfPresized[K IntegerConstraint, V any](sizeHint int) *MapOf[K, V]

NewIntegerMapOfPresized creates a new MapOf instance with integer typed keys and capacity enough to hold sizeHint entries. If sizeHint is zero or negative, the value is ignored.

func NewMapOf

func NewMapOf[V any]() *MapOf[string, V]

NewMapOf creates a new MapOf instance with string keys.

func NewMapOfPresized added in v2.4.0

func NewMapOfPresized[V any](sizeHint int) *MapOf[string, V]

NewMapOfPresized creates a new MapOf instance with string keys and capacity enough to hold sizeHint entries. If sizeHint is zero or negative, the value is ignored.

func NewTypedMapOf

func NewTypedMapOf[K comparable, V any](hasher func(maphash.Seed, K) uint64) *MapOf[K, V]

NewTypedMapOf creates a new MapOf instance with arbitrarily typed keys.

Keys are hashed to uint64 using the hasher function. It is strongly recommended to use the hash/maphash package to implement hasher. See the example for how to do that.

Example
type Person struct {
	GivenName   string
	FamilyName  string
	YearOfBirth int16
}
age := xsync.NewTypedMapOf[Person, int](func(seed maphash.Seed, p Person) uint64 {
	var h maphash.Hash
	h.SetSeed(seed)
	h.WriteString(p.GivenName)
	hash := h.Sum64()
	h.Reset()
	h.WriteString(p.FamilyName)
	hash = 31*hash + h.Sum64()
	h.Reset()
	binary.Write(&h, binary.LittleEndian, p.YearOfBirth)
	return 31*hash + h.Sum64()
})
Y := time.Now().Year()
age.Store(Person{"Ada", "Lovelace", 1815}, Y-1815)
age.Store(Person{"Charles", "Babbage", 1791}, Y-1791)
Output:

func NewTypedMapOfPresized added in v2.4.0

func NewTypedMapOfPresized[K comparable, V any](hasher func(maphash.Seed, K) uint64, sizeHint int) *MapOf[K, V]

NewTypedMapOfPresized creates a new MapOf instance with arbitrarily typed keys and capacity enough to hold sizeHint entries. If sizeHint is zero or negative, the value is ignored.

Keys are hashed to uint64 using the hasher function. It is strongly recommended to use the hash/maphash package to implement hasher. See the example for how to do that.

func (*MapOf[K, V]) Clear added in v2.1.0

func (m *MapOf[K, V]) Clear()

Clear deletes all keys and values currently stored in the map.

func (*MapOf[K, V]) Compute added in v2.1.0

func (m *MapOf[K, V]) Compute(
	key K,
	valueFn func(oldValue V, loaded bool) (newValue V, delete bool),
) (actual V, ok bool)

Compute either sets the computed new value for the key or deletes the value for the key. When the delete result of the valueFn function is set to true, the value will be deleted, if it exists. When delete is set to false, the value is updated to the newValue. The ok result indicates whether value was computed and stored, thus, is present in the map. The actual result contains the new value in cases where the value was computed and stored. See the example for a few use cases.

Example
counts := xsync.NewIntegerMapOf[int, int]()

// Store a new value.
v, ok := counts.Compute(42, func(oldValue int, loaded bool) (newValue int, delete bool) {
	// loaded is false here.
	newValue = 42
	delete = false
	return
})
// v: 42, ok: true
fmt.Printf("v: %v, ok: %v\n", v, ok)

// Update an existing value.
v, ok = counts.Compute(42, func(oldValue int, loaded bool) (newValue int, delete bool) {
	// loaded is true here.
	newValue = oldValue + 42
	delete = false
	return
})
// v: 84, ok: true
fmt.Printf("v: %v, ok: %v\n", v, ok)

// Set a new value or keep the old value conditionally.
var oldVal int
minVal := 63
v, ok = counts.Compute(42, func(oldValue int, loaded bool) (newValue int, delete bool) {
	oldVal = oldValue
	if !loaded || oldValue < minVal {
		newValue = minVal
		delete = false
		return
	}
	newValue = oldValue
	delete = false
	return
})
// v: 84, ok: true, oldVal: 84
fmt.Printf("v: %v, ok: %v, oldVal: %v\n", v, ok, oldVal)

// Delete an existing value.
v, ok = counts.Compute(42, func(oldValue int, loaded bool) (newValue int, delete bool) {
	// loaded is true here.
	delete = true
	return
})
// v: 84, ok: false
fmt.Printf("v: %v, ok: %v\n", v, ok)
Output:

func (*MapOf[K, V]) Delete

func (m *MapOf[K, V]) Delete(key K)

Delete deletes the value for a key.

func (*MapOf[K, V]) Load

func (m *MapOf[K, V]) Load(key K) (value V, ok bool)

Load returns the value stored in the map for a key, or nil if no value is present. The ok result indicates whether value was found in the map.

func (*MapOf[K, V]) LoadAndDelete

func (m *MapOf[K, V]) LoadAndDelete(key K) (value V, loaded bool)

LoadAndDelete deletes the value for a key, returning the previous value if any. The loaded result reports whether the key was present.

func (*MapOf[K, V]) LoadAndStore

func (m *MapOf[K, V]) LoadAndStore(key K, value V) (actual V, loaded bool)

LoadAndStore returns the existing value for the key if present, while setting the new value for the key. It stores the new value and returns the existing one, if present. The loaded result is true if the existing value was loaded, false otherwise.

func (*MapOf[K, V]) LoadOrCompute

func (m *MapOf[K, V]) LoadOrCompute(key K, valueFn func() V) (actual V, loaded bool)

LoadOrCompute returns the existing value for the key if present. Otherwise, it computes the value using the provided function and returns the computed value. The loaded result is true if the value was loaded, false if stored.

func (*MapOf[K, V]) LoadOrStore

func (m *MapOf[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool)

LoadOrStore returns the existing value for the key if present. Otherwise, it stores and returns the given value. The loaded result is true if the value was loaded, false if stored.

func (*MapOf[K, V]) Range

func (m *MapOf[K, V]) Range(f func(key K, value V) bool)

Range calls f sequentially for each key and value present in the map. If f returns false, range stops the iteration.

Range does not necessarily correspond to any consistent snapshot of the Map's contents: no key will be visited more than once, but if the value for any key is stored or deleted concurrently, Range may reflect any mapping for that key from any point during the Range call.

It is safe to modify the map while iterating it. However, the concurrent modification rule apply, i.e. the changes may be not reflected in the subsequently iterated entries.

func (*MapOf[K, V]) Size

func (m *MapOf[K, V]) Size() int

Size returns current size of the map.

func (*MapOf[K, V]) Store

func (m *MapOf[K, V]) Store(key K, value V)

Store sets the value for a key.

type RBMutex

type RBMutex struct {
	// contains filtered or unexported fields
}

A RBMutex is a reader biased reader/writer mutual exclusion lock. The lock can be held by an many readers or a single writer. The zero value for a RBMutex is an unlocked mutex.

A RBMutex must not be copied after first use.

RBMutex is based on a modified version of BRAVO (Biased Locking for Reader-Writer Locks) algorithm: https://arxiv.org/pdf/1810.01553.pdf

RBMutex is a specialized mutex for scenarios, such as caches, where the vast majority of locks are acquired by readers and write lock acquire attempts are infrequent. In such scenarios, RBMutex performs better than sync.RWMutex on large multicore machines.

RBMutex extends sync.RWMutex internally and uses it as the "reader bias disabled" fallback, so the same semantics apply. The only noticeable difference is in reader tokens returned from the RLock/RUnlock methods.

func NewRBMutex added in v2.3.0

func NewRBMutex() *RBMutex

NewRBMutex creates a new RBMutex instance.

func (*RBMutex) Lock

func (mu *RBMutex) Lock()

Lock locks m for writing. If the lock is already locked for reading or writing, Lock blocks until the lock is available.

func (*RBMutex) RLock

func (mu *RBMutex) RLock() *RToken

RLock locks m for reading and returns a reader token. The token must be used in the later RUnlock call.

Should not be used for recursive read locking; a blocked Lock call excludes new readers from acquiring the lock.

func (*RBMutex) RUnlock

func (mu *RBMutex) RUnlock(t *RToken)

RUnlock undoes a single RLock call. A reader token obtained from the RLock call must be provided. RUnlock does not affect other simultaneous readers. A panic is raised if m is not locked for reading on entry to RUnlock.

func (*RBMutex) Unlock

func (mu *RBMutex) Unlock()

Unlock unlocks m for writing. A panic is raised if m is not locked for writing on entry to Unlock.

As with RWMutex, a locked RBMutex is not associated with a particular goroutine. One goroutine may RLock (Lock) a RBMutex and then arrange for another goroutine to RUnlock (Unlock) it.

type RToken

type RToken struct {
	// contains filtered or unexported fields
}

RToken is a reader lock token.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL