queue

v0.4.0 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 13, 2025 | License: MIT | Imports: 14 | Imported by: 50

README

Queue

Queue is a Go library for creating and managing a pool of goroutines (lightweight threads). It lets you run multiple tasks in parallel and make efficient use of your machine's CPU capacity.

Features

Queue Scenario

A simple queue service using a ring buffer as the default backend.

Easily switch the queue service to use NSQ, NATS, or Redis.

Supports multiple producers and consumers.

Requirements

Go version 1.22 or above

Installation

To install the stable version:

go get github.com/golang-queue/queue

To install the latest version:

go get github.com/golang-queue/queue@master

Usage

Basic Usage of Pool (using the Task function)

By calling the QueueTask() method, you can schedule tasks to be executed by workers (Goroutines) in the pool.

package main

import (
  "context"
  "fmt"
  "time"

  "github.com/golang-queue/queue"
)

func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // initialize the queue pool
  q := queue.NewPool(5)
  // shut down the service and notify all workers
  // wait until all jobs are complete
  defer q.Release()

  // assign tasks to the queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      if err := q.QueueTask(func(ctx context.Context) error {
        rets <- fmt.Sprintf("Hi Gopher, handle the job: %02d", i)
        return nil
      }); err != nil {
        panic(err)
      }
    }(i)
  }

  // wait until all tasks are done
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(20 * time.Millisecond)
  }
}

Basic Usage of Pool (using a message queue)

Define a new message struct and implement the Bytes() function to encode the message. Use the WithFn function to handle messages from the queue.

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/queue"
  "github.com/golang-queue/queue/core"
)

type job struct {
  Name    string
  Message string
}

func (j *job) Bytes() []byte {
  b, err := json.Marshal(j)
  if err != nil {
    panic(err)
  }
  return b
}

func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // initialize the queue pool
  q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.TaskMessage) error {
    var v job
    if err := json.Unmarshal(m.Payload(), &v); err != nil {
      return err
    }

    rets <- "Hi, " + v.Name + ", " + v.Message
    return nil
  }))
  // shut down the service and notify all workers
  // wait until all jobs are complete
  defer q.Release()

  // assign tasks to the queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      if err := q.Queue(&job{
        Name:    "Gopher",
        Message: fmt.Sprintf("handle the job: %d", i+1),
      }); err != nil {
        log.Println(err)
      }
    }(i)
  }

  // wait until all tasks are done
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }
}

Using NSQ as a Queue

Refer to the NSQ documentation for more details.

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/nsq"
  "github.com/golang-queue/queue"
  "github.com/golang-queue/queue/core"
)

type job struct {
  Message string
}

func (j *job) Bytes() []byte {
  b, err := json.Marshal(j)
  if err != nil {
    panic(err)
  }
  return b
}

func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // define the worker
  w := nsq.NewWorker(
    nsq.WithAddr("127.0.0.1:4150"),
    nsq.WithTopic("example"),
    nsq.WithChannel("foobar"),
    // concurrent job number
    nsq.WithMaxInFlight(10),
    nsq.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
      var v job
      if err := json.Unmarshal(m.Payload(), &v); err != nil {
        return err
      }

      rets <- v.Message
      return nil
    }),
  )

  // define the queue
  q := queue.NewPool(
    5,
    queue.WithWorker(w),
  )

  // assign tasks to the queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
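      // report enqueue errors instead of dropping them silently
      if err := q.Queue(&job{
        Message: fmt.Sprintf("handle the job: %d", i+1),
      }); err != nil {
        log.Println(err)
      }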
    }(i)
  }

  // wait until all tasks are done
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }

  // shut down the service and notify all workers
  q.Release()
}

Using NATS as a Queue

Refer to the NATS documentation for more details.

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/nats"
  "github.com/golang-queue/queue"
  "github.com/golang-queue/queue/core"
)

type job struct {
  Message string
}

func (j *job) Bytes() []byte {
  b, err := json.Marshal(j)
  if err != nil {
    panic(err)
  }
  return b
}

func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // define the worker
  w := nats.NewWorker(
    nats.WithAddr("127.0.0.1:4222"),
    nats.WithSubj("example"),
    nats.WithQueue("foobar"),
    nats.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
      var v job
      if err := json.Unmarshal(m.Payload(), &v); err != nil {
        return err
      }

      rets <- v.Message
      return nil
    }),
  )

  // define the queue
  q, err := queue.NewQueue(
    queue.WithWorkerCount(10),
    queue.WithWorker(w),
  )
  if err != nil {
    log.Fatal(err)
  }

  // start the workers
  q.Start()

  // assign tasks to the queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      q.Queue(&job{
        Message: fmt.Sprintf("handle the job: %d", i+1),
      })
    }(i)
  }

  // wait until all tasks are done
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }

  // shut down the service and notify all workers
  q.Release()
}

Using Redis (Pub/Sub) as a Queue

Refer to the Redis documentation for more details.

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/queue"
  "github.com/golang-queue/queue/core"
  "github.com/golang-queue/redisdb"
)

type job struct {
  Message string
}

func (j *job) Bytes() []byte {
  b, err := json.Marshal(j)
  if err != nil {
    panic(err)
  }
  return b
}

func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // define the worker
  w := redisdb.NewWorker(
    redisdb.WithAddr("127.0.0.1:6379"),
    redisdb.WithChannel("foobar"),
    redisdb.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
      var v job
      if err := json.Unmarshal(m.Payload(), &v); err != nil {
        return err
      }

      rets <- v.Message
      return nil
    }),
  )

  // define the queue
  q, err := queue.NewQueue(
    queue.WithWorkerCount(10),
    queue.WithWorker(w),
  )
  if err != nil {
    log.Fatal(err)
  }

  // start the workers
  q.Start()

  // assign tasks to the queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      q.Queue(&job{
        Message: fmt.Sprintf("handle the job: %d", i+1),
      })
    }(i)
  }

  // wait until all tasks are done
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }

  // shut down the service and notify all workers
  q.Release()
}

Documentation

Constants

This section is empty.

Variables

var (
	// ErrNoTaskInQueue indicates that there is nothing in the queue.
	ErrNoTaskInQueue = errors.New("golang-queue: no task in queue")
	// ErrQueueHasBeenClosed indicates that the current queue is closed.
	ErrQueueHasBeenClosed = errors.New("golang-queue: queue has been closed")
	// ErrMaxCapacity indicates that the maximum size limit has been reached.
	ErrMaxCapacity = errors.New("golang-queue: maximum size limit reached")
)

var ErrMissingWorker = errors.New("missing worker module")

ErrMissingWorker indicates that no worker has been defined.

var ErrQueueShutdown = errors.New("queue has been closed and released")

ErrQueueShutdown indicates that the queue has been released and closed.

Functions

This section is empty.

Types

type Logger

type Logger interface {
	Infof(format string, args ...any)
	Errorf(format string, args ...any)
	Fatalf(format string, args ...any)
	Info(args ...any)
	Error(args ...any)
	Fatal(args ...any)
}

Logger is the logging interface used throughout this package.
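As an illustration of what satisfying this interface involves, here is a minimal sketch built on the standard library's log package. The Logger declaration is copied from the listing above so the example compiles on its own; levelLogger, newLevelLogger, and renderSample are hypothetical names, and the choice to make Fatal and Fatalf only log (without exiting) is an assumption of this sketch, not a statement about the package's own loggers.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
)

// Logger repeats the method set from the listing above.
type Logger interface {
	Infof(format string, args ...any)
	Errorf(format string, args ...any)
	Fatalf(format string, args ...any)
	Info(args ...any)
	Error(args ...any)
	Fatal(args ...any)
}

// levelLogger is a hypothetical Logger that prefixes each line with a level.
type levelLogger struct {
	out *log.Logger
}

// newLevelLogger writes to w without timestamps so output stays predictable.
func newLevelLogger(w io.Writer) *levelLogger {
	return &levelLogger{out: log.New(w, "", 0)}
}

func (l *levelLogger) Infof(format string, args ...any)  { l.out.Printf("INFO "+format, args...) }
func (l *levelLogger) Errorf(format string, args ...any) { l.out.Printf("ERROR "+format, args...) }

// Fatalf only logs in this sketch; it does not terminate the process.
func (l *levelLogger) Fatalf(format string, args ...any) { l.out.Printf("FATAL "+format, args...) }

func (l *levelLogger) Info(args ...any)  { l.out.Print("INFO ", fmt.Sprint(args...)) }
func (l *levelLogger) Error(args ...any) { l.out.Print("ERROR ", fmt.Sprint(args...)) }

// Fatal only logs in this sketch; it does not terminate the process.
func (l *levelLogger) Fatal(args ...any) { l.out.Print("FATAL ", fmt.Sprint(args...)) }

// renderSample logs two messages into a buffer and returns the captured text.
func renderSample() string {
	var buf bytes.Buffer
	var l Logger = newLevelLogger(&buf) // compile-time check of the method set
	l.Infof("started %d workers", 5)
	l.Error("task failed")
	return buf.String()
}

func main() {
	fmt.Print(renderSample())
}
```

A value with this method set is the kind of argument WithLogger, listed further down, accepts.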

func NewEmptyLogger

func NewEmptyLogger() Logger

NewEmptyLogger returns a Logger whose methods do nothing.

Example
l := NewEmptyLogger()
l.Info("test")
l.Infof("test")
l.Error("test")
l.Errorf("test")
l.Fatal("test")
l.Fatalf("test")

func NewLogger

func NewLogger() Logger

NewLogger returns a simple default Logger.

type Metric added in v0.0.10

type Metric interface {
	IncBusyWorker()
	DecBusyWorker()
	BusyWorkers() int64
	SuccessTasks() uint64
	FailureTasks() uint64
	SubmittedTasks() uint64
	CompletedTasks() uint64
	IncSuccessTask()
	IncFailureTask()
	IncSubmittedTask()
}

Metric is the interface for tracking busy workers and task counters.
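A custom implementation of this interface could be built on sync/atomic, as in the sketch below. The Metric declaration is copied from the listing above; atomicMetric and simulate are hypothetical names. Treating CompletedTasks as the sum of successful and failed tasks is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Metric repeats the method set from the listing above.
type Metric interface {
	IncBusyWorker()
	DecBusyWorker()
	BusyWorkers() int64
	SuccessTasks() uint64
	FailureTasks() uint64
	SubmittedTasks() uint64
	CompletedTasks() uint64
	IncSuccessTask()
	IncFailureTask()
	IncSubmittedTask()
}

// atomicMetric is a hypothetical Metric backed by atomic counters,
// so it is safe to update from many goroutines at once.
type atomicMetric struct {
	busy      atomic.Int64
	success   atomic.Uint64
	failure   atomic.Uint64
	submitted atomic.Uint64
}

func (m *atomicMetric) IncBusyWorker()         { m.busy.Add(1) }
func (m *atomicMetric) DecBusyWorker()         { m.busy.Add(-1) }
func (m *atomicMetric) BusyWorkers() int64     { return m.busy.Load() }
func (m *atomicMetric) SuccessTasks() uint64   { return m.success.Load() }
func (m *atomicMetric) FailureTasks() uint64   { return m.failure.Load() }
func (m *atomicMetric) SubmittedTasks() uint64 { return m.submitted.Load() }
func (m *atomicMetric) IncSuccessTask()        { m.success.Add(1) }
func (m *atomicMetric) IncFailureTask()        { m.failure.Add(1) }
func (m *atomicMetric) IncSubmittedTask()      { m.submitted.Add(1) }

// CompletedTasks is assumed here to be successes plus failures.
func (m *atomicMetric) CompletedTasks() uint64 { return m.success.Load() + m.failure.Load() }

// simulate submits n fake tasks concurrently; every fifth one "fails".
func simulate(n int) Metric {
	m := &atomicMetric{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		m.IncSubmittedTask()
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			m.IncBusyWorker()
			defer m.DecBusyWorker()
			if i%5 == 0 {
				m.IncFailureTask()
				return
			}
			m.IncSuccessTask()
		}(i)
	}
	wg.Wait()
	return m
}

func main() {
	m := simulate(10)
	fmt.Println("submitted:", m.SubmittedTasks())
	fmt.Println("success:", m.SuccessTasks())
	fmt.Println("failure:", m.FailureTasks())
	fmt.Println("completed:", m.CompletedTasks())
	fmt.Println("busy:", m.BusyWorkers())
}
```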

func NewMetric added in v0.0.11

func NewMetric() Metric

NewMetric returns the default Metric implementation.

type Option

type Option interface {
	// contains filtered or unexported methods
}

An Option configures a Queue.

func WithAfterFn added in v0.2.1

func WithAfterFn(afterFn func()) Option

WithAfterFn sets a callback function that is called after a job is done.

func WithFn added in v0.0.7

func WithFn(fn func(context.Context, core.TaskMessage) error) Option

WithFn sets a custom job function.

func WithLogger

func WithLogger(l Logger) Option

WithLogger sets a custom logger.

func WithMetric added in v0.0.10

func WithMetric(m Metric) Option

WithMetric sets a custom Metric.

func WithQueueSize added in v0.0.7

func WithQueueSize(num int) Option

WithQueueSize sets the queue size.

func WithRetryInterval added in v0.4.0

func WithRetryInterval(d time.Duration) Option

WithRetryInterval sets the retry interval.

func WithWorker

func WithWorker(w core.Worker) Option

WithWorker sets a custom worker.

func WithWorkerCount

func WithWorkerCount(num int64) Option

WithWorkerCount sets the worker count.

type OptionFunc added in v0.1.0

type OptionFunc func(*Options)

OptionFunc is a function that configures a queue.
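The Option, OptionFunc, and NewOptions declarations follow the functional options pattern. The sketch below shows how such a pattern is commonly wired together. It is a stand-in, not the package's source: the Options fields, the apply method name, and the default values are all assumptions, since the real fields and methods are unexported.

```go
package main

import "fmt"

// Options is a stand-in with two illustrative fields.
// The real struct's fields are unexported and not shown in the listing.
type Options struct {
	workerCount int64
	queueSize   int
}

// Option is satisfied by anything that can modify Options.
// The method name "apply" is an assumption.
type Option interface {
	apply(*Options)
}

// OptionFunc adapts a plain function to the Option interface.
type OptionFunc func(*Options)

func (f OptionFunc) apply(o *Options) { f(o) }

// WithWorkerCount returns an Option that sets the worker count.
func WithWorkerCount(num int64) Option {
	return OptionFunc(func(o *Options) { o.workerCount = num })
}

// WithQueueSize returns an Option that sets the queue size.
func WithQueueSize(num int) Option {
	return OptionFunc(func(o *Options) { o.queueSize = num })
}

// NewOptions starts from defaults (assumed values here) and applies
// each option in order, so later options override earlier ones.
func NewOptions(opts ...Option) *Options {
	o := &Options{workerCount: 1, queueSize: 0}
	for _, opt := range opts {
		opt.apply(o)
	}
	return o
}

func main() {
	o := NewOptions(WithWorkerCount(10), WithQueueSize(64))
	fmt.Println(o.workerCount, o.queueSize)
}
```

Keeping the interface method unexported means only the defining package can create new kinds of options, while callers still compose them freely.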

type Options added in v0.0.7

type Options struct {
	// contains filtered or unexported fields
}

Options holds the custom settings for a Queue.

func NewOptions added in v0.0.7

func NewOptions(opts ...Option) *Options

NewOptions initializes the options with their default values.

type Queue

type Queue struct {
	sync.Mutex
	// contains filtered or unexported fields
}

A Queue is a message queue.

func NewPool added in v0.0.7

func NewPool(size int64, opts ...Option) *Queue

NewPool initializes a new pool

Example (QueueTask)
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/golang-queue/queue"
)

func main() {
	taskN := 7
	rets := make(chan int, taskN)
	// allocate a pool with 5 goroutines to deal with those tasks
	p := queue.NewPool(5)
	// don't forget to release the pool in the end
	defer p.Release()

	// assign tasks to asynchronous goroutine pool
	for i := 0; i < taskN; i++ {
		idx := i
		if err := p.QueueTask(func(context.Context) error {
			// sleep and return the index
			time.Sleep(20 * time.Millisecond)
			rets <- idx
			return nil
		}); err != nil {
			log.Println(err)
		}
	}

	// wait until all tasks done
	for i := 0; i < taskN; i++ {
		fmt.Println("index:", <-rets)
	}

}
Output:

index: 3
index: 0
index: 2
index: 4
index: 5
index: 6
index: 1

Example (QueueTaskTimeout)
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/golang-queue/queue"
	"github.com/golang-queue/queue/job"
)

func main() {
	taskN := 7
	rets := make(chan int, taskN)
	resps := make(chan error, 1)
	// allocate a pool with 5 goroutines to deal with those tasks
	q := queue.NewPool(5)
	// don't forget to release the pool in the end
	defer q.Release()

	// assign tasks to asynchronous goroutine pool
	for i := 0; i < taskN; i++ {
		idx := i
		if err := q.QueueTask(func(ctx context.Context) error {
			// panic job
			if idx == 5 {
				panic("system error")
			}
			// timeout job
			if idx == 6 {
				time.Sleep(105 * time.Millisecond)
			}
			select {
			case <-ctx.Done():
				resps <- ctx.Err()
			default:
			}

			rets <- idx
			return nil
		}, job.AllowOption{
			Timeout: job.Time(100 * time.Millisecond),
		}); err != nil {
			log.Println(err)
		}
	}

	// wait until all tasks done
	for i := 0; i < taskN-1; i++ {
		fmt.Println("index:", <-rets)
	}
	close(resps)
	for e := range resps {
		fmt.Println(e.Error())
	}

	fmt.Println("success task count:", q.SuccessTasks())
	fmt.Println("failure task count:", q.FailureTasks())
	fmt.Println("submitted task count:", q.SubmittedTasks())

}
Output:

index: 3
index: 0
index: 2
index: 4
index: 6
index: 1
context deadline exceeded
success task count: 5
failure task count: 2
submitted task count: 7
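The timeout and panic handling exercised by this example can be pictured with the standard library alone. The sketch below is not the package's implementation; runWithTimeout, describe, and outcomes are hypothetical names. It runs a task under context.WithTimeout, converts a panic into an error, and reports whichever happens first.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWithTimeout runs task with a deadline of d and turns a panic into an error.
func runWithTimeout(parent context.Context, d time.Duration, task func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, d)
	defer cancel()

	// Buffered so the goroutine can always deliver its result and exit.
	done := make(chan error, 1)
	go func() {
		defer func() {
			if r := recover(); r != nil {
				done <- fmt.Errorf("task panicked: %v", r)
			}
		}()
		done <- task(ctx)
	}()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// describe maps an error to a short label.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, context.DeadlineExceeded):
		return "timeout"
	default:
		return "failed: " + err.Error()
	}
}

// outcomes runs a fast task, a slow task, and a panicking task.
func outcomes() []string {
	ctx := context.Background()
	fast := func(context.Context) error { return nil }
	slow := func(ctx context.Context) error {
		select {
		case <-time.After(time.Second):
			return nil
		case <-ctx.Done(): // stop early once the deadline passes
			return ctx.Err()
		}
	}
	boom := func(context.Context) error { panic("system error") }

	return []string{
		describe(runWithTimeout(ctx, 100*time.Millisecond, fast)),
		describe(runWithTimeout(ctx, 20*time.Millisecond, slow)),
		describe(runWithTimeout(ctx, 100*time.Millisecond, boom)),
	}
}

func main() {
	for _, o := range outcomes() {
		fmt.Println(o)
	}
}
```

As in the example above, a task only stops early if it checks ctx.Done() itself; a task that ignores the context keeps running after the deadline.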

func NewQueue

func NewQueue(opts ...Option) (*Queue, error)

NewQueue returns a Queue.

func (*Queue) BusyWorkers added in v0.1.0

func (q *Queue) BusyWorkers() int64

BusyWorkers returns the number of workers that are currently running tasks.

func (*Queue) CompletedTasks added in v0.2.1

func (q *Queue) CompletedTasks() uint64

CompletedTasks returns the number of completed tasks.

func (*Queue) FailureTasks added in v0.1.0

func (q *Queue) FailureTasks() uint64

FailureTasks returns the number of failed tasks.

func (*Queue) Queue

func (q *Queue) Queue(message core.QueuedMessage, opts ...job.AllowOption) error

Queue enqueues a single job as a binary message.

func (*Queue) QueueTask

func (q *Queue) QueueTask(task job.TaskFunc, opts ...job.AllowOption) error

QueueTask enqueues a single task function.

func (*Queue) Release added in v0.0.7

func (q *Queue) Release()

Release shuts down the queue gracefully: it notifies all workers and waits until all jobs are complete.

func (*Queue) Shutdown

func (q *Queue) Shutdown()

Shutdown stops all queues.

func (*Queue) Start

func (q *Queue) Start()

Start enables all workers.

func (*Queue) SubmittedTasks added in v0.1.0

func (q *Queue) SubmittedTasks() uint64

SubmittedTasks returns the number of submitted tasks.

func (*Queue) SuccessTasks added in v0.1.0

func (q *Queue) SuccessTasks() uint64

SuccessTasks returns the number of successful tasks.

func (*Queue) UpdateWorkerCount added in v0.1.0

func (q *Queue) UpdateWorkerCount(num int64)

UpdateWorkerCount updates the number of workers dynamically.

func (*Queue) Wait

func (q *Queue) Wait()

Wait waits for all processes to finish.

type Ring added in v0.2.0

type Ring struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Ring represents a simple queue backed by a ring buffer.

func NewRing added in v0.2.0

func NewRing(opts ...Option) *Ring

NewRing creates a new Ring instance with the provided options. It initializes the task queue with a default size of 2, sets the capacity based on the provided options, and configures the logger and run function. The function returns a pointer to the newly created Ring instance.

Parameters:

opts - A variadic list of Option functions to configure the Ring instance.

Returns:

*Ring - A pointer to the newly created Ring instance.

func (*Ring) Queue added in v0.2.0

func (s *Ring) Queue(task core.TaskMessage) error

Queue adds a task to the ring buffer queue. It returns an error if the queue is shut down or has reached its maximum capacity.

func (*Ring) Request added in v0.2.0

func (s *Ring) Request() (core.TaskMessage, error)

Request retrieves the next task message from the ring queue. If the queue has been stopped and is empty, it signals the exit channel and returns an error indicating the queue has been closed. If the queue is empty but not stopped, it returns an error indicating there are no tasks in the queue. If a task is successfully retrieved, it is removed from the queue, and the queue may be resized if it is less than half full. Returns the task message and nil on success, or an error if the queue is empty or has been closed.
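The Queue and Request behavior described above can be pictured with a small fixed-capacity circular buffer. This is a simplified sketch, not the package's Ring: the type, field, and error names are hypothetical, tasks are plain strings rather than core.TaskMessage values, and the resizing and exit-channel signaling mentioned above are left out.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Stand-ins for the package's sentinel errors.
var (
	errNoTask   = errors.New("no task in queue")
	errClosed   = errors.New("queue has been closed")
	errCapacity = errors.New("maximum size limit reached")
)

// ring is a hypothetical fixed-capacity circular buffer of tasks.
type ring struct {
	mu      sync.Mutex
	buf     []string
	head    int // index of the oldest task
	count   int // number of stored tasks
	stopped bool
}

func newRing(capacity int) *ring {
	return &ring{buf: make([]string, capacity)}
}

// Queue appends a task, failing if the ring is stopped or full.
func (r *ring) Queue(task string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.stopped {
		return errClosed
	}
	if r.count == len(r.buf) {
		return errCapacity
	}
	tail := (r.head + r.count) % len(r.buf) // wraps around the end
	r.buf[tail] = task
	r.count++
	return nil
}

// Request removes and returns the oldest task.
func (r *ring) Request() (string, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.count == 0 {
		if r.stopped {
			return "", errClosed
		}
		return "", errNoTask
	}
	task := r.buf[r.head]
	r.buf[r.head] = "" // drop the reference
	r.head = (r.head + 1) % len(r.buf)
	r.count--
	return task, nil
}

// Shutdown marks the ring as stopped; queued tasks can still be drained.
func (r *ring) Shutdown() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.stopped = true
}

func main() {
	q := newRing(2)
	fmt.Println(q.Queue("a"), q.Queue("b"), q.Queue("c"))
	task, err := q.Request()
	fmt.Println(task, err)
}
```

A consumer loop would typically treat the "no task" error as a signal to wait and retry, and the "closed" error as a signal to exit.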

func (*Ring) Run added in v0.2.0

func (s *Ring) Run(ctx context.Context, task core.TaskMessage) error

Run executes a new task using the provided context and task message. It calls the runFunc function, which is responsible for processing the task. The context allows for cancellation and timeout control of the task execution.

func (*Ring) Shutdown added in v0.2.0

func (s *Ring) Shutdown() error

Shutdown gracefully shuts down the worker. It sets the stopFlag to indicate that the queue is shutting down and prevents new tasks from being added. If the queue is already shut down, it returns ErrQueueShutdown. It waits for all tasks to be processed before completing the shutdown.

Directories

Path Synopsis
Package mocks is a generated GoMock package.