goQueue

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 23, 2025 License: MIT Imports: 10 Imported by: 0

README

[!NOTE] This README was translated by ChatGPT 5-mini

Go Task Queue

Lightweight Golang priority queue that supports bounded concurrency, priority promotion, and graceful shutdown. Maximizes hardware utilization and prevents system overload.
Suitable for scenarios that need controlled concurrent task execution with priority scheduling.

pkg card codecov version license

Core Features

Bounded Concurrency

Configurable worker pool size (default: CPU cores × 2). Tasks beyond worker capacity queue up to avoid system overload while maximizing hardware utilization.

Priority Queue

A four-level priority system implemented with a min-heap (Immediate > High > Normal > Low). Higher-priority tasks execute first; tasks with the same priority are processed FIFO.

Priority Promotion

Tasks that wait for a long time are automatically promoted to a higher priority to prevent starvation. Promotion thresholds are calculated based on configured timeouts.

Flowchart

Main Flow
graph TB
  User[User]

  subgraph "Actions"
  New[New]
  Start[Start]
  Enqueue[Enqueue Task]
  Close[Shutdown]
  Options[WithTaskID<br>WithTimeout<br>WithCallback]
  end

  subgraph "Task Management"
  Pending[pending queue]
  TaskHeap[taskHeap<br>promotion]
  Task[task]
  end

  subgraph "Execution"
  Worker1[Worker 1]
  Worker2[Worker 2]
  WorkerN[Worker N...]
  Execute[execute task]
  Callback[callback]
  end

  subgraph "Priority System"
  Priority[priority]
  Immediate[Immediate]
  High[High]
  Normal[Normal]
  Low[Low]
  InsertAt[insertAt / ordering]
  end

  User --> New
  User --> Start
  User --> Enqueue
  User --> Close

  Callback --> |async| User

  Start -->|start| Execution

  Enqueue -->|options| Options
  Enqueue -->|create| Task
  Task -->|push| Pending
  
  Pending -->|manage| TaskHeap
  InsertAt --> Pending
  TaskHeap -->|sort| Priority

  Priority -.-> Immediate
  Priority -.-> High
  Priority -.-> Normal
  Priority -.-> Low

  Immediate -.-> InsertAt
  High -.-> InsertAt
  Normal -.-> InsertAt
  Low -.-> InsertAt

  Pending --> |pop| Worker1
  Pending --> |pop| Worker2
  Pending --> |pop| WorkerN

  Worker1 -->|execute| Execute
  Worker2 -->|execute| Execute
  WorkerN -->|execute| Execute

  Execute -->|trigger| Callback
Priority Promotion
stateDiagram
  [*] --> Low: create task
  [*] --> Normal: create task
  [*] --> High: create task
  [*] --> Immediate: create task
  
  Low --> Normal: wait >= promotion.After
  Normal --> High: wait >= promotion.After
  High --> [*]: executed
  Immediate --> [*]: executed
  
  note right of Low
  timeout
  range 30-120s
  end note
  
  note right of Normal
  timeout * 2
  range 30-120s
  end note

Usage

Installation

[!NOTE] The latest commit may change; using tagged versions is recommended.
Commits that only change documentation may be rebased later.

go get github.com/pardnchiu/go-queue@[VERSION]

git clone --depth 1 --branch [VERSION] https://github.com/pardnchiu/go-queue.git
Initialization
Basic Usage
package main

import (
  "context"
  "fmt"
  "time"
  
  queue "github.com/pardnchiu/go-queue"
)

func main() {
  // Initialize with default config
  q := queue.New(nil)
  
  // Start queue workers
  ctx := context.Background()
  q.Start(ctx)
  
  // Enqueue tasks
  for i := 0; i < 10; i++ {
  id, err := q.Enqueue(ctx, "", func(ctx context.Context) error {
    fmt.Println("task executed")
    return nil
  })
  if err != nil {
    fmt.Printf("enqueue failed: %v\n", err)
  }
  fmt.Printf("task ID: %s\n", id)
  }
  
  // Graceful shutdown (wait for all tasks to finish)
  if err := q.Shutdown(ctx); err != nil {
  fmt.Printf("shutdown error: %v\n", err)
  }
}
Using Preset Configs
package main

import (
  "context"
  "fmt"
  
  queue "github.com/pardnchiu/go-queue"
)

func main() {
  q := queue.New(&queue.Config{
  Workers: 8,
  Size:    1000,
  Timeout: 60,
  Preset: map[string]queue.PresetConfig{
    "critical": {Priority: "immediate", Timeout: 15},
    "email":    {Priority: "high", Timeout: 30},
    "report":   {Priority: "normal", Timeout: 120},
    "cleanup":  {Priority: "low", Timeout: 300},
  },
  })
  
  ctx := context.Background()
  q.Start(ctx)
  defer q.Shutdown(ctx)
  
  // Use preset config for critical payment
  q.Enqueue(ctx, "critical", func(ctx context.Context) error {
  return processPayment()
  })
  
  // Use preset config to send email
  q.Enqueue(ctx, "email", func(ctx context.Context) error {
  return sendNotification()
  })
  
  // Use preset config to generate report
  q.Enqueue(ctx, "report", func(ctx context.Context) error {
  return generateReport()
  })
}
Using Options
package main

import (
  "context"
  "fmt"
  "time"
  
  queue "github.com/pardnchiu/go-queue"
)

