queue

package module
v0.0.0-...-239e12b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 23, 2023 License: MIT Imports: 11 Imported by: 0

README

Queue

CodeQL Run Tests codecov

Queue is a Golang library for spawning and managing a goroutine pool, allowing you to create multiple workers according to the limited number of CPUs available on the machine.

Features

  • Support buffered channel queue.
  • Support NSQ (A realtime distributed messaging platform) as backend.
  • Support NATS (Connective Technology for Adaptive Edge & Distributed Systems) as backend.
  • Support Redis Pub/Sub as backend.
  • Support Redis Streams as backend.
  • Support RabbitMQ as backend.

Queue Scenario

Simple Queue service using Ring Buffer as default backend.

queue01

Switch the queue service to another backend such as NSQ, NATS, or Redis.

queue02

Multiple Producer and Consumer.

queue03

Requirements

Go version 1.13 or above

Installation

Install the stable version:

go get github.com/golang-queue/queue

Install the latest version:

go get github.com/golang-queue/queue@master

Usage

Basic usage of Pool (use Task function)

Calling the QueueTask() method schedules a task to be executed by the workers (goroutines) in the pool.

package main

import (
  "context"
  "fmt"
  "time"

  "github.com/golang-queue/queue"
)

// main submits 100 closures to a pool created with size 5 via QueueTask and
// prints the string each closure reports back.
func main() {
  total := 100
  results := make(chan string, total)

  // Build the pool.
  q := queue.NewPool(5)
  // Gracefully shut the pool down once main returns.
  defer q.Release()

  // Every task is submitted from its own goroutine.
  for n := 0; n < total; n++ {
    go func(id int) {
      task := func(ctx context.Context) error {
        results <- fmt.Sprintf("Hi Gopher, handle the job: %02d", id)
        return nil
      }
      if err := q.QueueTask(task); err != nil {
        panic(err)
      }
    }(n)
  }

  // Receive exactly one result per submitted task, pausing between prints.
  for received := 0; received < total; received++ {
    fmt.Println("message:", <-results)
    time.Sleep(20 * time.Millisecond)
  }
}
Basic usage of Pool (use message queue)

Define a new message struct and implement the Bytes() func to encode the message. Pass a handler function via WithFn to process the messages taken from the queue.

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/queue"
  "github.com/golang-queue/queue/core"
)

// job is the message payload pushed through the queue in this example.
type job struct {
  Name    string
  Message string
}

// Bytes returns the JSON encoding of the job and panics if the job cannot be
// marshaled.
func (j *job) Bytes() []byte {
  encoded, marshalErr := json.Marshal(j)
  if marshalErr == nil {
    return encoded
  }
  panic(marshalErr)
}

// main pushes 100 job messages through a pool created with size 5 and prints
// the greeting that the WithFn handler produces for each of them.
func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // initial queue pool; WithFn installs the handler that runs for every message
  q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.QueuedMessage) error {
    // Use the message directly when it already is a *job; otherwise decode
    // its encoded bytes into a new job value.
    v, ok := m.(*job)
    if !ok {
      if err := json.Unmarshal(m.Bytes(), &v); err != nil {
        return err
      }
    }

    rets <- "Hi, " + v.Name + ", " + v.Message
    return nil
  }))
  // shut down the service and notify all the workers,
  // waiting until all jobs are complete.
  defer q.Release()

  // assign tasks in queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      // Queue expects a context as its first argument.
      if err := q.Queue(context.Background(), &job{
        Name:    "Gopher",
        Message: fmt.Sprintf("handle the job: %d", i+1),
      }); err != nil {
        log.Println(err)
      }
    }(i)
  }

  // wait until all tasks are done: one result is received per queued job
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }
}

Using NSQ as Queue

See the NSQ documentation.

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/nsq"
  "github.com/golang-queue/queue"
  "github.com/golang-queue/queue/core"
)

// job is the message payload sent through the NSQ-backed queue.
type job struct {
  Message string
}

// Bytes returns the JSON encoding of the job and panics if the job cannot be
// marshaled.
func (j *job) Bytes() []byte {
  encoded, marshalErr := json.Marshal(j)
  if marshalErr == nil {
    return encoded
  }
  panic(marshalErr)
}

// main wires an NSQ-backed worker into a pool, queues 100 jobs, and prints
// each message that the worker's run function reports back.
func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // define the worker
  w := nsq.NewWorker(
    nsq.WithAddr("127.0.0.1:4150"),
    nsq.WithTopic("example"),
    nsq.WithChannel("foobar"),
    // concurrent job number
    nsq.WithMaxInFlight(10),
    nsq.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
      // Use the message directly when it already is a *job; otherwise
      // decode its encoded bytes into a new job value.
      v, ok := m.(*job)
      if !ok {
        if err := json.Unmarshal(m.Bytes(), &v); err != nil {
          return err
        }
      }

      rets <- v.Message
      return nil
    }),
  )

  // define the queue
  q := queue.NewPool(
    5,
    queue.WithWorker(w),
  )

  // assign tasks in queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      // Queue expects a context as its first argument; report a failed
      // enqueue instead of dropping the error silently.
      if err := q.Queue(context.Background(), &job{
        Message: fmt.Sprintf("handle the job: %d", i+1),
      }); err != nil {
        log.Println(err)
      }
    }(i)
  }

  // wait until all tasks are done: one result is received per queued job
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }

  // shutdown the service and notify all the workers
  q.Release()
}

