concurrency

Version: v1.0.1
Published: Sep 13, 2025 License: MIT Imports: 3 Imported by: 1

README

package concurrency

📋 Overview

The concurrency package provides lightweight, efficient concurrency primitives for Go, designed for correctness and performance with minimal memory allocations. It simplifies concurrent programming tasks in Go applications.

For full documentation, see https://pkg.go.dev/github.com/lif0/pkg/concurrency.


⚙️ Requirements

  • Go 1.19 or higher

📦 Installation

To add this package to your project, use go get:

go get github.com/lif0/pkg/concurrency@latest

Import the package in your code:

import "github.com/lif0/pkg/concurrency"

✨ Features

Semaphore

The Semaphore type provides a counting semaphore to limit the number of concurrent holders of a shared resource. It supports both limited and unlimited capacity, with methods like Acquire, AcquireContext, TryAcquire, Release, InUse, and Cap.

Example: Limited Semaphore
package main

import (
    "fmt"
    "github.com/lif0/pkg/concurrency"
)

func main() {
    // Create a semaphore with a capacity of 3
    sem := concurrency.NewSemaphore(3)

    // Acquire a slot
    sem.Acquire()
    fmt.Printf("Acquired a slot, in use: %d/%d\n", sem.InUse(), sem.Cap())
    
    // Perform critical section work
    // ...

    // Release the slot
    sem.Release()
    fmt.Println("Released a slot")
}
Example: Unlimited Semaphore
package main

import (
    "fmt"
    "github.com/lif0/pkg/concurrency"
)

func main() {
    // Create an unlimited semaphore
    sem := concurrency.NewSemaphore(0)

    // Acquire is a no-op for unlimited semaphores
    sem.Acquire()
    fmt.Printf("Acquired (no-op), in use: %d, cap: %d\n", sem.InUse(), sem.Cap())

    // Perform work
    // ...

    // Release is a no-op for unlimited semaphores
    sem.Release()
    fmt.Println("Released (no-op)")
}
Example: Context-Aware Acquisition
package main

import (
    "context"
    "fmt"
    "time"
    "github.com/lif0/pkg/concurrency"
)

func main() {
    // Create a semaphore with a capacity of 2
    sem := concurrency.NewSemaphore(2)

    // Create a context with a timeout
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    // Attempt to acquire a slot with context
    if err := sem.AcquireContext(ctx); err != nil {
        fmt.Printf("Failed to acquire: %v\n", err)
        return
    }
    fmt.Printf("Acquired a slot with context, in use: %d/%d\n", sem.InUse(), sem.Cap())

    // Perform work
    // ...

    // Release the slot
    sem.Release()
    fmt.Println("Released a slot")
}
Example: Non-Blocking Acquisition
package main

import (
    "fmt"
    "github.com/lif0/pkg/concurrency"
)

func main() {
    // Create a semaphore with a capacity of 1
    sem := concurrency.NewSemaphore(1)

    // Acquire the only slot
    sem.Acquire()
    fmt.Printf("Acquired a slot, in use: %d/%d\n", sem.InUse(), sem.Cap())

    // Try to acquire another slot without blocking
    if sem.TryAcquire() {
        fmt.Println("Acquired another slot")
    } else {
        fmt.Println("Failed to acquire: no slots available")
    }

    // Release the slot
    sem.Release()
    fmt.Println("Released a slot")
}
WithLock

WithLock is a helper function that executes an action while holding a lock.
It guarantees that the lock will always be released, even if the action panics.

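package main

import (
    "fmt"
    "sync"

    "github.com/lif0/pkg/concurrency"
)

func main() {
    var mu sync.Mutex
    var wg sync.WaitGroup
    counter := 0

    // Start 5 goroutines that each increment the shared counter 100 times.
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                // WithLock locks mu, runs the closure, and unlocks mu.
                concurrency.WithLock(&mu, func() {
                    counter++
                })
            }
        }()
    }

    wg.Wait()
    fmt.Println("Final counter:", counter) // Always 500
}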
FutureAction

The FutureAction type provides an abstraction over a channel that models a task and its result. It allows executing a computation asynchronously in a goroutine and retrieving the result later via a blocking call. This is similar to the Future pattern in other languages, providing a simple way to handle asynchronous results without manual channel management.

The channel is closed after the result is sent, ensuring proper resource cleanup.

Example: Basic Usage
package main

import (
    "fmt"
    "time"

    "github.com/lif0/pkg/concurrency"
)

func main() {
    callback := func() any {
        time.Sleep(time.Second)
        return "success"
    }

    future := concurrency.NewFutureAction(callback)
    result := future.Get()
    fmt.Println(result) // Output: success
}
Example: Generic Type Usage
package main

import (
    "fmt"
    "time"
    "github.com/lif0/pkg/concurrency"
)

func main() {
    callback := func() int {
        time.Sleep(time.Second)
        return 42
    }

    future := concurrency.NewFutureAction(callback)
    result := future.Get()
    fmt.Printf("Result: %d\n", result) // Output: Result: 42
}
Promise

The Promise type represents a writable, single-assignment container for a future value. It allows setting a value exactly once (subsequent sets are ignored) and provides a Future for reading the value asynchronously. This is similar to the Promise/Future pattern in other languages, enabling clean handling of asynchronous results with thread safety via atomic operations and mutexes.

The internal channel is buffered (capacity 1) and closed after setting the value. Aliases PromiseError and FutureError are provided for error handling.

Example: Basic Usage
package main

import (
    "fmt"
    "time"
    "github.com/lif0/pkg/concurrency"
)

func main() {
    promise := concurrency.NewPromise[string]()
    go func() {
        time.Sleep(time.Second)
        promise.Set("Cake")
    }()

    future := promise.GetFuture()
    value := future.Get()
    fmt.Println(value) // Output: Cake
}
Example: Error Handling with PromiseError
package main

import (
    "errors"
    "fmt"
    "time"
    "github.com/lif0/pkg/concurrency"
)

func main() {
    promise := concurrency.NewPromise[error]()
    go func() {
        time.Sleep(time.Second)
        promise.Set(errors.New("Something went wrong"))
    }()

    future := promise.GetFuture()
    err := future.Get()
    if err != nil {
        fmt.Printf("Error: %v\n", err) // Output: Error: Something went wrong
    }
}
Example: Wrapping Existing Channel with NewFuture
package main

import (
    "fmt"
    "github.com/lif0/pkg/concurrency"
)

func main() {
    ch := make(chan int, 1)
    ch <- 42
    close(ch)

    future := concurrency.NewFuture(ch)
    value := future.Get()
    fmt.Printf("Value: %d\n", value) // Output: Value: 42
}

🗺️ Roadmap

  • FanIn/FanOut patterns for channel-based concurrency.
  • Future/Promise constructs for asynchronous programming.
  • Michael-Scott Queue (MS Queue) for lock-free concurrent queues.

Contributions and feature suggestions are welcome 🤗.


📄 License

MIT

Documentation

Overview

Package concurrency provides concurrency utilities.

Functions

func WithLock

func WithLock(mutex sync.Locker, action func())

WithLock executes the given action while holding the provided lock.

It accepts any sync.Locker (e.g., *sync.Mutex, *sync.RWMutex) and a function with no parameters or return values. If the action is nil, nothing is executed.

The lock is guaranteed to be released after the action completes, even if the action panics or returns early.
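
The release-on-panic guarantee is the kind of behavior a deferred Unlock provides. The following is a minimal sketch using only the standard library. The names withLock and panicsButUnlocks are hypothetical, written for illustration, and this is not the package's source. Skipping the lock entirely for a nil action is also an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// withLock is a hypothetical stand-in for concurrency.WithLock,
// written only to illustrate the documented behavior.
func withLock(l sync.Locker, action func()) {
	if action == nil {
		return // assumption: a nil action does not touch the lock at all
	}
	l.Lock()
	defer l.Unlock() // runs on normal return and while a panic unwinds
	action()
}

// panicsButUnlocks runs a panicking action under a mutex and reports
// whether the mutex can be locked again afterwards.
func panicsButUnlocks() bool {
	var mu sync.Mutex
	func() {
		defer func() { _ = recover() }() // swallow the panic for the demo
		withLock(&mu, func() { panic("boom") })
	}()
	if !mu.TryLock() { // TryLock requires Go 1.18 or newer
		return false
	}
	mu.Unlock()
	return true
}

func main() {
	fmt.Println("lock released after panic:", panicsButUnlocks())
}
```

Note that the panic itself still propagates to the caller; only the lock is cleaned up.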

Types

type Future (added in v1.0.1)

type Future[T any] struct {
	// contains filtered or unexported fields
}

Future represents a read-only view of a promised value. It provides a way to retrieve the value asynchronously, blocking if necessary.

func NewFuture (added in v1.0.1)

func NewFuture[T any](result <-chan T) *Future[T]

NewFuture creates a new Future from a given receive-only channel. This allows wrapping an existing channel as a Future.

func (*Future[T]) Get (added in v1.0.1)

func (f *Future[T]) Get() T

Get retrieves the value from the Future, blocking until it's available. If the channel is closed without a value (though not typical in this pattern), it returns the zero value.
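
The zero-value case follows from how Go channels work: a receive from a closed, empty channel returns the zero value of the element type. The sketch below shows this with a simplified, hypothetical future type (not the package's implementation); getFromClosed and getBuffered are illustrative helpers.

```go
package main

import "fmt"

// future is a hypothetical, simplified stand-in for concurrency.Future.
type future[T any] struct {
	result <-chan T
}

// get blocks until a value arrives or the channel is closed.
// A receive from a closed, empty channel yields the zero value of T.
func (f *future[T]) get() T {
	return <-f.result
}

// getFromClosed wraps a channel that is closed without ever carrying a value.
func getFromClosed() int {
	ch := make(chan int)
	close(ch)
	f := &future[int]{result: ch}
	return f.get()
}

// getBuffered wraps a channel that holds one value before being closed.
func getBuffered() string {
	ch := make(chan string, 1)
	ch <- "ready"
	close(ch)
	f := &future[string]{result: ch}
	return f.get()
}

func main() {
	fmt.Println(getFromClosed()) // zero value of int
	fmt.Println(getBuffered())   // the buffered value
}
```

Because Get returns only T, a caller cannot tell a delivered zero value apart from a channel that was closed empty.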

type FutureAction (added in v1.0.1)

type FutureAction[T any] struct {
	// contains filtered or unexported fields
}

FutureAction is an abstraction over a channel that models a task and its result. It allows executing a computation asynchronously in a goroutine and retrieving the result later via a blocking call. This is similar to the Future pattern in other languages, providing a simple way to handle asynchronous results without manual channel management.

The channel is closed after the result is sent, ensuring proper resource cleanup.

Example usage:

func main() {
	callback := func() any {
		time.Sleep(time.Second)
		return "success"
	}

	future := NewFutureAction(callback)
	result := future.Get()
	fmt.Println(result) // Output: success
}

func NewFutureAction (added in v1.0.1)

func NewFutureAction[T any](action func() T) *FutureAction[T]

NewFutureAction creates and returns a new FutureAction. It starts the provided action function in a separate goroutine. The action's return value is sent to the internal channel. The channel is closed after sending the result to allow safe ranging or detection of completion.
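
The described flow (start a goroutine, send the result, close the channel) can be sketched with the standard library alone. The type futureAction and the helper square below are hypothetical, and the buffer size of 1 is an assumption rather than something the documentation states.

```go
package main

import "fmt"

// futureAction is a hypothetical stand-in for concurrency.FutureAction.
type futureAction[T any] struct {
	result chan T
}

// newFutureAction starts action in its own goroutine. The goroutine sends
// the return value and then closes the channel.
func newFutureAction[T any](action func() T) *futureAction[T] {
	// Assumption: a buffer of 1 lets the goroutine finish even if
	// get is never called.
	f := &futureAction[T]{result: make(chan T, 1)}
	go func() {
		defer close(f.result) // runs after the send below
		f.result <- action()
	}()
	return f
}

// get blocks until the action's result is available.
func (f *futureAction[T]) get() T {
	return <-f.result
}

// square computes n*n asynchronously and waits for the result.
func square(n int) int {
	f := newFutureAction(func() int { return n * n })
	return f.get()
}

func main() {
	fmt.Println(square(7))
}
```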

func (*FutureAction[T]) Get (added in v1.0.1)

func (f *FutureAction[T]) Get() T

Get returns the result of the asynchronous task. This method blocks until the result is available from the channel. If the action function blocks indefinitely (e.g., due to an infinite loop or deadlock), Get will never return, potentially causing the caller to hang. It is the caller's responsibility to ensure the action completes.
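
Since Get has no timeout of its own, a caller who cannot guarantee completion may want to bound the wait. One possible approach, sketched here with a hypothetical helper getWithTimeout that is not part of the package, is to run the blocking call in a goroutine and select on a timer.

```go
package main

import (
	"fmt"
	"time"
)

// getWithTimeout is a hypothetical helper: it runs a blocking get function
// in a goroutine and waits at most d for its result.
func getWithTimeout[T any](get func() T, d time.Duration) (T, bool) {
	done := make(chan T, 1) // buffered so the goroutine can exit after a timeout
	go func() { done <- get() }()

	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case v := <-done:
		return v, true
	case <-timer.C:
		var zero T
		return zero, false
	}
}

// fastCompletes reports whether a quick function delivers its value in time.
func fastCompletes() bool {
	v, ok := getWithTimeout(func() string { return "success" }, time.Second)
	return ok && v == "success"
}

// slowTimesOut reports whether a slow function is abandoned after the timeout.
func slowTimesOut() bool {
	_, ok := getWithTimeout(func() string {
		time.Sleep(500 * time.Millisecond)
		return "late"
	}, 20*time.Millisecond)
	return !ok
}

func main() {
	fmt.Println(fastCompletes(), slowTimesOut())
}
```

With the package, the method value future.Get could be passed as the get argument. The helper only stops the caller from waiting: if the action never completes, the helper goroutine stays blocked.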

type FutureError (added in v1.0.1)

type FutureError = Future[error]

PromiseError and FutureError are type aliases for Promise and Future specialized for error handling. This allows for easy propagation of errors in asynchronous operations.

type Promise (added in v1.0.1)

type Promise[T any] struct {
	// contains filtered or unexported fields
}

Promise represents a writable, single-assignment container for a future value. It allows setting a value exactly once. Attempting to set the value more than once is ignored. The internal channel is buffered to hold one value and is closed after setting. Synchronization is handled via atomic operations and a mutex for thread safety.

Example usage:

func main() {
	promise := NewPromise[string]()
	go func() {
		time.Sleep(time.Second)
		promise.Set("Cake")
	}()

	future := promise.GetFuture()
	value := future.Get()
	fmt.Println(value) // Output: Cake
}

func NewPromise (added in v1.0.1)

func NewPromise[T any]() Promise[T]

NewPromise creates and returns a new Promise. The internal channel is buffered with capacity 1 to hold the future value.

func (*Promise[T]) GetFuture (added in v1.0.1)

func (p *Promise[T]) GetFuture() *Future[T]

GetFuture returns a Future associated with this Promise. The Future can be used to retrieve the value once it's set.

func (*Promise[T]) Set (added in v1.0.1)

func (p *Promise[T]) Set(value T)

Set assigns the value to the Promise. This can be called only once; subsequent calls are ignored. After setting, the value is sent to the channel, and the channel is closed.
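
The single-assignment rule can be illustrated with a small sketch. It uses sync.Once for brevity, whereas the package documents atomic operations and a mutex, so treat it as an approximation of the behavior, not the implementation. The names promise, newPromise, and firstSetWins are hypothetical, and the sketch returns a pointer because a sync.Once must not be copied.

```go
package main

import (
	"fmt"
	"sync"
)

// promise is a hypothetical stand-in for concurrency.Promise.
type promise[T any] struct {
	once   sync.Once
	result chan T
}

// newPromise creates a promise whose channel can hold exactly one value.
func newPromise[T any]() *promise[T] {
	return &promise[T]{result: make(chan T, 1)} // capacity 1, as documented
}

// set stores the first value and closes the channel; later calls do nothing.
func (p *promise[T]) set(value T) {
	p.once.Do(func() {
		p.result <- value // never blocks: the buffer has room for one value
		close(p.result)
	})
}

// get blocks until set has been called.
func (p *promise[T]) get() T {
	return <-p.result
}

// firstSetWins sets two values and returns the one a reader observes.
func firstSetWins() string {
	p := newPromise[string]()
	p.set("Cake")
	p.set("Pie") // ignored: the value was already set
	return p.get()
}

func main() {
	fmt.Println(firstSetWins())
}
```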

type PromiseError (added in v1.0.1)

type PromiseError = Promise[error]

PromiseError and FutureError are type aliases for Promise and Future specialized for error handling. This allows for easy propagation of errors in asynchronous operations.

type Semaphore

type Semaphore struct {
	// contains filtered or unexported fields
}

Semaphore is a counting semaphore that bounds the number of concurrent holders.

The zero value (and a nil *Semaphore) is an unlimited semaphore: all acquire operations succeed immediately and Release is a no-op.

All methods are safe for concurrent use by multiple goroutines.
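
These semantics map naturally onto a buffered channel, where a nil channel stands for the unlimited case. The following is a minimal sketch under that assumption. The lowercase semaphore type and its helpers are hypothetical, and the package may be implemented differently.

```go
package main

import "fmt"

// semaphore is a hypothetical stand-in for concurrency.Semaphore.
// A nil slots channel (the zero value) marks the unlimited case.
type semaphore struct {
	slots chan struct{}
}

func newSemaphore(capacity uint) *semaphore {
	if capacity == 0 {
		return &semaphore{} // unlimited
	}
	return &semaphore{slots: make(chan struct{}, capacity)}
}

// unlimited reports whether s imposes no bound (nil receiver or zero value).
func (s *semaphore) unlimited() bool { return s == nil || s.slots == nil }

// acquire blocks until a slot is free.
func (s *semaphore) acquire() {
	if !s.unlimited() {
		s.slots <- struct{}{}
	}
}

// tryAcquire takes a slot only if one is free right now.
func (s *semaphore) tryAcquire() bool {
	if s.unlimited() {
		return true
	}
	select {
	case s.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees one slot and panics if none is held.
func (s *semaphore) release() {
	if s.unlimited() {
		return
	}
	select {
	case <-s.slots:
	default:
		panic("semaphore: release without matching acquire")
	}
}

// fullSemaphoreRejects reports whether tryAcquire fails once the only slot is held.
func fullSemaphoreRejects() bool {
	s := newSemaphore(1)
	s.acquire()
	rejected := !s.tryAcquire()
	s.release()
	return rejected
}

// unmatchedReleasePanics reports whether release panics when no slot is held.
func unmatchedReleasePanics() (panicked bool) {
	defer func() { panicked = recover() != nil }()
	newSemaphore(1).release()
	return false
}

func main() {
	var zero *semaphore // a nil pointer behaves as an unlimited semaphore
	fmt.Println(zero.tryAcquire(), fullSemaphoreRejects(), unmatchedReleasePanics())
}
```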

func NewSemaphore

func NewSemaphore(capacity uint) *Semaphore

NewSemaphore returns a semaphore with the provided capacity.

If capacity is 0 (the parameter is unsigned, so it cannot be negative), it returns an unlimited semaphore, for which all acquire operations succeed immediately and Release does nothing.

func (*Semaphore) Acquire

func (s *Semaphore) Acquire()

Acquire obtains one slot from s, blocking until a slot is available. For an unlimited semaphore, Acquire is a no-op.

func (*Semaphore) AcquireContext

func (s *Semaphore) AcquireContext(ctx context.Context) error

AcquireContext attempts to obtain one slot, blocking until a slot is available or the context is canceled or its deadline is exceeded. It returns ctx.Err() if the context is done first. For an unlimited semaphore, AcquireContext returns nil immediately.
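
A context-aware acquire of this kind is commonly written as a select between taking a slot and ctx.Done(). The sketch below assumes the slots are modeled as a buffered channel. The functions acquireContext and timesOutWhenFull are hypothetical and only illustrate the documented contract.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// acquireContext is a hypothetical helper that treats a buffered channel as
// the semaphore's slots. It returns ctx.Err() if ctx is done first.
func acquireContext(ctx context.Context, slots chan struct{}) error {
	if slots == nil {
		return nil // unlimited: succeed immediately
	}
	select {
	case slots <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// timesOutWhenFull fills a one-slot semaphore, then waits for a second slot
// with a short deadline and reports whether the deadline error came back.
func timesOutWhenFull() bool {
	slots := make(chan struct{}, 1)
	slots <- struct{}{} // the only slot is taken

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()

	err := acquireContext(ctx, slots)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("timed out:", timesOutWhenFull())
}
```

In this sketch, if a slot is free and the context is already done, select may pick either case. How the package resolves that situation is not stated in its documentation.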

func (*Semaphore) Cap

func (s *Semaphore) Cap() int

Cap returns the maximum number of concurrent holders (the capacity). For an unlimited semaphore, Cap returns 0.

func (*Semaphore) InUse

func (s *Semaphore) InUse() int

InUse reports the current number of acquired slots. For an unlimited semaphore, InUse returns 0.

func (*Semaphore) Release

func (s *Semaphore) Release()

Release releases one previously acquired slot. On a limited semaphore, calling Release without a matching acquire panics. On an unlimited semaphore, Release is a no-op.

func (*Semaphore) TryAcquire

func (s *Semaphore) TryAcquire() bool

TryAcquire attempts to obtain one slot without blocking. It returns true if a slot was acquired and false otherwise. For an unlimited semaphore, TryAcquire always returns true.
