queue

package module
v0.5.0
Published: Feb 11, 2026 License: MIT Imports: 14 Imported by: 49

README

Queue


Queue is a Go library for creating and managing a pool of goroutines (lightweight threads). It lets you run many tasks in parallel, making full use of your machine's CPU capacity.

Features

Queue Scenario

A simple queue service using a ring buffer as the default backend.


Easily switch the queue service to use NSQ, NATS, or Redis.


Supports multiple producers and consumers.


Requirements

Go version 1.22 or above

Installation

To install the stable version:

go get github.com/golang-queue/queue

To install the latest version:

go get github.com/golang-queue/queue@master

Usage

Basic Usage of Pool (using the Task function)

By calling the QueueTask() method, you can schedule tasks to be executed by workers (Goroutines) in the pool.

package main

import (
  "context"
  "fmt"
  "time"

  "github.com/golang-queue/queue"
)

func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // initialize the queue pool
  q := queue.NewPool(5)
  // shut down the service and notify all workers
  // wait until all jobs are complete
  defer q.Release()

  // assign tasks to the queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      if err := q.QueueTask(func(ctx context.Context) error {
        rets <- fmt.Sprintf("Hi Gopher, handle the job: %02d", i)
        return nil
      }); err != nil {
        panic(err)
      }
    }(i)
  }

  // wait until all tasks are done
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(20 * time.Millisecond)
  }
}

Basic Usage of Pool (using a message queue)

Define a new message struct and implement the Bytes() function to encode the message. Use the WithFn function to handle messages from the queue.

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/queue"
  "github.com/golang-queue/queue/core"
)

type job struct {
  Name    string
  Message string
}

func (j *job) Bytes() []byte {
  b, err := json.Marshal(j)
  if err != nil {
    panic(err)
  }
  return b
}

func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // initialize the queue pool
  q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.TaskMessage) error {
    var v job
    if err := json.Unmarshal(m.Payload(), &v); err != nil {
      return err
    }

    rets <- "Hi, " + v.Name + ", " + v.Message
    return nil
  }))
  // shut down the service and notify all workers
  // wait until all jobs are complete
  defer q.Release()

  // assign tasks to the queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      if err := q.Queue(&job{
        Name:    "Gopher",
        Message: fmt.Sprintf("handle the job: %d", i+1),
      }); err != nil {
        log.Println(err)
      }
    }(i)
  }

  // wait until all tasks are done
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }
}

Using NSQ as a Queue

Refer to the NSQ documentation for more details.

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/nsq"
  "github.com/golang-queue/queue"
  "github.com/golang-queue/queue/core"
)

type job struct {
  Message string
}

func (j *job) Bytes() []byte {
  b, err := json.Marshal(j)
  if err != nil {
    panic(err)
  }
  return b
}

func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // define the worker
  w := nsq.NewWorker(
    nsq.WithAddr("127.0.0.1:4150"),
    nsq.WithTopic("example"),
    nsq.WithChannel("foobar"),
    // concurrent job number
    nsq.WithMaxInFlight(10),
    nsq.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
      var v job
      if err := json.Unmarshal(m.Payload(), &v); err != nil {
        return err
      }

      rets <- v.Message
      return nil
    }),
  )

  // define the queue
  q := queue.NewPool(
    5,
    queue.WithWorker(w),
  )

  // assign tasks to the queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      if err := q.Queue(&job{
        Message: fmt.Sprintf("handle the job: %d", i+1),
      }); err != nil {
        log.Println(err)
      }
    }(i)
  }

  // wait until all tasks are done
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }

  // shut down the service and notify all workers
  q.Release()
}

Using NATS as a Queue

Refer to the NATS documentation for more details.

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/nats"
  "github.com/golang-queue/queue"
  "github.com/golang-queue/queue/core"
)

type job struct {
  Message string
}

func (j *job) Bytes() []byte {
  b, err := json.Marshal(j)
  if err != nil {
    panic(err)
  }
  return b
}