Using NATs as Queue

See the NATs documentation

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/nats"
  "github.com/golang-queue/queue"
  "github.com/golang-queue/queue/core"
)

// job is the message payload sent through the NATS-backed queue.
type job struct {
  Message string
}

// Bytes returns the JSON encoding of the job and panics if the job cannot be
// marshaled.
func (j *job) Bytes() []byte {
  encoded, marshalErr := json.Marshal(j)
  if marshalErr == nil {
    return encoded
  }
  panic(marshalErr)
}

// main wires a NATS-backed worker into a queue configured with ten workers,
// queues 100 jobs, and prints each message that the worker's run function
// reports back.
func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // define the worker
  w := nats.NewWorker(
    nats.WithAddr("127.0.0.1:4222"),
    nats.WithSubj("example"),
    nats.WithQueue("foobar"),
    nats.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
      // Use the message directly when it already is a *job; otherwise
      // decode its encoded bytes into a new job value.
      v, ok := m.(*job)
      if !ok {
        if err := json.Unmarshal(m.Bytes(), &v); err != nil {
          return err
        }
      }

      rets <- v.Message
      return nil
    }),
  )

  // define the queue
  q, err := queue.NewQueue(
    queue.WithWorkerCount(10),
    queue.WithWorker(w),
  )
  if err != nil {
    log.Fatal(err)
  }

  // start the workers (the count is set by WithWorkerCount above)
  q.Start()

  // assign tasks in queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      // Queue expects a context as its first argument; report a failed
      // enqueue instead of dropping the error silently.
      if err := q.Queue(context.Background(), &job{
        Message: fmt.Sprintf("handle the job: %d", i+1),
      }); err != nil {
        log.Println(err)
      }
    }(i)
  }

  // wait until all tasks are done: one result is received per queued job
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }

  // shutdown the service and notify all the workers
  q.Release()
}

Using Redis(Pub/Sub) as Queue

See the redis documentation

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/queue"
  "github.com/golang-queue/queue/core"
  "github.com/golang-queue/redisdb"
)

// job is the message payload sent through the Redis-backed queue.
type job struct {
  Message string
}

// Bytes returns the JSON encoding of the job and panics if the job cannot be
// marshaled.
func (j *job) Bytes() []byte {
  encoded, marshalErr := json.Marshal(j)
  if marshalErr == nil {
    return encoded
  }
  panic(marshalErr)
}

// main wires a Redis Pub/Sub-backed worker into a queue configured with ten
// workers, queues 100 jobs, and prints each message that the worker's run
// function reports back.
func main() {
  taskN := 100
  rets := make(chan string, taskN)

  // define the worker
  w := redisdb.NewWorker(
    redisdb.WithAddr("127.0.0.1:6379"),
    redisdb.WithChannel("foobar"),
    redisdb.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
      // Use the message directly when it already is a *job; otherwise
      // decode its encoded bytes into a new job value.
      v, ok := m.(*job)
      if !ok {
        if err := json.Unmarshal(m.Bytes(), &v); err != nil {
          return err
        }
      }

      rets <- v.Message
      return nil
    }),
  )

  // define the queue
  q, err := queue.NewQueue(
    queue.WithWorkerCount(10),
    queue.WithWorker(w),
  )
  if err != nil {
    log.Fatal(err)
  }

  // start the workers (the count is set by WithWorkerCount above)
  q.Start()

  // assign tasks in queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      // Queue expects a context as its first argument; report a failed
      // enqueue instead of dropping the error silently.
      if err := q.Queue(context.Background(), &job{
        Message: fmt.Sprintf("handle the job: %d", i+1),
      }); err != nil {
        log.Println(err)
      }
    }(i)
  }

  // wait until all tasks are done: one result is received per queued job
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }

  // shutdown the service and notify all the workers
  q.Release()
}

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrNoTaskInQueue there is nothing in the queue
	ErrNoTaskInQueue = errors.New("golang-queue: no task in queue")
	// ErrQueueHasBeenClosed the current queue is closed
	ErrQueueHasBeenClosed = errors.New("golang-queue: queue has been closed")
	// ErrMaxCapacity Maximum size limit reached
	ErrMaxCapacity = errors.New("golang-queue: maximum size limit reached")
)
View Source
var ErrMissingWorker = errors.New("missing worker module")

