taskq

Version: v1.1.0
Published: Nov 15, 2022 License: MIT Imports: 6 Imported by: 1

README

TaskQ

Simple and powerful goroutines manager.


Installing

go get github.com/antonmashko/taskq

When and how to use TaskQ

TaskQ is typically used to control a service's resource usage by limiting how many goroutines it spawns. Example:

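tq := taskq.New(10) // at most 10 active goroutines; named tq to avoid shadowing the package
if err := tq.Start(); err != nil {
	log.Fatal(err)
}
taskID, err := tq.Enqueue(context.Background(), taskq.TaskFunc(func(ctx context.Context) error {
	fmt.Println("hello world")
	return nil
}))
if err != nil {
	log.Fatal(err)
}
fmt.Println("enqueued task", taskID)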

In this example, we create a TaskQ that limits the number of active goroutines to 10 and add tasks to it with Enqueue. A task runs as soon as TaskQ is able to create a new worker (goroutine). When the task finishes, its goroutine is disposed of, and the next task runs in a new goroutine.

Use limit=0 to leave the number of goroutines unlimited.
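To make the limiting behavior concrete, here is a minimal stdlib-only sketch of the same idea: one goroutine per task, with a buffered channel acting as a counting semaphore. It is not TaskQ's implementation; runLimited and sleepTasks are hypothetical names introduced for illustration, and treating limit <= 0 as "unlimited" mirrors the limit=0 note above.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited starts one goroutine per task but keeps at most limit of them
// active at a time; limit <= 0 means no limit. It returns the highest number
// of tasks that were observed running simultaneously.
// Hypothetical helper, not part of the taskq API.
func runLimited(limit int, tasks []func()) int {
	var wg sync.WaitGroup
	var active, peak int64
	var slots chan struct{} // stays nil when the pool is unlimited
	if limit > 0 {
		slots = make(chan struct{}, limit)
	}
	for _, task := range tasks {
		if slots != nil {
			slots <- struct{}{} // wait until a worker slot is free
		}
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			n := atomic.AddInt64(&active, 1)
			for { // record a new peak if this goroutine raised it
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			task()
			atomic.AddInt64(&active, -1)
			if slots != nil {
				<-slots // free the slot; this goroutine is about to exit
			}
		}(task)
	}
	wg.Wait()
	return int(atomic.LoadInt64(&peak))
}

// sleepTasks builds n tasks that each sleep briefly to simulate work.
func sleepTasks(n int) []func() {
	tasks := make([]func(), n)
	for i := range tasks {
		tasks[i] = func() { time.Sleep(2 * time.Millisecond) }
	}
	return tasks
}

func main() {
	fmt.Println("peak with limit 3:", runLimited(3, sleepTasks(20)))
	fmt.Println("peak with no limit:", runLimited(0, sleepTasks(20)))
}
```

With a limit of 3 the reported peak can never exceed 3, because a goroutine only starts after it has taken a slot and releases the slot only after its task has returned.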

Graceful shutdown

Shutdown and Close gracefully shut down the TaskQ without interrupting any active tasks. If TaskQ needs to finish all tasks remaining in the queue, pass the ContextWithWait context to the Shutdown method.
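The difference between the two shutdown modes can be sketched with the standard library alone. The pool type below is a hypothetical stand-in, not TaskQ's implementation: shutdown(true) plays the role of Shutdown with ContextWithWait and drains the queue, while shutdown(false) only lets active tasks finish. What TaskQ does with still-queued tasks in the second case is not stated here; the sketch assumes they are simply left unrun.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

var (
	errClosed = errors.New("pool closed")
	errFull   = errors.New("queue full")
)

// pool is a hypothetical fixed-size worker pool used only for illustration.
type pool struct {
	mu      sync.Mutex
	closed  bool
	tasks   chan func()
	stop    chan struct{}
	workers sync.WaitGroup
	ran     int64
}

func newPool(workers, capacity int) *pool {
	p := &pool{tasks: make(chan func(), capacity), stop: make(chan struct{})}
	p.workers.Add(workers)
	for i := 0; i < workers; i++ {
		go p.work()
	}
	return p
}

func (p *pool) work() {
	defer p.workers.Done()
	for {
		select {
		case <-p.stop: // checked first so a stop request wins over queued tasks
			return
		default:
		}
		task, ok := <-p.tasks
		if !ok {
			return // queue closed and fully drained
		}
		task() // an active task always runs to completion
		atomic.AddInt64(&p.ran, 1)
	}
}

func (p *pool) enqueue(task func()) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return errClosed
	}
	select {
	case p.tasks <- task:
		return nil
	default:
		return errFull
	}
}

// shutdown stops accepting tasks and blocks until all workers have exited.
// With wait=true the workers first run everything still queued.
func (p *pool) shutdown(wait bool) {
	p.mu.Lock()
	if p.closed {
		p.mu.Unlock()
		return
	}
	p.closed = true
	if !wait {
		close(p.stop)
	}
	close(p.tasks) // wakes idle workers; buffered tasks remain receivable
	p.mu.Unlock()
	p.workers.Wait()
}

func (p *pool) completed() int { return int(atomic.LoadInt64(&p.ran)) }

func main() {
	p := newPool(2, 16)
	for i := 0; i < 10; i++ {
		_ = p.enqueue(func() {})
	}
	p.shutdown(true)
	fmt.Println("completed after waiting shutdown:", p.completed())
	fmt.Println("enqueue after shutdown:", p.enqueue(func() {}))
}
```

In both modes a running task is never cancelled; the modes differ only in whether workers keep pulling queued tasks before they exit.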

Documentation

Constants

This section is empty.

Variables

var (
	ErrClosed  = errors.New("taskq closed")
	ErrNilTask = errors.New("nil task")
)
var ContextWithWait = context.WithValue(context.Background(), ctxWaitKey{}, true)
var (
	EmptyQueue = errors.New("empty queue")
)
var ErrRetryTask = errors.New("error occurred during task execution. retry task")

Functions

This section is empty.

Types

type ConcurrentQueue added in v0.4.0

type ConcurrentQueue struct {
	// contains filtered or unexported fields
}

func NewConcurrentQueue added in v0.4.0

func NewConcurrentQueue() *ConcurrentQueue

func (*ConcurrentQueue) Dequeue added in v0.4.0

func (q *ConcurrentQueue) Dequeue(_ context.Context) (Task, error)

func (*ConcurrentQueue) Enqueue added in v0.4.0

func (q *ConcurrentQueue) Enqueue(_ context.Context, t Task) (int64, error)

func (*ConcurrentQueue) Len added in v1.0.2

func (q *ConcurrentQueue) Len(_ context.Context) int

type LimitedConcurrentQueue added in v0.4.0

type LimitedConcurrentQueue struct {
	// contains filtered or unexported fields
}

func NewLimitedConcurrentQueue added in v0.4.0

func NewLimitedConcurrentQueue(size int) *LimitedConcurrentQueue

func (*LimitedConcurrentQueue) Dequeue added in v0.4.0

func (q *LimitedConcurrentQueue) Dequeue(ctx context.Context) (Task, error)

func (*LimitedConcurrentQueue) Enqueue added in v0.4.0

func (q *LimitedConcurrentQueue) Enqueue(_ context.Context, t Task) (int64, error)

func (*LimitedConcurrentQueue) Len added in v1.0.2

type Queue added in v0.4.0

type Queue interface {
	Enqueue(context.Context, Task) (int64, error)
	// Dequeue Task from queue
	// if queue empty return `EmptyQueue` as error
	Dequeue(context.Context) (Task, error)
}
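Because Queue is an interface, a custom queue could presumably be handed to NewWithQueue or PoolWithQueue. The sketch below shows one way to satisfy the interface with a mutex-guarded slice. To stay self-contained it re-declares Task, TaskFunc, Queue, EmptyQueue and ErrNilTask locally rather than importing the package; sliceQueue, fill and drain are hypothetical names, and treating the int64 result as a task ID is an assumption based on the README example naming it taskID.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Local copies of the documented declarations, so the sketch compiles
// without importing the package.
type Task interface {
	Do(ctx context.Context) error
}

type TaskFunc func(ctx context.Context) error

func (f TaskFunc) Do(ctx context.Context) error { return f(ctx) }

type Queue interface {
	Enqueue(context.Context, Task) (int64, error)
	Dequeue(context.Context) (Task, error)
}

var (
	EmptyQueue = errors.New("empty queue")
	ErrNilTask = errors.New("nil task")
)

// sliceQueue is a hypothetical FIFO queue guarded by a mutex.
type sliceQueue struct {
	mu     sync.Mutex
	items  []Task
	lastID int64
}

var _ Queue = (*sliceQueue)(nil) // compile-time interface check

// Enqueue appends the task and returns a sequential ID (assumed meaning).
func (q *sliceQueue) Enqueue(_ context.Context, t Task) (int64, error) {
	if t == nil {
		return 0, ErrNilTask
	}
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, t)
	q.lastID++
	return q.lastID, nil
}

// Dequeue pops the oldest task, or returns EmptyQueue when nothing is queued.
func (q *sliceQueue) Dequeue(_ context.Context) (Task, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return nil, EmptyQueue
	}
	t := q.items[0]
	q.items[0] = nil // drop the reference so the task can be collected
	q.items = q.items[1:]
	return t, nil
}

// fill enqueues n no-op tasks and returns the IDs handed back.
func fill(q Queue, n int) []int64 {
	ids := make([]int64, 0, n)
	for i := 0; i < n; i++ {
		id, err := q.Enqueue(context.Background(), TaskFunc(func(context.Context) error { return nil }))
		if err != nil {
			break
		}
		ids = append(ids, id)
	}
	return ids
}

// drain runs tasks until Dequeue returns an error (EmptyQueue in this
// sketch) and reports how many tasks ran.
func drain(q Queue) int {
	ctx := context.Background()
	n := 0
	for {
		t, err := q.Dequeue(ctx)
		if err != nil {
			return n
		}
		_ = t.Do(ctx)
		n++
	}
}

func main() {
	q := &sliceQueue{}
	fmt.Println("ids:", fill(q, 3))
	fmt.Println("ran:", drain(q))
}
```

Returning EmptyQueue instead of blocking follows the comment on Dequeue in the interface; a blocking variant would need to honor the context passed in.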

type RetryableTask

type RetryableTask struct {
	// contains filtered or unexported fields
}

func (*RetryableTask) Do

func (t *RetryableTask) Do(ctx context.Context) error

type Task

type Task interface {
	Do(ctx context.Context) error
}

Task for TaskQ

func NewRetryableTask

func NewRetryableTask(task Task, maxRetries int) Task
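The retry semantics of NewRetryableTask are not spelled out on this page, so the following is only a sketch of one plausible interpretation, written against local copies of Task and TaskFunc. It assumes maxRetries counts extra attempts after the first one and that any non-nil error triggers a retry; withRetries, retrying, flaky and run are hypothetical names, and the role of ErrRetryTask is deliberately left out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local copies of the documented declarations.
type Task interface {
	Do(ctx context.Context) error
}

type TaskFunc func(ctx context.Context) error

func (f TaskFunc) Do(ctx context.Context) error { return f(ctx) }

// retrying is a hypothetical wrapper, not the library's RetryableTask.
type retrying struct {
	task       Task
	maxRetries int
}

// withRetries wraps task so that a failed Do is repeated up to maxRetries
// more times (assumed semantics). Negative values are treated as zero.
func withRetries(task Task, maxRetries int) Task {
	if maxRetries < 0 {
		maxRetries = 0
	}
	return &retrying{task: task, maxRetries: maxRetries}
}

func (r *retrying) Do(ctx context.Context) error {
	var err error
	for attempt := 0; attempt <= r.maxRetries; attempt++ {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr // stop retrying once the context is done
		}
		if err = r.task.Do(ctx); err == nil {
			return nil
		}
	}
	return err // last error after all attempts failed
}

// flaky returns a task that fails its first `failures` calls, plus a
// pointer to its call counter.
func flaky(failures int) (Task, *int) {
	calls := 0
	task := TaskFunc(func(context.Context) error {
		calls++
		if calls <= failures {
			return errors.New("temporary failure")
		}
		return nil
	})
	return task, &calls
}

// run executes the task with a background context.
func run(t Task) error { return t.Do(context.Background()) }

func main() {
	task, calls := flaky(2)
	err := run(withRetries(task, 3))
	fmt.Println("error:", err, "calls:", *calls)
}
```

Since the wrapper itself satisfies Task, it can be enqueued like any other task.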

type TaskDone added in v0.4.2

type TaskDone interface {
	Done(context.Context)
}

type TaskFunc

type TaskFunc func(ctx context.Context) error

func (TaskFunc) Do

func (t TaskFunc) Do(ctx context.Context) error

type TaskOnError added in v0.4.2

type TaskOnError interface {
	OnError(context.Context, error)
}
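TaskDone and TaskOnError look like optional hooks that a task type can implement in addition to Do. This page does not say when TaskQ calls them, so the sketch below only illustrates the usual Go pattern for optional interfaces. It assumes OnError is called when Do returns a non-nil error and Done is called after every run; job, execute and runJob are hypothetical names, and the interfaces are re-declared locally.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local copies of the documented declarations.
type Task interface {
	Do(ctx context.Context) error
}

type TaskDone interface {
	Done(context.Context)
}

type TaskOnError interface {
	OnError(context.Context, error)
}

// job is a hypothetical task that records which methods were called.
type job struct {
	fail   bool
	events []string
}

func (j *job) Do(context.Context) error {
	j.events = append(j.events, "do")
	if j.fail {
		return errors.New("job failed")
	}
	return nil
}

func (j *job) OnError(_ context.Context, err error) {
	j.events = append(j.events, "error: "+err.Error())
}

func (j *job) Done(context.Context) {
	j.events = append(j.events, "done")
}

// execute runs a task and invokes the optional hooks if the task's
// concrete type implements them (assumed call order: Do, OnError, Done).
func execute(ctx context.Context, t Task) {
	if err := t.Do(ctx); err != nil {
		if h, ok := t.(TaskOnError); ok {
			h.OnError(ctx, err)
		}
	}
	if d, ok := t.(TaskDone); ok {
		d.Done(ctx)
	}
}

// runJob executes one job and returns the recorded events.
func runJob(fail bool) []string {
	j := &job{fail: fail}
	execute(context.Background(), j)
	return j.events
}

func main() {
	fmt.Println(runJob(false))
	fmt.Println(runJob(true))
}
```

Tasks that implement neither hook, such as a plain TaskFunc, pass through execute unchanged because both type assertions fail.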

type TaskQ

type TaskQ struct {
	OnDequeueError func(ctx context.Context, workerID uint64, err error)
	// contains filtered or unexported fields
}

func New

func New(limit int) *TaskQ

func NewWithQueue added in v0.4.0

func NewWithQueue(limit int, q Queue) *TaskQ

func Pool added in v1.1.0

func Pool(size int) *TaskQ

func PoolWithQueue added in v1.1.0

func PoolWithQueue(size int, q Queue) *TaskQ

func (*TaskQ) Close

func (t *TaskQ) Close() error

func (*TaskQ) Enqueue

func (t *TaskQ) Enqueue(ctx context.Context, task Task) (int64, error)

func (*TaskQ) Shutdown added in v0.3.0

func (t *TaskQ) Shutdown(ctx context.Context) error

func (*TaskQ) Start

func (t *TaskQ) Start() error

type WaitGroup added in v0.2.1

type WaitGroup struct {
	// contains filtered or unexported fields
}

func NewWaitGroup added in v0.2.1

func NewWaitGroup(size int) *WaitGroup

func (*WaitGroup) Enqueue added in v0.2.1

func (wg *WaitGroup) Enqueue(ctx context.Context, t Task) (int64, error)

func (*WaitGroup) Wait added in v0.2.1

func (wg *WaitGroup) Wait()