func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // define the worker
  w := nats.NewWorker(
    nats.WithAddr("127.0.0.1:4222"),
    nats.WithSubj("example"),
    nats.WithQueue("foobar"),
    nats.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
      var v job
      if err := json.Unmarshal(m.Payload(), &v); err != nil {
        return err
      }

      rets <- v.Message
      return nil
    }),
  )

  // define the queue
  q, err := queue.NewQueue(
    queue.WithWorkerCount(10),
    queue.WithWorker(w),
  )
  if err != nil {
    log.Fatal(err)
  }

  // start the workers
  q.Start()

  // assign tasks to the queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      if err := q.Queue(&job{
        Message: fmt.Sprintf("handle the job: %d", i+1),
      }); err != nil {
        log.Println(err)
      }
    }(i)
  }

  // wait until all tasks are done
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }

  // shut down the service and notify all workers
  q.Release()
}

Using Redis (Pub/Sub) as a Queue

Refer to the Redis documentation for more details.

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/queue"
  "github.com/golang-queue/queue/core"
  "github.com/golang-queue/redisdb"
)

type job struct {
  Message string
}

func (j *job) Bytes() []byte {
  b, err := json.Marshal(j)
  if err != nil {
    panic(err)
  }
  return b
}

func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // define the worker
  w := redisdb.NewWorker(
    redisdb.WithAddr("127.0.0.1:6379"),
    redisdb.WithChannel("foobar"),
    redisdb.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
      var v job
      if err := json.Unmarshal(m.Payload(), &v); err != nil {
        return err
      }

      rets <- v.Message
      return nil
    }),
  )

  // define the queue
  q, err := queue.NewQueue(
    queue.WithWorkerCount(10),
    queue.WithWorker(w),
  )
  if err != nil {
    log.Fatal(err)
  }

  // start the workers
  q.Start()

  // assign tasks to the queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      if err := q.Queue(&job{
        Message: fmt.Sprintf("handle the job: %d", i+1),
      }); err != nil {
        log.Println(err)
      }
    }(i)
  }

  // wait until all tasks are done
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }

  // shut down the service and notify all workers
  q.Release()
}

Documentation

Index

Examples

Constants

This section is empty.

Variables

var (
	// ErrNoTaskInQueue is returned by Worker.Request() when the queue is currently empty.
	// This is a temporary condition - new tasks may be added later.
	// The queue scheduler uses this error to determine when to retry or wait for notifications.
	ErrNoTaskInQueue = errors.New("golang-queue: no task in queue")

	// ErrQueueHasBeenClosed is returned by Worker.Request() during/after shutdown when no tasks remain.
	// This is a terminal state indicating the queue has been shut down and drained.
	// Once this error appears, no new tasks will be processed.
	// Triggered by: calling Shutdown() or Release() on the queue.
	ErrQueueHasBeenClosed = errors.New("golang-queue: queue has been closed")

	// ErrMaxCapacity is returned by Queue() or QueueTask() when the queue is at maximum capacity.
	// This only occurs when WithQueueSize() is used to set a capacity limit.
	// To handle: retry later, drop the task, or process it synchronously.
	// Triggered by: attempting to enqueue when count >= capacity.
	ErrMaxCapacity = errors.New("golang-queue: maximum size limit reached")
)
var ErrMissingWorker = errors.New("missing worker module")

ErrMissingWorker is returned when a queue is created without a worker implementation.

var ErrQueueShutdown = errors.New("queue has been closed and released")

ErrQueueShutdown is returned when an operation is attempted on a queue that has already been closed and released.

Functions

This section is empty.

Types

type Logger

type Logger interface {
	// Infof logs formatted informational messages.
	Infof(format string, args ...any)

	// Errorf logs formatted error messages.
	Errorf(format string, args ...any)

	// Fatalf logs formatted fatal errors with stack trace information.
	// Used for panics and critical failures.
	Fatalf(format string, args ...any)

	// Info logs informational messages.
	Info(args ...any)

	// Error logs error messages.
	Error(args ...any)

	// Fatal logs fatal errors with stack trace information.
	// Used for panics and critical failures.
	Fatal(args ...any)
}