ErrMissingWorker missing define worker

View Source
var ErrQueueShutdown = errors.New("queue has been closed and released")

ErrQueueShutdown the queue is released and closed.

Functions

This section is empty.

Types

type Logger

type Logger interface {
	Infof(format string, args ...interface{})
	Errorf(format string, args ...interface{})
	Fatalf(format string, args ...interface{})
	Info(args ...interface{})
	Error(args ...interface{})
	Fatal(args ...interface{})
}

Logger interface is used throughout the queue package

func NewEmptyLogger

func NewEmptyLogger() Logger

NewEmptyLogger for simple logger.

Example
l := NewEmptyLogger()
l.Info("test")
l.Infof("test")
l.Error("test")
l.Errorf("test")
l.Fatal("test")
l.Fatalf("test")

func NewLogger

func NewLogger() Logger

NewLogger for simple logger.

type Metric

type Metric interface {
	IncBusyWorker()
	DecBusyWorker()
	BusyWorkers() uint64
	SuccessTasks() uint64
	FailureTasks() uint64
	SubmittedTasks() uint64
	IncSuccessTask()
	IncFailureTask()
	IncSubmittedTask()
}

Metric interface

func NewMetric

func NewMetric() Metric

NewMetric for default metric structure

type Option

type Option interface {
	// contains filtered or unexported methods
}

An Option configures a queue.

func WithFn

func WithFn(fn func(context.Context, core.QueuedMessage) error) Option

WithFn set custom job function

func WithLogger

func WithLogger(l Logger) Option

WithLogger set custom logger

func WithMetric

func WithMetric(m Metric) Option

WithMetric set custom Metric

func WithQueueSize

func WithQueueSize(num int) Option

WithQueueSize set queue size

func WithWorker

func WithWorker(w core.Worker) Option

WithWorker set custom worker

func WithWorkerCount

func WithWorkerCount(num int) Option

WithWorkerCount set worker count

type OptionFunc

type OptionFunc func(*Options)

OptionFunc is a function that configures a queue.

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options for custom args in Queue

func NewOptions

func NewOptions(opts ...Option) *Options

NewOptions initialize the default value for the options

type Queue

type Queue struct {
	sync.Mutex
	// contains filtered or unexported fields
}

A Queue is a message queue.

func NewPool

func NewPool(size int, opts ...Option) *Queue

NewPool initializes a new pool

func NewQueue

func NewQueue(opts ...Option) (*Queue, error)

NewQueue returns a Queue.

func (*Queue) AddWorker

func (q *Queue) AddWorker(size int, opts ...Option) *Queue

AddWorker to enable all worker

func (*Queue) BusyWorkers

func (q *Queue) BusyWorkers() int

BusyWorkers returns the numbers of workers in the running process.

func (*Queue) FailureTasks

func (q *Queue) FailureTasks() int

FailureTasks returns the number of failed tasks.

func (*Queue) Queue

func (q *Queue) Queue(ctx context.Context, message core.QueuedMessage, opts ...job.AllowOption) error

Queue to queue single job with binary

func (*Queue) QueueTask

func (q *Queue) QueueTask(task job.TaskFunc, opts ...job.AllowOption) error

QueueTask to queue single task

func (*Queue) Release

func (q *Queue) Release()

Release for graceful shutdown.

func (*Queue) Shutdown

func (q *Queue) Shutdown()

Shutdown stops all queues.

func (*Queue) Start

func (q *Queue) Start()

Start to enable all worker

func (*Queue) SubmittedTasks

func (q *Queue) SubmittedTasks() int

SubmittedTasks returns the number of submitted tasks.

func (*Queue) SuccessTasks

func (q *Queue) SuccessTasks() int

SuccessTasks returns the number of successful tasks.

func (*Queue) UpdateWorkerCount

func (q *Queue) UpdateWorkerCount(num int)

UpdateWorkerCount to update worker number dynamically.

func (*Queue) Wait

func (q *Queue) Wait()

Wait all process

type Ring

type Ring struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Ring for simple queue using buffer channel

func NewRing

func NewRing(opts ...Option) *Ring

NewRing for create new Ring instance

func (*Ring) Queue

func (s *Ring) Queue(ctx context.Context, task core.QueuedMessage) error

Queue send task to the buffer channel

func (*Ring) QueueAsync

func (s *Ring) QueueAsync(ctx context.Context, task core.QueuedMessage) error

QueueAsync send task to the buffer channel

func (*Ring) Request

func (s *Ring) Request() (core.QueuedMessage, error)

Request a new task from channel

func (*Ring) Run

func (s *Ring) Run(ctx context.Context, task core.QueuedMessage) error

Run to execute new task

func (*Ring) Shutdown

func (s *Ring) Shutdown() error

Shutdown the worker

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL