taskq

package module
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 15, 2022 License: MIT Imports: 6 Imported by: 1

README

TaskQ

Go Report GoDoc Build Status Codecov

Simple and powerful goroutine manager.


Installing

go get github.com/antonmashko/taskq

Purpose

TaskQ manages goroutines for you. It lets you control how many resources your service consumes and shut it down gracefully.

Example

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/antonmashko/taskq"
)

// Task is a minimal example task. It carries no state.
type Task struct{}

// Do prints a greeting to standard output and always returns a nil error.
// The context argument is not used.
func (Task) Do(ctx context.Context) error {
	const greeting = "hello world"
	fmt.Println(greeting)
	return nil
}

// main walks through the TaskQ lifecycle: create an instance, start it,
// enqueue a single task, and close it. Any error terminates the program.
func main() {
	// Create a TaskQ that runs at most 10 goroutines at a time.
	// Pass a limit of 0 to leave the number of goroutines unlimited.
	tq := taskq.New(10)

	// Begin reading tasks from the queue and executing them.
	if err := tq.Start(); err != nil {
		log.Fatal(err)
	}

	// Add a task to the queue. TaskQ calls the task's Do method as soon as
	// a worker (goroutine) is available.
	if _, err := tq.Enqueue(context.Background(), Task{}); err != nil {
		log.Fatal(err)
	}

	// Shut down gracefully.
	if err := tq.Close(); err != nil {
		log.Fatal(err)
	}
}

Graceful shutdown

Shutdown and Close gracefully shut down the TaskQ without interrupting any active tasks. If TaskQ needs to finish all tasks in the queue, pass a context created with ContextWithWait as the argument to Shutdown.

Benchmark results

Benchmarks

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by the package.
var (
	// ErrClosed carries the message "taskq closed".
	// NOTE(review): presumably returned when a closed TaskQ is used — confirm.
	ErrClosed  = errors.New("taskq closed")
	// ErrNilTask carries the message "nil task".
	// NOTE(review): presumably returned when a nil Task is enqueued — confirm.
	ErrNilTask = errors.New("nil task")
)
View Source
var (
	// EmptyQueue is the error a Queue's Dequeue method returns when the
	// queue has no tasks.
	EmptyQueue = errors.New("empty queue")
)
View Source
// ErrRetryTask is a sentinel error related to task retries.
// NOTE(review): presumably used together with RetryableTask to signal that
// a failed task should be run again — confirm.
var ErrRetryTask = errors.New("error occurred during task execution. retry task")

Functions

func ContextWithWait added in v1.1.0

func ContextWithWait(ctx context.Context) context.Context

Types

type ConcurrentQueue added in v0.4.0

type ConcurrentQueue struct {
	// contains filtered or unexported fields
}

func NewConcurrentQueue added in v0.4.0

func NewConcurrentQueue() *ConcurrentQueue

func (*ConcurrentQueue) Dequeue added in v0.4.0

func (q *ConcurrentQueue) Dequeue(_ context.Context) (Task, error)

func (*ConcurrentQueue) Enqueue added in v0.4.0

func (q *ConcurrentQueue) Enqueue(_ context.Context, t Task) (int64, error)

func (*ConcurrentQueue) Len added in v1.0.2

func (q *ConcurrentQueue) Len(_ context.Context) int

type LimitedConcurrentQueue added in v0.4.0

type LimitedConcurrentQueue struct {
	// contains filtered or unexported fields
}

func NewLimitedConcurrentQueue added in v0.4.0

func NewLimitedConcurrentQueue(size int) *LimitedConcurrentQueue

func (*LimitedConcurrentQueue) Dequeue added in v0.4.0

func (q *LimitedConcurrentQueue) Dequeue(ctx context.Context) (Task, error)

func (*LimitedConcurrentQueue) Enqueue added in v0.4.0

func (q *LimitedConcurrentQueue) Enqueue(_ context.Context, t Task) (int64, error)

func (*LimitedConcurrentQueue) Len added in v1.0.2

type Queue added in v0.4.0

// Queue is the interface that NewWithQueue and PoolWithQueue accept as the
// task queue.
type Queue interface {
	// Enqueue adds a Task to the queue.
	// NOTE(review): the meaning of the int64 result is not stated here — confirm.
	Enqueue(context.Context, Task) (int64, error)
	// Dequeue removes a Task from the queue and returns it.
	// If the queue is empty, it returns `EmptyQueue` as the error.
	Dequeue(context.Context) (Task, error)
}

type RetryableTask

type RetryableTask struct {
	// contains filtered or unexported fields
}

func (*RetryableTask) Do

func (t *RetryableTask) Do(ctx context.Context) error

type Task

// Task is a unit of work for TaskQ.
type Task interface {
	// Do is the method TaskQ runs once a worker (goroutine) is available
	// for the task.
	Do(ctx context.Context) error
}

Task for TaskQ

func NewRetryableTask

func NewRetryableTask(task Task, maxRetries int) Task

type TaskDone added in v0.4.2

type TaskDone interface {
	Done(context.Context)
}

type TaskFunc

type TaskFunc func(ctx context.Context) error

func (TaskFunc) Do

func (t TaskFunc) Do(ctx context.Context) error

type TaskOnError added in v0.4.2

type TaskOnError interface {
	OnError(context.Context, error)
}

type TaskQ

type TaskQ struct {
	OnDequeueError func(ctx context.Context, workerID uint64, err error)
	// contains filtered or unexported fields
}

func New

func New(limit int) *TaskQ

func NewWithQueue added in v0.4.0

func NewWithQueue(limit int, q Queue) *TaskQ

func Pool added in v1.1.0

func Pool(size int) *TaskQ

func PoolWithQueue added in v1.1.0

func PoolWithQueue(size int, q Queue) *TaskQ

func (*TaskQ) Close

func (t *TaskQ) Close() error

func (*TaskQ) Enqueue

func (t *TaskQ) Enqueue(ctx context.Context, task Task) (int64, error)

func (*TaskQ) Shutdown added in v0.3.0

func (t *TaskQ) Shutdown(ctx context.Context) error

func (*TaskQ) Start

func (t *TaskQ) Start() error

type WaitGroup added in v0.2.1

type WaitGroup struct {
	// contains filtered or unexported fields
}

func NewWaitGroup added in v0.2.1

func NewWaitGroup(size int) *WaitGroup

func (*WaitGroup) Enqueue added in v0.2.1

func (wg *WaitGroup) Enqueue(ctx context.Context, t Task) (int64, error)

func (*WaitGroup) Wait added in v0.2.1

func (wg *WaitGroup) Wait()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL