taskqueue

package module
v1.1.0
Published: Jan 31, 2025 License: MIT Imports: 13 Imported by: 0

README

TaskQueue-Go

TaskQueue-Go is a high-performance, distributed task queue library for Go, designed to simplify background job processing. With support for multiple queue backends and job storage backends, along with a pluggable architecture, it provides a scalable and reliable system for decoupling task execution from your main application logic. The decoupled design enables independent scaling and optimization of the queuing system and job storage.


Features

  • Distributed Task Queues: Seamlessly enqueue and process tasks across distributed systems.
  • Customizable Queues: Configure worker concurrency, job timeouts, and task handlers for each queue.
  • Retries of Failed Jobs: Automatically retries failed jobs based on configurable retry policies.
  • Scheduled Jobs: Schedule jobs to run after a configurable delay.
  • Backend Flexibility: Initial support for Redis as a queue backend, with room for additional implementations.
  • Atomic Dequeueing: Ensures tasks are processed reliably using Redis Lua scripts.
  • Pluggable Architecture: Easily extend with custom implementations for enqueuing and job storage. This decoupled architecture allows you to independently scale and optimize the queuing system and job storage based on your needs.
  • Dashboard: TaskQueue-Go includes a feature-rich dashboard for monitoring queues and managing jobs.

Installation

go get github.com/oshankkumar/taskqueue-go

Getting Started

1. Setting Up the Enqueuer
package main

import (
	"context"

	"github.com/oshankkumar/taskqueue-go"
	"github.com/redis/go-redis/v9"
	redisq "github.com/oshankkumar/taskqueue-go/redis"
)

const ns = "taskqueue"

func main() {
	rc := redis.NewClient(&redis.Options{Addr: ":6379"})

	// Initialize Redis-backed enqueuer
	enq := redisq.NewQueue(rc, redisq.WithNamespace(ns))

	job := taskqueue.NewJob()
	err := job.JSONMarshalPayload(map[string]string{
		"to":      "user@example.com",
		"subject": "Welcome!",
	})
	if err != nil {
		panic(err)
	}

	err = enq.Enqueue(context.Background(), job, &taskqueue.EnqueueOptions{
		QueueName: "email_jobs",
	})

	if err != nil {
		panic(err)
	}
}

2. Setting Up a Worker to Process Jobs
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/oshankkumar/taskqueue-go"
	"github.com/redis/go-redis/v9"
	redisq "github.com/oshankkumar/taskqueue-go/redis"
)

const ns = "taskqueue"

func main() {
	rc := redis.NewClient(&redis.Options{Addr: ":6379"})

	worker := taskqueue.NewWorker(&taskqueue.WorkerOptions{
		Queue:          redisq.NewQueue(rc, redisq.WithNamespace(ns), redisq.WithCompletedJobTTL(time.Hour)),
		HeartBeater:    redisq.NewHeartBeater(rc, redisq.WithNamespace(ns)),
		MetricsBackend: redisq.NewMetricsBackend(rc, redisq.WithNamespace(ns)),
	})

	worker.RegisterHandler("email_jobs", taskqueue.HandlerFunc(func(ctx context.Context, job *taskqueue.Job) error {
		fmt.Printf("Processing job: %+v\n", job)
		return nil // Return an error if the job fails
	}), taskqueue.WithConcurrency(8))

	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer cancel()

	worker.Start(ctx)

	<-ctx.Done()
	worker.Stop()
}


Running TaskQueue Manager

Running Locally

To run the TaskQueue Manager locally:

  1. Clone the repository:
git clone https://github.com/oshankkumar/taskqueue-go.git
cd taskqueue-go
  2. Build the dashboard:
cd taskmanager/taskqueue-web
yarn install
yarn build
  3. Return to the repository root, then build and run the manager:
cd ../..
go build -o taskqueue-manager ./cmd/taskqueue-manager
./taskqueue-manager -listen=:8050 -namespace=taskqueue-go -redis-heartbeat-addr=redis:6379 -redis-queue-addr=redis:6379 -redis-metrics-backend-addr=redis:6379 -static-web-dir=./taskmanager/taskqueue-web/dist/spa

You can access the dashboard at http://localhost:8050 when running the TaskQueue Manager.

Running with Docker

To run the TaskQueue Manager using Docker:

docker run -p 8050:8050 oshank/taskqueue-manager:latest -listen=:8050 -namespace=taskqueue -redis-heartbeat-addr=redis:6379 -redis-metrics-backend-addr=redis:6379 -redis-queue-addr=redis:6379

You can access the dashboard at http://localhost:8050.

Dashboard

TaskQueue-Go ships with a dashboard for managing and monitoring your queues and jobs. The dashboard provides:

  • Queue Monitoring: View real-time statistics for each queue, including the number of pending and dead jobs.
  • Job Management: Retry or delete dead jobs, and enqueue new jobs directly from the dashboard.
  • Queue Management: Pause or resume a queue directly from the dashboard.
  • Worker Status: Monitor the status of workers.
  • Metrics: Visualize key metrics.

Advanced Usage

Custom Job Storage

You can implement your own job storage by conforming to the JobStore interface:

type JobStore interface {
  CreateOrUpdate(ctx context.Context, job *Job) error
  GetJob(ctx context.Context, jobID string) (*Job, error)
  DeleteJob(ctx context.Context, jobID string) error
  UpdateJobStatus(ctx context.Context, jobID string, status JobStatus) error
}
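
As an illustration of the interface above, here is a minimal in-memory sketch. The `Job`, `JobStatus`, and `ErrJobNotFound` declarations are local stand-ins trimmed to what the example needs, and `MemoryJobStore` and `roundTrip` are hypothetical names; none of this is the library's own implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Local stand-ins for the package's types (assumption: trimmed for the sketch).
type JobStatus int8

const (
	JobStatusWaiting JobStatus = iota + 1
	JobStatusActive
	JobStatusCompleted
)

type Job struct {
	ID     string
	Status JobStatus
}

var ErrJobNotFound = errors.New("job not found")

// MemoryJobStore is a hypothetical store that keeps jobs in a mutex-guarded map.
type MemoryJobStore struct {
	mu   sync.RWMutex
	jobs map[string]Job
}

func NewMemoryJobStore() *MemoryJobStore {
	return &MemoryJobStore{jobs: make(map[string]Job)}
}

func (s *MemoryJobStore) CreateOrUpdate(ctx context.Context, job *Job) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.jobs[job.ID] = *job // store a copy so callers cannot mutate stored state
	return nil
}

func (s *MemoryJobStore) GetJob(ctx context.Context, jobID string) (*Job, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	job, ok := s.jobs[jobID]
	if !ok {
		return nil, ErrJobNotFound
	}
	return &job, nil
}

func (s *MemoryJobStore) DeleteJob(ctx context.Context, jobID string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.jobs, jobID) // deleting a missing job is treated as a no-op here
	return nil
}

func (s *MemoryJobStore) UpdateJobStatus(ctx context.Context, jobID string, status JobStatus) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	job, ok := s.jobs[jobID]
	if !ok {
		return ErrJobNotFound
	}
	job.Status = status
	s.jobs[jobID] = job
	return nil
}

// roundTrip stores a job, updates its status, and reads the status back.
func roundTrip() (JobStatus, error) {
	ctx := context.Background()
	store := NewMemoryJobStore()
	if err := store.CreateOrUpdate(ctx, &Job{ID: "job-1", Status: JobStatusWaiting}); err != nil {
		return 0, err
	}
	if err := store.UpdateJobStatus(ctx, "job-1", JobStatusCompleted); err != nil {
		return 0, err
	}
	job, err := store.GetJob(ctx, "job-1")
	if err != nil {
		return 0, err
	}
	return job.Status, nil
}

func main() {
	status, err := roundTrip()
	fmt.Println(status == JobStatusCompleted, err)
}
```

A production store would also need persistence and some policy for expiring completed jobs; the map here only demonstrates the shape of the four methods.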

Redis Lua Script for Atomic Dequeueing

The library uses a Redis Lua script so that dequeuing a job and starting its visibility timeout happen as a single atomic step.
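
The script itself is not reproduced here. To show what atomic dequeueing with a visibility timeout means in practice, the following is an in-memory Go model of the idea, not the library's Lua script: a mutex stands in for the atomicity Redis gives a script, and `visibilityQueue`, `inFlight`, and `demo` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// inFlight records a dequeued job and the time its visibility timeout ends.
type inFlight struct {
	jobID    string
	deadline time.Time
}

// visibilityQueue is a hypothetical in-memory model of a pending list plus
// an in-flight set.
type visibilityQueue struct {
	mu      sync.Mutex
	pending []string
	active  []inFlight
}

// dequeue runs as one critical section: it first moves expired in-flight jobs
// back to pending, then pops one pending job and marks it in flight until
// now+timeout. No other caller can observe the intermediate state.
func (q *visibilityQueue) dequeue(now time.Time, timeout time.Duration) (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()

	kept := q.active[:0]
	for _, f := range q.active {
		if now.After(f.deadline) {
			q.pending = append(q.pending, f.jobID) // timed out: make it visible again
		} else {
			kept = append(kept, f)
		}
	}
	q.active = kept

	if len(q.pending) == 0 {
		return "", false
	}
	id := q.pending[0]
	q.pending = q.pending[1:]
	q.active = append(q.active, inFlight{jobID: id, deadline: now.Add(timeout)})
	return id, true
}

// demo dequeues at 0s, 10s, and 31s with a 30s timeout and no acknowledgement.
func demo() []string {
	q := &visibilityQueue{pending: []string{"job-1"}}
	start := time.Unix(0, 0)
	var seen []string
	for _, offset := range []time.Duration{0, 10 * time.Second, 31 * time.Second} {
		id, ok := q.dequeue(start.Add(offset), 30*time.Second)
		if !ok {
			id = "(none)"
		}
		seen = append(seen, id)
	}
	return seen
}

func main() {
	fmt.Println(demo()) // prints [job-1 (none) job-1]
}
```

The job is invisible to the second call because it is still in flight, and it reappears on the third call because nothing acknowledged it before the timeout elapsed. In the real package, `DequeueOptions.JobTimeout` and the `Acker` interface appear to play the roles of the timeout and the acknowledgement.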


Roadmap

  • Support for additional queueing backends (e.g., RabbitMQ, Kafka).
  • Support for additional job store backends (e.g., MySQL, PostgreSQL).
  • Metrics and monitoring integrations.

License

This project is licensed under the MIT License. See the LICENSE file for details.

Documentation

Index

Constants

const (
	MetricPendingQueueSize  = "pending_queue_size"
	MetricDeadQueueSize     = "dead_queue_size"
	MetricJobProcessedCount = "job_processed_count"
	MetricJobFailedCount    = "job_failed_count"
)

const DefaultNameSpace = "taskqueue"

Variables

var ErrInvalidJobStatus = errors.New("invalid job status")

var ErrJobNotFound = errors.New("job not found")

Functions

This section is empty.

Types

type AckOptions

type AckOptions struct {
	QueueName string
}

type Acker

type Acker interface {
	Ack(ctx context.Context, job *Job, opts *AckOptions) error
	Nack(ctx context.Context, job *Job, opts *NackOptions) error
}

type Counter

type Counter interface {
	IncrementCounter(ctx context.Context, m Metric, count int, ts time.Time) error
	QueryRangeCounterValues(ctx context.Context, m Metric, start, end time.Time) (MetricRangeValue, error)
}

type DequeueFunc

type DequeueFunc func(ctx context.Context, opts *DequeueOptions, count int) ([]*Job, error)

func (DequeueFunc) Dequeue

func (f DequeueFunc) Dequeue(ctx context.Context, opts *DequeueOptions, count int) ([]*Job, error)

type DequeueOptions

type DequeueOptions struct {
	QueueName  string
	JobTimeout time.Duration
}

type Dequeuer

type Dequeuer interface {
	Dequeue(ctx context.Context, opts *DequeueOptions, count int) ([]*Job, error)
}

type EnqueueOptions

type EnqueueOptions struct {
	QueueName string
	Delay     time.Duration
}

type Enqueuer

type Enqueuer interface {
	Enqueue(ctx context.Context, job *Job, opts *EnqueueOptions) error
}

type ErrSkipRetry added in v1.1.0

type ErrSkipRetry struct {
	Err        error
	SkipReason string
}

func (ErrSkipRetry) Error added in v1.1.0

func (e ErrSkipRetry) Error() string

func (ErrSkipRetry) Unwrap added in v1.1.0

func (e ErrSkipRetry) Unwrap() error
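
The reference gives no usage notes for this type. Judging by its name and its `Unwrap` method, a handler can presumably return it to mark a failure that should not be retried. The sketch below mirrors the documented fields in a local stand-in and shows how such an error could be detected with `errors.As`. The `Error` message format and the helpers `handle`, `shouldRetry`, and `errInvalidPayload` are assumptions for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrSkipRetry is a local stand-in with the documented fields and methods.
type ErrSkipRetry struct {
	Err        error
	SkipReason string
}

// Error's message format is an assumption; the library may format it differently.
func (e ErrSkipRetry) Error() string { return e.SkipReason + ": " + e.Err.Error() }

// Unwrap exposes the underlying error to errors.Is and errors.As.
func (e ErrSkipRetry) Unwrap() error { return e.Err }

var errInvalidPayload = errors.New("invalid payload")

// handle is a hypothetical job handler: an empty payload can never succeed,
// so retrying it would be pointless.
func handle(payload string) error {
	if payload == "" {
		return ErrSkipRetry{Err: errInvalidPayload, SkipReason: "payload can never succeed"}
	}
	return nil
}

// shouldRetry reports whether a failure is worth retrying, i.e. it is non-nil
// and no ErrSkipRetry appears anywhere in its error chain.
func shouldRetry(err error) bool {
	var skip ErrSkipRetry
	return err != nil && !errors.As(err, &skip)
}

func main() {
	err := fmt.Errorf("send email: %w", handle(""))
	fmt.Println(shouldRetry(err), errors.Is(err, errInvalidPayload)) // prints false true
}
```

Because `Unwrap` returns the wrapped error, callers can still match the root cause with `errors.Is` even when the handler wraps it in `ErrSkipRetry`.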

type Gauge

type Gauge interface {
	RecordGauge(ctx context.Context, m Metric, value float64, ts time.Time) error
	QueryRangeGaugeValues(ctx context.Context, m Metric, start, end time.Time) (MetricRangeValue, error)
	GaugeValue(ctx context.Context, m Metric) (MetricValue, error)
}

type Handler

type Handler interface {
	Handle(ctx context.Context, job *Job) error
}

type HandlerFunc

type HandlerFunc func(context.Context, *Job) error

func (HandlerFunc) Handle

func (h HandlerFunc) Handle(ctx context.Context, job *Job) error
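
`HandlerFunc` follows the same adapter pattern as `http.HandlerFunc`: it lets a plain function satisfy `Handler`, which also makes wrapping handlers straightforward. The sketch below redeclares the two types locally and adds a hypothetical `withRecover` wrapper that turns a panic into an error. Whether the worker already recovers panics on its own is not stated in this documentation.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins mirroring the documented declarations.
type Job struct{ ID string }

type Handler interface {
	Handle(ctx context.Context, job *Job) error
}

type HandlerFunc func(context.Context, *Job) error

// Handle calls the function itself, so any matching func is a Handler.
func (h HandlerFunc) Handle(ctx context.Context, job *Job) error { return h(ctx, job) }

// withRecover is hypothetical middleware: it wraps a Handler and converts a
// panic in the wrapped handler into an ordinary error.
func withRecover(next Handler) Handler {
	return HandlerFunc(func(ctx context.Context, job *Job) (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("job %s panicked: %v", job.ID, r)
			}
		}()
		return next.Handle(ctx, job)
	})
}

// runDemo runs a handler that always panics through the wrapper.
func runDemo() error {
	h := withRecover(HandlerFunc(func(ctx context.Context, job *Job) error {
		panic("boom")
	}))
	return h.Handle(context.Background(), &Job{ID: "job-1"})
}

func main() {
	fmt.Println(runDemo()) // prints job job-1 panicked: boom
}
```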

type HeartBeater

type HeartBeater interface {
	SendHeartbeat(ctx context.Context, data HeartbeatData) error
	RemoveHeartbeat(ctx context.Context, workerID string) error
	LastHeartbeats(ctx context.Context) ([]HeartbeatData, error)
}

type HeartbeatData

type HeartbeatData struct {
	WorkerID    string
	StartedAt   time.Time
	HeartbeatAt time.Time
	Queues      []HeartbeatQueueData
	PID         int
	MemoryUsage float64
	CPUUsage    float64
}

type HeartbeatQueueData

type HeartbeatQueueData struct {
	Name        string
	Concurrency int
}

type Job

type Job struct {
	ID            string
	QueueName     string
	Payload       []byte
	CreatedAt     time.Time
	StartedAt     time.Time
	UpdatedAt     time.Time
	Attempts      int
	FailureReason string
	Status        JobStatus
	ProcessedBy   string
}

func NewJob

func NewJob() *Job

func (*Job) JSONMarshalPayload

func (j *Job) JSONMarshalPayload(v any) (err error)

func (*Job) JSONUnMarshalPayload

func (j *Job) JSONUnMarshalPayload(v any) error

type JobOption

type JobOption func(*JobOptions)

func WithBackoffFunc

func WithBackoffFunc(f func(attempts int) time.Duration) JobOption
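
A backoff function maps the number of attempts so far to the delay before the next retry. As one possible policy, the sketch below doubles the delay on each attempt and caps it. The constants and the name `exponentialBackoff` are illustrative, and it assumes `attempts` is 1 after the first failure, which this documentation does not confirm.

```go
package main

import (
	"fmt"
	"time"
)

const (
	baseDelay = 2 * time.Second // delay after the first failed attempt
	maxDelay  = time.Minute     // upper bound on any single delay
)

// exponentialBackoff doubles the delay for each additional attempt and caps
// the result at maxDelay. Values below 1 are treated as the first attempt.
func exponentialBackoff(attempts int) time.Duration {
	delay := baseDelay
	for i := 1; i < attempts; i++ {
		delay *= 2
		if delay >= maxDelay {
			return maxDelay
		}
	}
	return delay
}

func main() {
	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Printf("attempt %d: retry after %s\n", attempt, exponentialBackoff(attempt))
	}
}
```

Since the function has the signature `func(attempts int) time.Duration`, it could be passed as `taskqueue.WithBackoffFunc(exponentialBackoff)` among the options given to `RegisterHandler`.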

func WithConcurrency

func WithConcurrency(concurrency int) JobOption

func WithIdleWaitTime

func WithIdleWaitTime(idleWaitTime time.Duration) JobOption

func WithMaxAttempts

func WithMaxAttempts(maxAttempts int) JobOption

func WithTimeout

func WithTimeout(timeout time.Duration) JobOption

type JobOptions

type JobOptions struct {
	Timeout      time.Duration
	MaxAttempts  int
	Concurrency  int
	BackoffFunc  func(attempts int) time.Duration
	IdleWaitTime time.Duration
}

type JobStatus

type JobStatus int8
const (
	JobStatusWaiting JobStatus = iota + 1
	JobStatusActive
	JobStatusCompleted
	JobStatusFailed
	JobStatusDead
)

func ParseJobStatus

func ParseJobStatus(text string) (JobStatus, error)

func (JobStatus) String

func (j JobStatus) String() string
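
`ParseJobStatus` and `String` are presumably inverses, with `ErrInvalidJobStatus` returned for unrecognised text. The sketch below shows one way such a pair can be written against a local copy of the type. The status strings ("waiting", "dead", and so on) are assumptions, since the documentation does not list the text forms the library uses.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copy of the documented type and constants.
type JobStatus int8

const (
	JobStatusWaiting JobStatus = iota + 1
	JobStatusActive
	JobStatusCompleted
	JobStatusFailed
	JobStatusDead
)

var ErrInvalidJobStatus = errors.New("invalid job status")

// statusNames holds assumed text forms; the real library may use others.
var statusNames = map[JobStatus]string{
	JobStatusWaiting:   "waiting",
	JobStatusActive:    "active",
	JobStatusCompleted: "completed",
	JobStatusFailed:    "failed",
	JobStatusDead:      "dead",
}

// String returns the text form of a status, or "unknown" for other values.
func (j JobStatus) String() string {
	if name, ok := statusNames[j]; ok {
		return name
	}
	return "unknown"
}

// ParseJobStatus is the inverse of String for the known statuses.
func ParseJobStatus(text string) (JobStatus, error) {
	for status, name := range statusNames {
		if name == text {
			return status, nil
		}
	}
	return 0, ErrInvalidJobStatus
}

func main() {
	status, err := ParseJobStatus("dead")
	fmt.Println(status, err)

	_, err = ParseJobStatus("bogus")
	fmt.Println(err)
}
```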

type Logger

type Logger interface {
	Debug(format string, args ...interface{})
	Info(format string, args ...interface{})
	Error(format string, args ...interface{})
}
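
Any type with these three printf-style methods satisfies `Logger`. As a minimal sketch, the adapter below forwards to the standard library's `log.Logger`. The `stdLogger` type, the level prefixes, and the `render` helper are illustrative choices rather than anything the package provides.

```go
package main

import (
	"bytes"
	"log"
	"os"
)

// Logger is a local copy of the documented interface.
type Logger interface {
	Debug(format string, args ...interface{})
	Info(format string, args ...interface{})
	Error(format string, args ...interface{})
}

// stdLogger is a hypothetical adapter that prefixes each message with its level.
type stdLogger struct {
	out *log.Logger
}

func (s stdLogger) Debug(format string, args ...interface{}) { s.out.Printf("DEBUG "+format, args...) }
func (s stdLogger) Info(format string, args ...interface{})  { s.out.Printf("INFO "+format, args...) }
func (s stdLogger) Error(format string, args ...interface{}) { s.out.Printf("ERROR "+format, args...) }

// Compile-time check that stdLogger implements Logger.
var _ Logger = stdLogger{}

// render logs one Info line into a buffer and returns what was written.
func render(workerID string) string {
	var buf bytes.Buffer
	logger := stdLogger{out: log.New(&buf, "", 0)}
	logger.Info("worker %s started", workerID)
	return buf.String()
}

func main() {
	var logger Logger = stdLogger{out: log.New(os.Stdout, "taskqueue ", log.LstdFlags)}
	logger.Info("worker %s started", "w1")
	logger.Error("job %s failed: %v", "job-1", "timeout")
}
```

A value like this could be assigned to the `Logger` field of `WorkerOptions`.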

type Metric

type Metric struct {
	Name   string
	Labels map[string]string
}

type MetricRangeValue

type MetricRangeValue struct {
	Metric Metric
	Values []MetricValue
}

type MetricValue

type MetricValue struct {
	TimeStamp time.Time
	Value     float64
}

type Metrics

type Metrics interface {
	Gauge
	Counter
}

type NackOptions

type NackOptions struct {
	QueueName   string
	RetryAfter  time.Duration
	ShouldRetry bool
}

type Pagination

type Pagination struct {
	Page int
	Rows int
}

type Queue

type Queue interface {
	Enqueuer
	Dequeuer
	Acker
	QueueManager
}

type QueueDetails

type QueueDetails struct {
	NameSpace  string
	Name       string
	JobCount   int
	Status     QueueStatus
	Pagination Pagination
	Jobs       []*Job
}

type QueueError

type QueueError int
const (
	ErrUnknown QueueError = iota
	ErrQueueNotFound
	ErrQueueEmpty
)

func (QueueError) Error

func (err QueueError) Error() string

type QueueInfo

type QueueInfo struct {
	NameSpace string
	Name      string
	JobCount  int
	Status    QueueStatus
}

type QueueManager

type QueueManager interface {
	DeleteJobFromDeadQueue(ctx context.Context, queueName string, jobID string) error
	PausePendingQueue(ctx context.Context, queueName string) error
	ResumePendingQueue(ctx context.Context, queueName string) error
	ListPendingQueues(ctx context.Context) ([]*QueueInfo, error)
	ListDeadQueues(ctx context.Context) ([]*QueueInfo, error)
	PagePendingQueue(ctx context.Context, queueName string, p Pagination) (*QueueDetails, error)
	PageDeadQueue(ctx context.Context, queueName string, p Pagination) (*QueueDetails, error)
}

type QueueStatus

type QueueStatus int
const (
	QueueStatusUnknown QueueStatus = iota
	QueueStatusPaused
	QueueStatusRunning
)

func (QueueStatus) String

func (s QueueStatus) String() string

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(opts *WorkerOptions) *Worker

func (*Worker) RegisterHandler

func (w *Worker) RegisterHandler(queueName string, h Handler, opts ...JobOption)

func (*Worker) Start

func (w *Worker) Start(ctx context.Context)

func (*Worker) Stop

func (w *Worker) Stop()

type WorkerOptions

type WorkerOptions struct {
	ID             string
	Queue          Queue
	HeartBeater    HeartBeater
	MetricsBackend Metrics
	ErrorHandler   func(err error)
	Logger         Logger
}

Directories

Path Synopsis
cmd
examples
