asyncq

package module
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 4, 2026 License: MIT Imports: 3 Imported by: 0

README

AsyncQ

Go Report Card Go Doc

Simple and Fast Asynchronous Queue and Event Loop in GO.
Implementations

The package exposes an interface AsyncQ and one implementation AsyncDoubleQueue.

AsyncDoubleQueue uses two queues, one in for enqueue operations (inputQueue) and another one for dequeues operations (outputQueue).

  • A single mutex is used to lock only the inputQueue. The event loop acquire the lock only in one occasion: when outputQueue is empty and exchanges the two queues;
  • Use a channel to put the event loop to sleep or to wake-up it;
  • The event loop is panic safe;
  • A nil task closes the event loop (but not the channel, so it's safe to only use Close).
Example
package main

import (
    "fmt"
    "log"
    "os"
	
    "github.com/ggpslop/asyncq"
)

func example_deferred_close() {

	var queue asyncq.AsyncQ
	queue = asyncq.NewAsyncDoubleQueue(10, log.New(os.Stdout, "", 0))

	go queue.RunEventLoop()
	defer queue.Close()

	var word = "world"
	queue.Enqueue(func() {
		fmt.Printf("Hello, %s!\n", word) // expected "Hello, world!"
	})
}

func example_close_plus_wait() {

	var queue asyncq.AsyncQ
	queue = asyncq.NewAsyncDoubleQueue(10, log.New(os.Stdout, "", 0))
	go queue.RunEventLoop()

	var number int // escapes to heap.
	queue.Enqueue(func() {
		number++
	})

	var wait = queue.Close()
	wait()

	fmt.Printf("The number is %d\n", number) // expected 2
}

func example_deferred_close_plus_wait() {

	var queue asyncq.AsyncQ
	queue = asyncq.NewAsyncDoubleQueue(10, log.New(os.Stdout, "", 0))
	go queue.RunEventLoop()

	defer func() {
		var wait = queue.Close()
		wait()
	}()

	var word = "world"
	queue.Enqueue(func() {
		fmt.Printf("Hello, %s!\n", word) // expected "Hello, world!"
	})
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncDoubleQueue

type AsyncDoubleQueue struct {
	// contains filtered or unexported fields
}

AsyncDoubleQueue uses two queues, one in for enqueue operations (inputQueue) and another one for dequeues operations (outputQueue).

  • A single mutex is used to lock only the inputQueue. The event loop acquire the lock only in one occasion: when outputQueue is empty and exchanges the two queues.
  • Use a channel to put the event loop to sleep or to wake-up it.
  • The event loop is 'panic' safe.
  • A nil task closes the event loop (but not the channel, so it's safe to only use Close).

func NewAsyncDoubleQueue

func NewAsyncDoubleQueue(initCap int, logger *log.Logger) *AsyncDoubleQueue

NewAsyncDoubleQueue returns a new heap allocated AsyncDoubleQueue.

  • If 'initCap' is less the 'defaultCap', 'defaultCap' is used as initial capacity.
  • If 'logger' is nil, plain STDOUT is used.

func (*AsyncDoubleQueue) Close

func (aq *AsyncDoubleQueue) Close() WaitFunc

Close the event loop and the channel. It uses sync.WaitGroup with counter = 1 to create a 'wait' function, that it then returns. To close the event loop, it adds 2 task to the inputQueue, a task with sync.WaitGroup.Done and a nil task that is recognized by the event loop logic.

func (*AsyncDoubleQueue) Enqueue

func (aq *AsyncDoubleQueue) Enqueue(task func())

func (*AsyncDoubleQueue) RunEventLoop

func (aq *AsyncDoubleQueue) RunEventLoop()

func (*AsyncDoubleQueue) TryEnqueue

func (aq *AsyncDoubleQueue) TryEnqueue(task func()) bool

TryEnqueue returns 'true' when the enqueue is done, 'false' otherwise.

type AsyncQ

type AsyncQ interface {

	// Enqueue a new task in queue.
	Enqueue(task func())

	// TryEnqueue enqueue a new task in queue if it's possible. The 'possibility'
	// must be decided by the specific implementation. For example, can be
	// useful when a lock mechanism is used, like a mutex.
	TryEnqueue(task func()) bool

	// Close the queue and no more tasks can be enqueued. It returns a wait
	// function to be used to graceful shutting down the queue.
	Close() WaitFunc

	// RunEventLoop start the event loop. It must be started in a new goroutine.
	RunEventLoop()
}

AsyncQ is an interface for any AsyncQ implementation. Allows queuing, closing the queue, and starting the Event Loop. Tasks are simply parameterless procedures, because you can use closures that capture external context variables.

type Mutex

type Mutex interface {
	Lock()
	TryLock() bool
	Unlock()
}

type WaitFunc added in v1.1.1

type WaitFunc func()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL