Documentation
¶
Index ¶
- Variables
- type Logger
- type Metric
- type Option
- func WithAfterFn(afterFn func()) Option
- func WithFn(fn func(context.Context, core.TaskMessage) error) Option
- func WithLogger(l Logger) Option
- func WithMetric(m Metric) Option
- func WithQueueSize(num int) Option
- func WithRetryInterval(d time.Duration) Option
- func WithWorker(w core.Worker) Option
- func WithWorkerCount(num int64) Option
- type OptionFunc
- type Options
- type Queue
- func (q *Queue) BusyWorkers() int64
- func (q *Queue) CompletedTasks() uint64
- func (q *Queue) FailureTasks() uint64
- func (q *Queue) Queue(message core.QueuedMessage, opts ...job.AllowOption) error
- func (q *Queue) QueueTask(task job.TaskFunc, opts ...job.AllowOption) error
- func (q *Queue) Release()
- func (q *Queue) Shutdown()
- func (q *Queue) Start()
- func (q *Queue) SubmittedTasks() uint64
- func (q *Queue) SuccessTasks() uint64
- func (q *Queue) UpdateWorkerCount(num int64)
- func (q *Queue) Wait()
- type Ring
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoTaskInQueue is returned by Worker.Request() when the queue is currently empty. // This is a temporary condition - new tasks may be added later. // The queue scheduler uses this error to determine when to retry or wait for notifications. ErrNoTaskInQueue = errors.New("golang-queue: no task in queue") // ErrQueueHasBeenClosed is returned by Worker.Request() during/after shutdown when no tasks remain. // This is a terminal state indicating the queue has been shut down and drained. // Once this error appears, no new tasks will be processed. // Triggered by: calling Shutdown() or Release() on the queue. ErrQueueHasBeenClosed = errors.New("golang-queue: queue has been closed") // ErrMaxCapacity is returned by Queue() or QueueTask() when the queue is at maximum capacity. // This only occurs when WithQueueSize() is used to set a capacity limit. // To handle: retry later, drop the task, or process it synchronously. // Triggered by: attempting to enqueue when count >= capacity. ErrMaxCapacity = errors.New("golang-queue: maximum size limit reached") )
var ErrMissingWorker = errors.New("missing worker module")
ErrMissingWorker is returned when a queue is created without a worker implementation.
var ErrQueueShutdown = errors.New("queue has been closed and released")
ErrQueueShutdown is returned when an operation is attempted on a queue that has already been closed and released.
Functions ¶
This section is empty.
Types ¶
type Logger ¶
type Logger interface {
// Infof logs formatted informational messages.
Infof(format string, args ...any)
// Errorf logs formatted error messages.
Errorf(format string, args ...any)
// Fatalf logs formatted fatal errors with stack trace information.
// Used for panics and critical failures.
Fatalf(format string, args ...any)
// Info logs informational messages.
Info(args ...any)
// Error logs error messages.
Error(args ...any)
// Fatal logs fatal errors with stack trace information.
// Used for panics and critical failures.
Fatal(args ...any)
}
Logger defines the interface for logging queue events, errors, and fatal conditions. The queue uses this interface to report:
- Info: Normal operations (shutdown, retry attempts)
- Error: Recoverable errors (task failures, runtime errors)
- Fatal: Panics and critical failures (includes stack traces)
Implement this interface to integrate with custom logging systems (logrus, zap, etc.).
func NewEmptyLogger ¶
func NewEmptyLogger() Logger
NewEmptyLogger creates a no-op logger that discards all log messages. This is useful for:
- Performance-sensitive production environments where logging overhead matters
- Testing scenarios where log output would clutter test results
- Silent background workers that don't need observability
Example:
q := queue.NewPool(5, queue.WithLogger(queue.NewEmptyLogger()))
Example ¶
l := NewEmptyLogger()
l.Info("test")
l.Infof("test")
l.Error("test")
l.Errorf("test")
l.Fatal("test")
l.Fatalf("test")
func NewLogger ¶
func NewLogger() Logger
NewLogger creates a standard logger that writes to stderr with timestamps. This is the default logger used by queues unless overridden with WithLogger.
Log format:
- INFO messages: Simple timestamped output
- ERROR messages: Simple timestamped output
- FATAL messages: Includes stack trace with file:line information
Use cases:
- Development and debugging
- Simple production deployments without structured logging
- When detailed error context is needed
type Metric ¶ added in v0.0.10
type Metric interface {
// IncBusyWorker increments the count of workers currently processing tasks.
// Called atomically when a worker starts processing a job.
IncBusyWorker()
// DecBusyWorker decrements the count of workers currently processing tasks.
// Called atomically when a worker finishes processing a job (success or failure).
DecBusyWorker()
// BusyWorkers returns the current number of workers actively processing tasks.
// This value can range from 0 to the configured workerCount.
BusyWorkers() int64
// SuccessTasks returns the total number of tasks that completed successfully.
// A task is considered successful if it returns no error and doesn't panic.
SuccessTasks() uint64
// FailureTasks returns the total number of tasks that failed.
// A task is considered failed if it returns an error, panics, or times out.
FailureTasks() uint64
// SubmittedTasks returns the total number of tasks submitted to the queue.
// This includes tasks still pending, in progress, and completed.
SubmittedTasks() uint64
// CompletedTasks returns the total number of tasks that have finished processing.
// This equals SuccessTasks() + FailureTasks().
CompletedTasks() uint64
// IncSuccessTask increments the successful task counter.
// Called atomically after a task completes without error.
IncSuccessTask()
// IncFailureTask increments the failed task counter.
// Called atomically after a task fails, panics, or times out.
IncFailureTask()
// IncSubmittedTask increments the submitted task counter.
// Called atomically when a new task is queued.
IncSubmittedTask()
}
Metric defines the interface for tracking queue performance and worker statistics. All methods must be safe for concurrent access from multiple goroutines. Implement this interface to integrate with custom monitoring systems (Prometheus, StatsD, etc.).
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option is a functional option for configuring a Queue. It follows the functional options pattern for flexible and extensible configuration.
func WithAfterFn ¶ added in v0.2.1
func WithAfterFn(afterFn func()) Option
WithAfterFn sets a callback function that will be executed after each job completes. This callback runs regardless of whether the job succeeded or failed. It executes after metrics are updated but before the worker picks up the next task. Useful for cleanup, logging, or triggering post-processing workflows.
Example:
q := NewPool(5, WithAfterFn(func() {
log.Println("Job completed")
}))
func WithFn ¶ added in v0.0.7
WithFn sets a custom handler function that will be called to process tasks. This function is used by the worker's Run method when processing job messages. The context allows cancellation and timeout control during task execution. If not set, defaults to a no-op function that returns nil.
Example:
handler := func(ctx context.Context, msg core.TaskMessage) error {
// Process the message
return processTask(msg)
}
q := NewPool(5, WithFn(handler))
func WithLogger ¶
WithLogger sets a custom logger for queue events and errors. By default, the queue uses a standard logger that writes to stderr. Use NewEmptyLogger() to disable logging entirely.
Example:
q := NewPool(5, WithLogger(myCustomLogger)) // or disable logging: q := NewPool(5, WithLogger(NewEmptyLogger()))
func WithMetric ¶ added in v0.0.10
WithMetric sets a custom metrics collector for tracking queue statistics. The default metric tracks busy workers, success/failure counts, and submitted tasks. Implement the Metric interface to integrate with custom monitoring systems.
Example:
q := NewPool(5, WithMetric(myPrometheusMetric))
func WithQueueSize ¶ added in v0.0.7
WithQueueSize sets the maximum capacity of the queue. When set to 0 (default), the queue has unlimited capacity and will grow dynamically. When set to a positive value, Queue() will return ErrMaxCapacity when the limit is reached. Use this to prevent memory exhaustion under high load.
Example:
q := NewPool(5, WithQueueSize(1000)) // Queue will hold at most 1000 pending tasks
func WithRetryInterval ¶ added in v0.4.0
WithRetryInterval sets the interval at which the queue polls for new tasks when the queue is empty. This determines how often Request() is retried after receiving ErrNoTaskInQueue. Lower values provide faster response to new tasks but increase CPU usage. Defaults to 1 second.
Example:
q := NewPool(5, WithRetryInterval(100*time.Millisecond)) // Poll every 100ms
func WithWorker ¶
WithWorker sets a custom worker implementation for the queue backend. By default, NewPool uses an in-memory Ring buffer worker. Use this to integrate external queue systems like NSQ, NATS, Redis, or RabbitMQ. This option is required when using NewQueue() instead of NewPool().
Example:
q, _ := NewQueue(WithWorker(myNSQWorker), WithWorkerCount(10))
func WithWorkerCount ¶
WithWorkerCount sets the number of concurrent worker goroutines that will process jobs. If num is less than or equal to 0, it defaults to runtime.NumCPU(). More workers allow higher concurrency but consume more system resources.
Example:
q := NewPool(10, WithWorkerCount(4)) // Creates a pool with 4 workers
type OptionFunc ¶ added in v0.1.0
type OptionFunc func(*Options)
OptionFunc is a function adapter that implements the Option interface. It allows regular functions to be used as Options.
type Options ¶ added in v0.0.7
type Options struct {
// contains filtered or unexported fields
}
Options holds the configuration parameters for a Queue. Use the With* functions to configure these options when creating a queue.
func NewOptions ¶ added in v0.0.7
NewOptions creates an Options struct with default values and applies any provided options. Default values:
- workerCount: runtime.NumCPU()
- queueSize: 0 (unlimited)
- logger: stderr logger with timestamps
- worker: nil (must be provided via WithWorker or use NewPool which sets Ring)
- fn: no-op function returning nil
- metric: built-in metric tracker
- retryInterval: 1 second
type Queue ¶
type Queue struct {
sync.Mutex // Mutex to protect concurrent access to queue state
// contains filtered or unexported fields
}
Queue represents a message queue with worker management, job scheduling, retry logic, and graceful shutdown capabilities.
func NewPool ¶ added in v0.0.7
NewPool creates a ready-to-use in-memory queue with the Ring buffer worker. This is the recommended way to create a queue for most use cases.
Key differences from NewQueue:
- Automatically creates and attaches a Ring buffer worker (no need for WithWorker)
- Calls Start() automatically so the queue begins processing immediately
- Panics on error instead of returning an error (simplifies initialization)
Parameters:
- size: Number of worker goroutines (if <= 0, defaults to runtime.NumCPU())
- opts: Additional options to customize the queue (WithLogger, WithQueueSize, etc.)
Example:
// Create a pool with 5 workers and custom capacity
q := queue.NewPool(5, queue.WithQueueSize(100))
defer q.Release()
// Queue tasks
q.QueueTask(func(ctx context.Context) error {
// Process task
return nil
})
Use NewQueue instead if you need:
- Custom worker implementations (NSQ, NATS, Redis, etc.)
- Manual control over when to start the queue
- Error handling during queue creation
Example (QueueTask) ¶
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/golang-queue/queue"
)
func main() {
taskN := 7
rets := make(chan int, taskN)
// allocate a pool with 5 goroutines to deal with those tasks
p := queue.NewPool(5)
// don't forget to release the pool in the end
defer p.Release()
// assign tasks to asynchronous goroutine pool
for i := 0; i < taskN; i++ {
idx := i
if err := p.QueueTask(func(context.Context) error {
// sleep and return the index
time.Sleep(20 * time.Millisecond)
rets <- idx
return nil
}); err != nil {
log.Println(err)
}
}
// wait until all tasks done
for i := 0; i < taskN; i++ {
fmt.Println("index:", <-rets)
}
}
Output: index: 3 index: 0 index: 2 index: 4 index: 5 index: 6 index: 1
Example (QueueTaskTimeout) ¶
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/job"
)
func main() {
taskN := 7
rets := make(chan int, taskN)
resps := make(chan error, 1)
// allocate a pool with 5 goroutines to deal with those tasks
q := queue.NewPool(5)
// don't forget to release the pool in the end
defer q.Release()
// assign tasks to asynchronous goroutine pool
for i := 0; i < taskN; i++ {
idx := i
if err := q.QueueTask(func(ctx context.Context) error {
// panic job
if idx == 5 {
panic("system error")
}
// timeout job
if idx == 6 {
time.Sleep(105 * time.Millisecond)
}
select {
case <-ctx.Done():
resps <- ctx.Err()
default:
}
rets <- idx
return nil
}, job.AllowOption{
Timeout: job.Time(100 * time.Millisecond),
}); err != nil {
log.Println(err)
}
}
// wait until all tasks done
for i := 0; i < taskN-1; i++ {
fmt.Println("index:", <-rets)
}
close(resps)
for e := range resps {
fmt.Println(e.Error())
}
fmt.Println("success task count:", q.SuccessTasks())
fmt.Println("failure task count:", q.FailureTasks())
fmt.Println("submitted task count:", q.SubmittedTasks())
}
Output: index: 3 index: 0 index: 2 index: 4 index: 6 index: 1 context deadline exceeded success task count: 5 failure task count: 2 submitted task count: 7
func NewQueue ¶
NewQueue creates and returns a new Queue instance with the provided options. Returns an error if no worker is specified.
func (*Queue) BusyWorkers ¶ added in v0.1.0
BusyWorkers returns the number of workers currently processing jobs.
func (*Queue) CompletedTasks ¶ added in v0.2.1
CompletedTasks returns the total number of completed tasks (success + failure).
func (*Queue) FailureTasks ¶ added in v0.1.0
FailureTasks returns the number of failed tasks.
func (*Queue) Queue ¶
func (q *Queue) Queue(message core.QueuedMessage, opts ...job.AllowOption) error
Queue enqueues a single job (core.QueuedMessage) into the queue. Accepts job options for customization.
func (*Queue) QueueTask ¶
QueueTask enqueues a single task function into the queue. Accepts job options for customization.
func (*Queue) Release ¶ added in v0.0.7
func (q *Queue) Release()
Release performs a graceful shutdown and waits for all goroutines to finish.
func (*Queue) Shutdown ¶
func (q *Queue) Shutdown()
Shutdown initiates a graceful shutdown of the queue. It signals all goroutines to stop, shuts down the worker, and closes the quit channel. Shutdown is idempotent and safe to call multiple times.
func (*Queue) Start ¶
func (q *Queue) Start()
Start launches all worker goroutines and begins processing jobs. If workerCount is zero, Start is a no-op.
func (*Queue) SubmittedTasks ¶ added in v0.1.0
SubmittedTasks returns the number of tasks submitted to the queue.
func (*Queue) SuccessTasks ¶ added in v0.1.0
SuccessTasks returns the number of successfully completed tasks.
func (*Queue) UpdateWorkerCount ¶ added in v0.1.0
UpdateWorkerCount dynamically updates the number of worker goroutines. Triggers scheduling to adjust to the new worker count.
type Ring ¶ added in v0.2.0
Ring is an in-memory worker implementation using a dynamic circular buffer. It implements the core.Worker interface and provides automatic resizing:
- Doubles capacity when full
- Halves capacity when less than 25% utilized
The ring buffer uses two pointers (head and tail) to track the queue boundaries:
- head: points to the next task to dequeue
- tail: points to the next empty slot for enqueuing
- When head == tail, the queue is empty
- Both pointers wrap around using modulo arithmetic
func NewRing ¶ added in v0.2.0
NewRing creates a new Ring instance with the provided options. It initializes the task queue with a default size of 2, sets the capacity based on the provided options, and configures the logger and run function. The function returns a pointer to the newly created Ring instance.
Parameters:
opts - A variadic list of Option functions to configure the Ring instance.
Returns:
*Ring - A pointer to the newly created Ring instance.
func (*Ring) Queue ¶ added in v0.2.0
func (s *Ring) Queue(task core.TaskMessage) error
Queue adds a task to the ring buffer. The buffer grows dynamically (doubles in size) when full, unless capacity is set. Returns ErrQueueShutdown if the queue is closing, or ErrMaxCapacity if at the size limit.
Thread-safety: This method is safe for concurrent calls.
func (*Ring) Request ¶ added in v0.2.0
func (s *Ring) Request() (core.TaskMessage, error)
Request dequeues and returns the next task from the ring buffer. The buffer shrinks automatically (halves in size) when less than 25% full. Returns:
- (task, nil) if a task is successfully dequeued
- (nil, ErrNoTaskInQueue) if the queue is currently empty
- (nil, ErrQueueHasBeenClosed) if shutdown is complete and the queue is empty
During shutdown, this method signals the exit channel when the last task is dequeued, allowing Shutdown() to complete.
Thread-safety: This method is safe for concurrent calls.
func (*Ring) Run ¶ added in v0.2.0
Run executes a new task using the provided context and task message. It calls the runFunc function, which is responsible for processing the task. The context allows for cancellation and timeout control of the task execution.
func (*Ring) Shutdown ¶ added in v0.2.0
Shutdown gracefully shuts down the worker. It sets the stopFlag to indicate that the queue is shutting down and prevents new tasks from being added. If the queue is already shut down, it returns ErrQueueShutdown. It waits for all tasks to be processed before completing the shutdown.