Logger defines the interface for logging queue events, errors, and fatal conditions. The queue uses this interface to report:

  • Info: Normal operations (shutdown, retry attempts)
  • Error: Recoverable errors (task failures, runtime errors)
  • Fatal: Panics and critical failures (includes stack traces)

Implement this interface to integrate with custom logging systems (logrus, zap, etc.).

func NewEmptyLogger

func NewEmptyLogger() Logger

NewEmptyLogger creates a no-op logger that discards all log messages. This is useful for:

  • Performance-sensitive production environments where logging overhead matters
  • Testing scenarios where log output would clutter test results
  • Silent background workers that don't need observability

Example:

q := queue.NewPool(5, queue.WithLogger(queue.NewEmptyLogger()))
Example
l := NewEmptyLogger()
l.Info("test")
l.Infof("test")
l.Error("test")
l.Errorf("test")
l.Fatal("test")
l.Fatalf("test")

func NewLogger

func NewLogger() Logger

NewLogger creates a standard logger that writes to stderr with timestamps. This is the default logger used by queues unless overridden with WithLogger.

Log format:

  • INFO messages: Simple timestamped output
  • ERROR messages: Simple timestamped output
  • FATAL messages: Includes stack trace with file:line information

Use cases:

  • Development and debugging
  • Simple production deployments without structured logging
  • When detailed error context is needed

type Metric added in v0.0.10

type Metric interface {
	// IncBusyWorker increments the count of workers currently processing tasks.
	// Called atomically when a worker starts processing a job.
	IncBusyWorker()

	// DecBusyWorker decrements the count of workers currently processing tasks.
	// Called atomically when a worker finishes processing a job (success or failure).
	DecBusyWorker()

	// BusyWorkers returns the current number of workers actively processing tasks.
	// This value can range from 0 to the configured workerCount.
	BusyWorkers() int64

	// SuccessTasks returns the total number of tasks that completed successfully.
	// A task is considered successful if it returns no error and doesn't panic.
	SuccessTasks() uint64

	// FailureTasks returns the total number of tasks that failed.
	// A task is considered failed if it returns an error, panics, or times out.
	FailureTasks() uint64

	// SubmittedTasks returns the total number of tasks submitted to the queue.
	// This includes tasks still pending, in progress, and completed.
	SubmittedTasks() uint64

	// CompletedTasks returns the total number of tasks that have finished processing.
	// This equals SuccessTasks() + FailureTasks().
	CompletedTasks() uint64

	// IncSuccessTask increments the successful task counter.
	// Called atomically after a task completes without error.
	IncSuccessTask()

	// IncFailureTask increments the failed task counter.
	// Called atomically after a task fails, panics, or times out.
	IncFailureTask()

	// IncSubmittedTask increments the submitted task counter.
	// Called atomically when a new task is queued.
	IncSubmittedTask()
}

Metric defines the interface for tracking queue performance and worker statistics. All methods must be safe for concurrent access from multiple goroutines. Implement this interface to integrate with custom monitoring systems (Prometheus, StatsD, etc.).

func NewMetric added in v0.0.11

func NewMetric() Metric

NewMetric creates a new metric collector with all counters initialized to zero. The returned metric is safe for concurrent use.

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option is a functional option for configuring a Queue. It follows the functional options pattern for flexible and extensible configuration.

func WithAfterFn added in v0.2.1

func WithAfterFn(afterFn func()) Option

WithAfterFn sets a callback function that will be executed after each job completes. This callback runs regardless of whether the job succeeded or failed. It executes after metrics are updated but before the worker picks up the next task. Useful for cleanup, logging, or triggering post-processing workflows.

Example:

q := NewPool(5, WithAfterFn(func() {
    log.Println("Job completed")
}))

func WithFn added in v0.0.7

func WithFn(fn func(context.Context, core.TaskMessage) error) Option

WithFn sets a custom handler function that will be called to process tasks. This function is used by the worker's Run method when processing job messages. The context allows cancellation and timeout control during task execution. If not set, defaults to a no-op function that returns nil.