func main() {
  q := queue.New(&queue.Config{Workers: 4})
  
  ctx := context.Background()
  q.Start(ctx)
  defer q.Shutdown(ctx)
  
  // Custom task ID
  q.Enqueue(ctx, "", func(ctx context.Context) error {
  return processOrder("ORD-123")
  }, queue.WithTaskID("order-ORD-123"))
  
  // Custom timeout
  q.Enqueue(ctx, "", func(ctx context.Context) error {
  return heavyComputation()
  }, queue.WithTimeout(5*time.Minute))
  
  // Custom callback
  q.Enqueue(ctx, "", func(ctx context.Context) error {
  return sendEmail()
  }, queue.WithCallback(func(id string, err error) {
  if err != nil {
    fmt.Printf("task %s failed: %v\n", id, err)
  } else {
    fmt.Printf("task %s completed\n", id)
  }
  }))
  
  // Combined options
  q.Enqueue(ctx, "", func(ctx context.Context) error {
  return importData()
  },
  queue.WithTaskID("import-daily"),
  queue.WithTimeout(10*time.Minute),
  queue.WithCallback(func(id string, err error) {
    logResult(id, err)
  }),
  )
}

Configuration

type Config struct {
  Workers int                     // worker pool size (default: CPU cores × 2)
  Size    int                     // max queue capacity (default: Workers × 64)
  Timeout int                     // default timeout in seconds (default: 30)
  Preset  map[string]PresetConfig // named preset configs
}

type PresetConfig struct {
  Priority string // "immediate", "high", "normal", "low" (default: "normal")
  Timeout  int    // override timeout in seconds (0 = computed by priority)
}

Priority Levels

Priority Value Timeout Calculation Use Case
Immediate 0 timeout / 4 (15-120s) Payments, alerts
High 1 timeout / 2 (15-120s) User-initiated ops
Normal 2 timeout (15-120s) Background tasks
Low 3 timeout × 2 (15-120s) Cleanup, analytics

Available Functions

Queue Management
  • New(config) - create a new queue instance

    q := queue.New(&queue.Config{
    Workers: 4,
    Size:    256,
    Timeout: 60,
    })
    
  • Start(ctx) - start the worker pool

    q.Start(ctx)
    
  • Enqueue(ctx, preset, action, options...) - add a task to the queue

    id, err := q.Enqueue(ctx, "email", func(ctx context.Context) error {
    return sendEmail()
    })
    
  • Shutdown(ctx) - graceful shutdown

    // wait indefinitely for all tasks to finish
    err := q.Shutdown(context.Background())
    
    // with timeout
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    err := q.Shutdown(ctx)
    
Enqueue Options
  • WithTaskID(id) - set a custom task ID

    queue.WithTaskID("unique-task-id")
    
  • WithTimeout(duration) - set task timeout

    queue.WithTimeout(5 * time.Minute)
    
  • WithCallback(fn) - set completion callback

    queue.WithCallback(func(id string, err error) {
    // called asynchronously after task completion
    })
    

Priority Promotion

Waiting tasks in the queue are automatically promoted to prevent starvation:

Original Priority Promotion Target Wait Threshold
Low Normal clamp(timeout, 30s, 120s)
Normal High clamp(timeout × 2, 30s, 120s)
High - no promotion (already high priority)
Immediate - no promotion (highest priority)

Promotion is checked each time a worker pops a task from the queue.

Timeout Mechanism

  • Each task has a timeout based on priority or explicit configuration.
  • Use WithTimeout to override the default timeout.
  • On timeout:
    • the task's context is canceled
    • an error is logged
    • the callback receives a timeout error (if configured)
    • goroutine leak detection: if the task doesn't respond to cancellation within 5 seconds, a warning is emitted

Use Cases

Use Case Configuration
API background jobs Workers: CPU×2, Timeout: 30s
Email/notifications Priority: High, Timeout: 60s
Report generation Priority: Normal, Timeout: 300s
Data cleanup Priority: Low, Timeout: 600s
Payment processing Priority: Immediate, Timeout: 15s

Works well integrated with go-scheduler:

// Scheduler triggers, Queue controls concurrency
scheduler.Add("@every 1m", func() error {
  orders := db.GetPendingOrders()
  for _, o := range orders {
  queue.Enqueue(ctx, "order", func(ctx context.Context) error {
    return processOrder(o)
  })
  }
  return nil
})

License

This project is licensed under MIT.

Author

邱敬幃 Pardn Chiu

Stars

Star


©️ 2025 邱敬幃 Pardn Chiu

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Workers int                     // default = CPU * 2
	Size    int                     // default = Workers * 64
	Timeout int                     // default = 30
	Preset  map[string]PresetConfig // default = empty
}

type EnqueueOption

type EnqueueOption func(*enqueueConfig)

func WithCallback

func WithCallback(fn func(id string, err error)) EnqueueOption

func WithTaskID

func WithTaskID(id string) EnqueueOption

func WithTimeout

func WithTimeout(d time.Duration) EnqueueOption

type PresetConfig

type PresetConfig struct {
	Priority string // nil = 用 DefaultPriority
	Timeout  int    // 0 = 依 Priority 自動計算(秒)
}

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func New

func New(config *Config) *Queue

func (*Queue) Enqueue

func (q *Queue) Enqueue(ctx context.Context, presetName string, action func(ctx context.Context) error, options ...EnqueueOption) (string, error)

func (*Queue) Shutdown

func (q *Queue) Shutdown(ctx context.Context) error

func (*Queue) Start

func (q *Queue) Start(ctx context.Context)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL