bqueue

Version: v1.0.0
Published: Sep 24, 2024 License: MIT Imports: 5 Imported by: 0

README

BQUEUE

A buffered async queue

Based on Marcio Castilho's article “Handling 1 Million Requests per Minute with Go”

Why

We needed a simple and quick queue system to handle requests rapidly without impacting performance. Each of our jobs could take up to 20s to execute.

How it works

By using the awesomeness of channels.

Channels are the pipes that allow goroutines to share data. You can send values into a channel from one goroutine and receive those values in another goroutine.

With channels we can share data between goroutines so that it is processed concurrently, making the best use of multiple CPU cores.
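The idea above can be sketched with nothing but the standard library. This is a minimal illustration of goroutines sharing work over channels, not the package's own implementation; the function name squareAll and the squaring workload are made up for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll sends each input over a channel to a fixed set of worker
// goroutines and returns the sum of the squares they send back.
func squareAll(inputs []int, workers int) int {
	jobs := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker receives values until the jobs channel is closed.
			for n := range jobs {
				results <- n * n
			}
		}()
	}

	// Feed the jobs, then close the channel so the workers' range loops end.
	go func() {
		for _, n := range inputs {
			jobs <- n
		}
		close(jobs)
	}()

	// Close results once every worker has finished sending.
	go func() {
		wg.Wait()
		close(results)
	}()

	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4}, 2)) // prints 30
}
```

The order in which workers pick up values is not deterministic, which is why the example combines the results with an order-independent sum.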

Documentation

Overview

Package bqueue is an in-memory queue.

Index

Constants

const (
	// DefaultLimit is the default job queue limit.
	DefaultLimit = 128

	// UnlimitedWorkers can be passed to Workers when running in dynamic
	// mode to use an unlimited number of workers.
	UnlimitedWorkers = -1
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Job

type Job interface {
	Process()
}

Job is implemented by types which can be processed by Queue.
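As a sketch of what satisfying this interface looks like, the example below redeclares Job locally so that it compiles on its own. The countJob type and the runAll helper are hypothetical and are not part of the package; runAll simply calls Process in order and stands in for whatever the Queue does with a job.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Job mirrors the interface documented above so this sketch is self-contained.
type Job interface {
	Process()
}

// countJob is a hypothetical job that adds delta to a shared counter.
type countJob struct {
	counter *int64
	delta   int64
}

// Process uses an atomic add because jobs may run on several goroutines.
func (j countJob) Process() {
	atomic.AddInt64(j.counter, j.delta)
}

// runAll is a stand-in for a queue: it processes each job sequentially.
func runAll(jobs []Job) {
	for _, j := range jobs {
		j.Process()
	}
}

func main() {
	var total int64
	runAll([]Job{countJob{&total, 2}, countJob{&total, 3}})
	fmt.Println(total) // prints 5
}
```

Because Process takes no arguments and returns nothing, any input a job needs, and any result or error it produces, has to travel through the fields of the implementing type.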

type Logger

type Logger interface {
	Printf(format string, v ...interface{})
}

Logger is implemented by types which can be used by Queue as a log destination.
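The standard library's *log.Logger has a Printf method with this signature, so it satisfies the interface. The sketch below redeclares Logger locally to stay self-contained; the capture helper is hypothetical and only exists to show the output.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// Logger mirrors the interface documented above.
type Logger interface {
	Printf(format string, v ...interface{})
}

// Compile-time check that *log.Logger satisfies Logger.
var _ Logger = (*log.Logger)(nil)

// capture logs one formatted line through a Logger backed by an in-memory
// buffer (no prefix, no timestamp flags) and returns what was written.
func capture(format string, v ...interface{}) string {
	var buf bytes.Buffer
	var l Logger = log.New(&buf, "", 0)
	l.Printf(format, v...)
	return buf.String()
}

func main() {
	// log.Logger appends a newline if the message does not end with one.
	fmt.Print(capture("queued %d jobs", 3)) // prints "queued 3 jobs"
}
```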

type Option

type Option func(q *Queue) error

Option represents a queue option.

func Limit

func Limit(count int) Option

Limit sets the maximum number of jobs queued before an error is returned. Default is DefaultLimit.

func Log

func Log(l Logger) Option

Log sets the logger for Queue. Default is log.Default().

func Static

func Static() Option

Static configures the Queue to use a static number of pre-spawned workers instead of dynamic workers, which can be beneficial for inexpensive jobs. Default behaviour is to use dynamic workers.

func Workers

func Workers(count int) Option

Workers sets the number of workers which will process jobs from the queue. The constant UnlimitedWorkers can be used alongside dynamic workers (the default mode) to enable an unlimited number of workers. Default is runtime.NumCPU() workers.
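The documentation does not describe how dynamic workers are implemented. One common way to start workers on demand while capping how many run at once is a semaphore channel, sketched below. This is an assumption about what "dynamic" could look like, not a description of the package's internals, and runDynamic is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runDynamic starts one goroutine per job but never lets more than
// maxWorkers run at the same time. maxWorkers must be positive.
// It returns the peak number of jobs observed running concurrently.
func runDynamic(jobs []func(), maxWorkers int) int {
	sem := make(chan struct{}, maxWorkers) // one slot per allowed worker
	var wg sync.WaitGroup
	var running, peak int64

	for _, job := range jobs {
		sem <- struct{}{} // blocks while maxWorkers jobs are in flight
		wg.Add(1)
		go func(job func()) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the job ends

			n := atomic.AddInt64(&running, 1)
			// Record the highest concurrency seen so far.
			for {
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			job()
			atomic.AddInt64(&running, -1)
		}(job)
	}
	wg.Wait()
	return int(peak)
}

func main() {
	sleep := func() { time.Sleep(10 * time.Millisecond) }
	peak := runDynamic([]func(){sleep, sleep, sleep, sleep}, 2)
	fmt.Println(peak <= 2) // prints true
}
```

A static pool, by contrast, starts all of its goroutines up front and has them loop over a shared channel, which avoids the cost of spawning a goroutine for every job.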

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue processes jobs.

func New

func New(options ...Option) (*Queue, error)

New creates a fully initialised Queue with the given options.
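The Option type and the variadic New signature follow the functional options pattern. The sketch below shows that pattern with local, lower-case stand-ins (queue, option, limit, static, newQueue); the fields and the validation rule are assumptions made for illustration and may differ from what the package does.

```go
package main

import (
	"errors"
	"fmt"
	"runtime"
)

// defaultLimit matches the documented DefaultLimit value.
const defaultLimit = 128

// queue is a hypothetical stand-in holding only configuration.
type queue struct {
	limit   int
	workers int
	static  bool
}

// option has the same shape as the documented Option type.
type option func(q *queue) error

// limit sets the job limit. Rejecting values below 1 is an assumption.
func limit(count int) option {
	return func(q *queue) error {
		if count < 1 {
			return errors.New("limit must be at least 1")
		}
		q.limit = count
		return nil
	}
}

// static switches the stand-in queue to static workers.
func static() option {
	return func(q *queue) error {
		q.static = true
		return nil
	}
}

// newQueue applies defaults first, then each option in order,
// stopping at the first option that reports an error.
func newQueue(options ...option) (*queue, error) {
	q := &queue{limit: defaultLimit, workers: runtime.NumCPU()}
	for _, opt := range options {
		if err := opt(q); err != nil {
			return nil, err
		}
	}
	return q, nil
}

func main() {
	q, err := newQueue(limit(4), static())
	fmt.Println(q.limit, q.static, err) // prints 4 true <nil>

	_, err = newQueue(limit(0))
	fmt.Println(err) // prints limit must be at least 1
}
```

Because each option returns an error, a constructor built this way can reject bad configuration before any worker is started, which is consistent with New returning (*Queue, error).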

func (*Queue) Queue

func (q *Queue) Queue(job Job)

Queue queues a job in blocking mode.

func (*Queue) QueueNonBlocking

func (q *Queue) QueueNonBlocking(job Job) error

QueueNonBlocking queues a job in non-blocking mode. If the buffer, whose maximum size is set by Limit, is already full, an error is returned.
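In Go, a non-blocking send on a buffered channel is usually written as a select with a default case. The sketch below shows that technique in isolation; enqueueNonBlocking, acceptedSends and errQueueFull are hypothetical names, and the package's actual error value is not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueFull is a hypothetical error for this sketch.
var errQueueFull = errors.New("queue full")

// enqueueNonBlocking tries to send v on ch and returns immediately.
// If the buffer has no free slot, the default case runs instead.
func enqueueNonBlocking(ch chan int, v int) error {
	select {
	case ch <- v:
		return nil
	default:
		return errQueueFull
	}
}

// acceptedSends makes the given number of send attempts on a buffer of
// the given capacity with no receiver, and counts how many succeed.
func acceptedSends(capacity, attempts int) int {
	ch := make(chan int, capacity)
	accepted := 0
	for i := 0; i < attempts; i++ {
		if err := enqueueNonBlocking(ch, i); err == nil {
			accepted++
		}
	}
	return accepted
}

func main() {
	fmt.Println(acceptedSends(2, 5)) // prints 2
}
```

The blocking variant is the same send without the select: the caller waits until a slot becomes free.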

func (*Queue) Stop

func (q *Queue) Stop(ctx context.Context) error

Stop stops processing and returns once all jobs have been completed or the context is done. The queue should not be used after calling Stop; calling Queue or QueueNonBlocking after Stop will cause a panic.
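Waiting for outstanding work while still honouring a context is commonly done by racing a completion channel against ctx.Done(). The sketch below shows that technique with a sync.WaitGroup; waitOrDone and stopWithin are hypothetical helpers, and nothing here is taken from the package's implementation of Stop.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitOrDone returns nil once wg's work has finished, or ctx.Err()
// if the context is done first.
func waitOrDone(ctx context.Context, wg *sync.WaitGroup) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// stopWithin runs one job lasting jobMillis and waits for it with a
// timeout of timeoutMillis, returning the result of the wait.
func stopWithin(jobMillis, timeoutMillis int) error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(jobMillis) * time.Millisecond)
	}()

	timeout := time.Duration(timeoutMillis) * time.Millisecond
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return waitOrDone(ctx, &wg)
}

func main() {
	fmt.Println(stopWithin(10, 1000)) // prints <nil>
	fmt.Println(stopWithin(500, 10))  // prints context deadline exceeded
}
```

Note that when the context wins the race, the job is not cancelled; it keeps running in the background. Since Process takes no context, a caller that needs jobs to stop early would have to arrange that inside the job type itself.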