Example:

handler := func(ctx context.Context, msg core.TaskMessage) error {
    // Process the message
    return processTask(msg)
}
q := NewPool(5, WithFn(handler))

func WithLogger

func WithLogger(l Logger) Option

WithLogger sets a custom logger for queue events and errors. By default, the queue uses a standard logger that writes to stderr. Use NewEmptyLogger() to disable logging entirely.

Example:

q := NewPool(5, WithLogger(myCustomLogger))
// or disable logging:
q := NewPool(5, WithLogger(NewEmptyLogger()))

func WithMetric added in v0.0.10

func WithMetric(m Metric) Option

WithMetric sets a custom metrics collector for tracking queue statistics. The default metric tracks busy workers, success/failure counts, and submitted tasks. Implement the Metric interface to integrate with custom monitoring systems.

Example:

q := NewPool(5, WithMetric(myPrometheusMetric))

func WithQueueSize added in v0.0.7

func WithQueueSize(num int) Option

WithQueueSize sets the maximum capacity of the queue. When set to 0 (default), the queue has unlimited capacity and will grow dynamically. When set to a positive value, Queue() will return ErrMaxCapacity when the limit is reached. Use this to prevent memory exhaustion under high load.

Example:

q := NewPool(5, WithQueueSize(1000)) // Queue will hold at most 1000 pending tasks

func WithRetryInterval added in v0.4.0

func WithRetryInterval(d time.Duration) Option

WithRetryInterval sets the interval at which the queue polls for new tasks when the queue is empty. This determines how often Request() is retried after receiving ErrNoTaskInQueue. Lower values provide faster response to new tasks but increase CPU usage. Defaults to 1 second.

Example:

q := NewPool(5, WithRetryInterval(100*time.Millisecond)) // Poll every 100ms

func WithWorker

func WithWorker(w core.Worker) Option

WithWorker sets a custom worker implementation for the queue backend. By default, NewPool uses an in-memory Ring buffer worker. Use this to integrate external queue systems like NSQ, NATS, Redis, or RabbitMQ. This option is required when using NewQueue() instead of NewPool().

Example:

q, _ := NewQueue(WithWorker(myNSQWorker), WithWorkerCount(10))

func WithWorkerCount

func WithWorkerCount(num int64) Option

WithWorkerCount sets the number of concurrent worker goroutines that will process jobs. If num is less than or equal to 0, it defaults to runtime.NumCPU(). More workers allow higher concurrency but consume more system resources.

Example:

q := NewPool(10, WithWorkerCount(4)) // Creates a pool with 4 workers

type OptionFunc added in v0.1.0

type OptionFunc func(*Options)

OptionFunc is a function adapter that implements the Option interface. It allows regular functions to be used as Options.

type Options added in v0.0.7

type Options struct {
	// contains filtered or unexported fields
}

Options holds the configuration parameters for a Queue. Use the With* functions to configure these options when creating a queue.

func NewOptions added in v0.0.7

func NewOptions(opts ...Option) *Options

NewOptions creates an Options struct with default values and applies any provided options. Default values:

  • workerCount: runtime.NumCPU()
  • queueSize: 0 (unlimited)
  • logger: stderr logger with timestamps
  • worker: nil (must be provided via WithWorker or use NewPool which sets Ring)
  • fn: no-op function returning nil
  • metric: built-in metric tracker
  • retryInterval: 1 second

type Queue

type Queue struct {
	sync.Mutex // Mutex to protect concurrent access to queue state
	// contains filtered or unexported fields
}

Queue represents a message queue with worker management, job scheduling, retry logic, and graceful shutdown capabilities.

func NewPool added in v0.0.7

func NewPool(size int64, opts ...Option) *Queue

NewPool creates a ready-to-use in-memory queue with the Ring buffer worker. This is the recommended way to create a queue for most use cases.

Key differences from NewQueue:

  • Automatically creates and attaches a Ring buffer worker (no need for WithWorker)
  • Calls Start() automatically so the queue begins processing immediately
  • Panics on error instead of returning an error (simplifies initialization)

Parameters:

  • size: Number of worker goroutines (if <= 0, defaults to runtime.NumCPU())
  • opts: Additional options to customize the queue (WithLogger, WithQueueSize, etc.)

Example:

// Create a pool with 5 workers and custom capacity
q := queue.NewPool(5, queue.WithQueueSize(100))
defer q.Release()

// Queue tasks
q.QueueTask(func(ctx context.Context) error {
    // Process task
    return nil
})

Use NewQueue instead if you need:

  • Custom worker implementations (NSQ, NATS, Redis, etc.)
  • Manual control over when to start the queue
  • Error handling during queue creation
Example (QueueTask)
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/golang-queue/queue"
)

func main() {
	taskN := 7
	rets := make(chan int, taskN)
	// allocate a pool with 5 goroutines to deal with those tasks
	p := queue.NewPool(5)
	// don't forget to release the pool in the end
	defer p.Release()

	// assign tasks to asynchronous goroutine pool
	for i := 0; i < taskN; i++ {
		idx := i
		if err := p.QueueTask(func(context.Context) error {
			// sleep and return the index
			time.Sleep(20 * time.Millisecond)
			rets <- idx
			return nil
		}); err != nil {
			log.Println(err)
		}
	}

	// wait until all tasks done
	for i := 0; i < taskN; i++ {
		fmt.Println("index:", <-rets)
	}

}
Output:

index: 3
index: 0
index: 2
index: 4
index: 5
index: 6
index: 1
Example (QueueTaskTimeout)
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/golang-queue/queue"
	"github.com/golang-queue/queue/job"
)

func main() {
	taskN := 7
	rets := make(chan int, taskN)
	resps := make(chan error, 1)
	// allocate a pool with 5 goroutines to deal with those tasks
	q := queue.NewPool(5)
	// don't forget to release the pool in the end
	defer q.Release()

	// assign tasks to asynchronous goroutine pool
	for i := 0; i < taskN; i++ {
		idx := i
		if err := q.QueueTask(func(ctx context.Context) error {
			// panic job
			if idx == 5 {
				panic("system error")
			}
			// timeout job
			if idx == 6 {
				time.Sleep(105 * time.Millisecond)
			}
			select {
			case <-ctx.Done():
				resps <- ctx.Err()
			default:
			}

			rets <- idx
			return nil
		}, job.AllowOption{
			Timeout: job.Time(100 * time.Millisecond),
		}); err != nil {
			log.Println(err)
		}
	}

	// wait until all tasks done
	for i := 0; i < taskN-1; i++ {
		fmt.Println("index:", <-rets)
	}
	close(resps)
	for e := range resps {
		fmt.Println(e.Error())
	}

	fmt.Println("success task count:", q.SuccessTasks())
	fmt.Println("failure task count:", q.FailureTasks())
	fmt.Println("submitted task count:", q.SubmittedTasks())

}
Output:

index: 3
index: 0
index: 2
index: 4
index: 6
index: 1
context deadline exceeded
success task count: 5
failure task count: 2
submitted task count: 7

func NewQueue

func NewQueue(opts ...Option) (*Queue, error)

NewQueue creates and returns a new Queue instance with the provided options. Returns an error if no worker is specified.

func (*Queue) BusyWorkers added in v0.1.0

func (q *Queue) BusyWorkers() int64

BusyWorkers returns the number of workers currently processing jobs.

func (*Queue) CompletedTasks added in v0.2.1

func (q *Queue) CompletedTasks() uint64

CompletedTasks returns the total number of completed tasks (success + failure).

func (*Queue) FailureTasks added in v0.1.0

func (q *Queue) FailureTasks() uint64

FailureTasks returns the number of failed tasks.

func (*Queue) Queue

func (q *Queue) Queue(message core.QueuedMessage, opts ...job.AllowOption) error

Queue enqueues a single job (core.QueuedMessage) into the queue. Accepts job options for customization.

