Documentation
¶
Index ¶
- Variables
- type Logger
- type Metric
- type Option
- func WithAfterFn(afterFn func()) Option
- func WithFn(fn func(context.Context, core.TaskMessage) error) Option
- func WithLogger(l Logger) Option
- func WithMetric(m Metric) Option
- func WithQueueSize(num int) Option
- func WithRetryInterval(d time.Duration) Option
- func WithWorker(w core.Worker) Option
- func WithWorkerCount(num int64) Option
- type OptionFunc
- type Options
- type Queue
- func (q *Queue) BusyWorkers() int64
- func (q *Queue) CompletedTasks() uint64
- func (q *Queue) FailureTasks() uint64
- func (q *Queue) Queue(message core.QueuedMessage, opts ...job.AllowOption) error
- func (q *Queue) QueueTask(task job.TaskFunc, opts ...job.AllowOption) error
- func (q *Queue) Release()
- func (q *Queue) Shutdown()
- func (q *Queue) Start()
- func (q *Queue) SubmittedTasks() uint64
- func (q *Queue) SuccessTasks() uint64
- func (q *Queue) UpdateWorkerCount(num int64)
- func (q *Queue) Wait()
- type Ring
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoTaskInQueue there is nothing in the queue ErrNoTaskInQueue = errors.New("golang-queue: no task in queue") // ErrQueueHasBeenClosed the current queue is closed ErrQueueHasBeenClosed = errors.New("golang-queue: queue has been closed") // ErrMaxCapacity Maximum size limit reached ErrMaxCapacity = errors.New("golang-queue: maximum size limit reached") )
var ErrMissingWorker = errors.New("missing worker module")
ErrMissingWorker missing define worker
var ErrQueueShutdown = errors.New("queue has been closed and released")
ErrQueueShutdown the queue is released and closed.
Functions ¶
This section is empty.
Types ¶
type Logger ¶
type Logger interface { Infof(format string, args ...any) Errorf(format string, args ...any) Fatalf(format string, args ...any) Info(args ...any) Error(args ...any) Fatal(args ...any) }
Logger interface is used throughout gorush
func NewEmptyLogger ¶
func NewEmptyLogger() Logger
NewEmptyLogger for simple logger.
Example ¶
l := NewEmptyLogger() l.Info("test") l.Infof("test") l.Error("test") l.Errorf("test") l.Fatal("test") l.Fatalf("test")
type Metric ¶ added in v0.0.10
type Metric interface { IncBusyWorker() DecBusyWorker() BusyWorkers() int64 SuccessTasks() uint64 FailureTasks() uint64 SubmittedTasks() uint64 CompletedTasks() uint64 IncSuccessTask() IncFailureTask() IncSubmittedTask() }
Metric interface
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
An Option configures a mutex.
func WithAfterFn ¶ added in v0.2.1
func WithAfterFn(afterFn func()) Option
WithAfterFn set callback function after job done
func WithQueueSize ¶ added in v0.0.7
WithQueueSize set worker count
func WithRetryInterval ¶ added in v0.4.0
WithRetryInterval sets the retry interval
type OptionFunc ¶ added in v0.1.0
type OptionFunc func(*Options)
OptionFunc is a function that configures a queue.
type Options ¶ added in v0.0.7
type Options struct {
// contains filtered or unexported fields
}
Options for custom args in Queue
func NewOptions ¶ added in v0.0.7
NewOptions initialize the default value for the options
type Queue ¶
A Queue is a message queue.
func NewPool ¶ added in v0.0.7
NewPool initializes a new pool
Example (QueueTask) ¶
package main import ( "context" "fmt" "log" "time" "github.com/golang-queue/queue" ) func main() { taskN := 7 rets := make(chan int, taskN) // allocate a pool with 5 goroutines to deal with those tasks p := queue.NewPool(5) // don't forget to release the pool in the end defer p.Release() // assign tasks to asynchronous goroutine pool for i := 0; i < taskN; i++ { idx := i if err := p.QueueTask(func(context.Context) error { // sleep and return the index time.Sleep(20 * time.Millisecond) rets <- idx return nil }); err != nil { log.Println(err) } } // wait until all tasks done for i := 0; i < taskN; i++ { fmt.Println("index:", <-rets) } }
Output: index: 3 index: 0 index: 2 index: 4 index: 5 index: 6 index: 1
Example (QueueTaskTimeout) ¶
package main import ( "context" "fmt" "log" "time" "github.com/golang-queue/queue" "github.com/golang-queue/queue/job" ) func main() { taskN := 7 rets := make(chan int, taskN) resps := make(chan error, 1) // allocate a pool with 5 goroutines to deal with those tasks q := queue.NewPool(5) // don't forget to release the pool in the end defer q.Release() // assign tasks to asynchronous goroutine pool for i := 0; i < taskN; i++ { idx := i if err := q.QueueTask(func(ctx context.Context) error { // panic job if idx == 5 { panic("system error") } // timeout job if idx == 6 { time.Sleep(105 * time.Millisecond) } select { case <-ctx.Done(): resps <- ctx.Err() default: } rets <- idx return nil }, job.AllowOption{ Timeout: job.Time(100 * time.Millisecond), }); err != nil { log.Println(err) } } // wait until all tasks done for i := 0; i < taskN-1; i++ { fmt.Println("index:", <-rets) } close(resps) for e := range resps { fmt.Println(e.Error()) } fmt.Println("success task count:", q.SuccessTasks()) fmt.Println("failure task count:", q.FailureTasks()) fmt.Println("submitted task count:", q.SubmittedTasks()) }
Output: index: 3 index: 0 index: 2 index: 4 index: 6 index: 1 context deadline exceeded success task count: 5 failure task count: 2 submitted task count: 7
func (*Queue) BusyWorkers ¶ added in v0.1.0
BusyWorkers returns the numbers of workers in the running process.
func (*Queue) CompletedTasks ¶ added in v0.2.1
CompletedTasks returns the numbers of completed tasks.
func (*Queue) FailureTasks ¶ added in v0.1.0
BusyWorkers returns the numbers of failure tasks.
func (*Queue) Queue ¶
func (q *Queue) Queue(message core.QueuedMessage, opts ...job.AllowOption) error
Queue to queue single job with binary
func (*Queue) SubmittedTasks ¶ added in v0.1.0
BusyWorkers returns the numbers of submitted tasks.
func (*Queue) SuccessTasks ¶ added in v0.1.0
BusyWorkers returns the numbers of success tasks.
func (*Queue) UpdateWorkerCount ¶ added in v0.1.0
UpdateWorkerCount to update worker number dynamically.
type Ring ¶ added in v0.2.0
Ring represents a simple queue using a buffer channel.
func NewRing ¶ added in v0.2.0
NewRing creates a new Ring instance with the provided options. It initializes the task queue with a default size of 2, sets the capacity based on the provided options, and configures the logger and run function. The function returns a pointer to the newly created Ring instance.
Parameters:
opts - A variadic list of Option functions to configure the Ring instance.
Returns:
*Ring - A pointer to the newly created Ring instance.
func (*Ring) Queue ¶ added in v0.2.0
func (s *Ring) Queue(task core.TaskMessage) error
Queue adds a task to the ring buffer queue. It returns an error if the queue is shut down or has reached its maximum capacity.
func (*Ring) Request ¶ added in v0.2.0
func (s *Ring) Request() (core.TaskMessage, error)
Request retrieves the next task message from the ring queue. If the queue has been stopped and is empty, it signals the exit channel and returns an error indicating the queue has been closed. If the queue is empty but not stopped, it returns an error indicating there are no tasks in the queue. If a task is successfully retrieved, it is removed from the queue, and the queue may be resized if it is less than half full. Returns the task message and nil on success, or an error if the queue is empty or has been closed.
func (*Ring) Run ¶ added in v0.2.0
Run executes a new task using the provided context and task message. It calls the runFunc function, which is responsible for processing the task. The context allows for cancellation and timeout control of the task execution.
func (*Ring) Shutdown ¶ added in v0.2.0
Shutdown gracefully shuts down the worker. It sets the stopFlag to indicate that the queue is shutting down and prevents new tasks from being added. If the queue is already shut down, it returns ErrQueueShutdown. It waits for all tasks to be processed before completing the shutdown.