func (*Queue) QueueTask

func (q *Queue) QueueTask(task job.TaskFunc, opts ...job.AllowOption) error

QueueTask enqueues a single task function into the queue. Accepts job options for customization.

func (*Queue) Release added in v0.0.7

func (q *Queue) Release()

Release performs a graceful shutdown and waits for all goroutines to finish.

func (*Queue) Shutdown

func (q *Queue) Shutdown()

Shutdown initiates a graceful shutdown of the queue. It signals all goroutines to stop, shuts down the worker, and closes the quit channel. Shutdown is idempotent and safe to call multiple times.

func (*Queue) Start

func (q *Queue) Start()

Start launches all worker goroutines and begins processing jobs. If workerCount is zero, Start is a no-op.

func (*Queue) SubmittedTasks added in v0.1.0

func (q *Queue) SubmittedTasks() uint64

SubmittedTasks returns the number of tasks submitted to the queue.

func (*Queue) SuccessTasks added in v0.1.0

func (q *Queue) SuccessTasks() uint64

SuccessTasks returns the number of successfully completed tasks.

func (*Queue) UpdateWorkerCount added in v0.1.0

func (q *Queue) UpdateWorkerCount(num int64)

UpdateWorkerCount dynamically updates the number of worker goroutines. Triggers scheduling to adjust to the new worker count.

func (*Queue) Wait

func (q *Queue) Wait()

Wait blocks until all goroutines in the routine group have finished.

type Ring added in v0.2.0

type Ring struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Ring is an in-memory worker implementation using a dynamic circular buffer. It implements the core.Worker interface and provides automatic resizing:

  • Doubles capacity when full
  • Halves capacity when less than 25% utilized

The ring buffer uses two pointers (head and tail) to track the queue boundaries:

  • head: points to the next task to dequeue
  • tail: points to the next empty slot for enqueuing
  • When head == tail, the queue is empty
  • Both pointers wrap around using modulo arithmetic

func NewRing added in v0.2.0

func NewRing(opts ...Option) *Ring

NewRing creates a new Ring instance with the provided options. It initializes the task queue with a default size of 2, sets the capacity based on the provided options, and configures the logger and run function. The function returns a pointer to the newly created Ring instance.

Parameters:

opts - A variadic list of Option functions to configure the Ring instance.

Returns:

*Ring - A pointer to the newly created Ring instance.

func (*Ring) Queue added in v0.2.0

func (s *Ring) Queue(task core.TaskMessage) error

Queue adds a task to the ring buffer. The buffer grows dynamically (doubles in size) when full, unless capacity is set. Returns ErrQueueShutdown if the queue is closing, or ErrMaxCapacity if at the size limit.

Thread-safety: This method is safe for concurrent calls.

func (*Ring) Request added in v0.2.0

func (s *Ring) Request() (core.TaskMessage, error)

Request dequeues and returns the next task from the ring buffer. The buffer shrinks automatically (halves in size) when less than 25% full. Returns:

  • (task, nil) if a task is successfully dequeued
  • (nil, ErrNoTaskInQueue) if the queue is currently empty
  • (nil, ErrQueueHasBeenClosed) if shutdown is complete and the queue is empty

During shutdown, this method signals the exit channel when the last task is dequeued, allowing Shutdown() to complete.

Thread-safety: This method is safe for concurrent calls.

func (*Ring) Run added in v0.2.0

func (s *Ring) Run(ctx context.Context, task core.TaskMessage) error

Run executes a new task using the provided context and task message. It calls the runFunc function, which is responsible for processing the task. The context allows for cancellation and timeout control of the task execution.

func (*Ring) Shutdown added in v0.2.0

func (s *Ring) Shutdown() error

Shutdown gracefully shuts down the worker. It sets the stopFlag to indicate that the queue is shutting down and prevents new tasks from being added. If the queue is already shut down, it returns ErrQueueShutdown. It waits for all tasks to be processed before completing the shutdown.

Directories

Path Synopsis
Package mocks is a generated GoMock package.
