queue

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 4, 2025 License: MIT Imports: 14 Imported by: 1

README

Go Queue - Thư viện hàng đợi mạnh mẽ cho Go

Go Version Go Report Card GoDoc

Go Queue là một thư viện hàng đợi tác vụ (task queue) hiệu suất cao và linh hoạt được thiết kế cho các ứng dụng Go. Thư viện cung cấp một giải pháp toàn diện cho việc xử lý tác vụ bất đồng bộ với khả năng mở rộng và tích hợp dễ dàng.

✨ Tính năng chính

🚀 Hiệu suất cao
  • Xử lý đồng thời: Hỗ trợ đa worker để xử lý nhiều tác vụ song song
  • Tối ưu hóa bộ nhớ: Sử dụng memory pool và connection pooling hiệu quả
  • Polling thông minh: Khoảng thời gian polling có thể cấu hình
🔧 Linh hoạt và có thể mở rộng
  • Multi-adapter: Hỗ trợ cả Memory và Redis adapters
  • Dependency Injection: Tích hợp hoàn toàn với hệ thống DI
  • Provider Pattern: Dễ dàng mở rộng và tùy chỉnh
⚡ Xử lý tác vụ nâng cao
  • Scheduled Tasks: Lên lịch tác vụ để thực hiện vào thời điểm cụ thể
  • Retry Logic: Cơ chế retry thông minh với backoff
  • Priority Queues: Hỗ trợ ưu tiên hàng đợi
  • Dead Letter Queue: Xử lý tác vụ thất bại
  • Graceful Shutdown: Dừng server một cách an toàn
🛡️ Tin cậy và bảo mật
  • Error Handling: Xử lý lỗi toàn diện với logging chi tiết
  • Context Support: Hỗ trợ đầy đủ Go context cho cancellation
  • Monitoring: Metrics và health checks tích hợp
  • Configuration: Cấu hình linh hoạt qua YAML

🏗️ Kiến trúc

graph TB
    subgraph "Application Layer"
        CLIENT[Client<br/>🔄 Enqueue Tasks<br/>⏰ Schedule Tasks<br/>🎯 Context Support]
        SERVER[Server<br/>👥 Worker Pool<br/>🔧 Handler Registry<br/>⚡ Processing Engine]
    end
    
    subgraph "Core Management Layer"
        MANAGER[Manager<br/>🏭 Service Factory<br/>⚙️ Configuration<br/>🔌 Adapter Selection]
        PROVIDER[Service Provider<br/>🚀 Lifecycle Management<br/>📦 DI Integration<br/>📋 Scheduled Tasks]
    end
    
    subgraph "Storage Adapters"
        MEMORY[Memory Adapter<br/>💾 In-memory Storage<br/>⚡ Ultra Fast<br/>🔧 Development Mode]
        REDIS[Redis Adapter<br/>💽 Persistent Storage<br/>🌐 Distributed<br/>🏢 Production Ready]
    end
    
    subgraph "External Dependencies"
        SCHEDULER[Scheduler<br/>⏲️ Delayed Tasks<br/>📅 Cron Jobs<br/>🔄 Recurring Tasks]
        DI[DI Container<br/>🔗 Dependency Injection<br/>🎯 Service Resolution<br/>♻️ Lifecycle Management]
    end
    
    CLIENT --> MANAGER
    SERVER --> MANAGER
    MANAGER --> MEMORY
    MANAGER --> REDIS
    PROVIDER --> MANAGER
    PROVIDER --> DI
    SERVER --> SCHEDULER
    
    style CLIENT fill:#e8f5e8
    style SERVER fill:#fff3e0
    style MANAGER fill:#f3e5f5
    style PROVIDER fill:#e1f5fe
    style MEMORY fill:#fce4ec
    style REDIS fill:#f1f8e9
    style SCHEDULER fill:#f3e5f5
    style DI fill:#e8f5e8
Luồng hoạt động chính
  1. Service Provider khởi tạo và cấu hình tất cả các components
  2. Client gửi tasks thông qua Manager
  3. Manager chọn adapter phù hợp (Memory/Redis) dựa trên cấu hình
  4. Server với worker pool liên tục dequeue và xử lý tasks
  5. Scheduler quản lý delayed/scheduled tasks
  6. DI Container quản lý lifecycle và dependencies của tất cả services
go get go.fork.vn/queue

Cách sử dụng

1. Đăng ký Service Provider
Cách đơn giản (Auto-configuration)
package main

import (
    "go.fork.vn/di"
    "go.fork.vn/config"
    "go.fork.vn/redis"
    "go.fork.vn/scheduler"
    "go.fork.vn/queue"
)

func main() {
    app := di.New()
    
    // Đăng ký các providers cần thiết
    app.Register(config.NewServiceProvider()) // Required cho cấu hình
    app.Register(redis.NewServiceProvider())  // Required cho Redis adapter
    app.Register(scheduler.NewServiceProvider()) // Required cho delayed tasks
    app.Register(queue.NewServiceProvider())
    
    // Boot ứng dụng - tự động cấu hình từ file config
    app.Boot()
    
    // Giữ ứng dụng chạy để worker có thể xử lý tác vụ
    select {}
}
Cấu hình thông qua file config
# config/app.yaml
queue:
  adapter:
    default: "redis"  # hoặc "memory"
    memory:
      prefix: "queue:"
    redis:
      prefix: "queue:"
      provider_key: "default"  # Sử dụng Redis provider với key "default"
  
  server:
    concurrency: 10
    polling_interval: 1000  # milliseconds
    default_queue: "default"
    strict_priority: true
    queues: ["critical", "high", "default", "low"]
    shutdown_timeout: 30  # seconds
    log_level: 1
    retry_limit: 3
  
  client:
    default_options:
      queue: "default"
      max_retry: 3
      timeout: 30  # minutes

# Cấu hình Redis trong redis section
redis:
  default:  # Redis provider key được reference từ queue config
    host: "localhost"
    port: 6379
    password: ""
    db: 0
    cluster:
      enabled: false
      hosts: ["localhost:7000", "localhost:7001"]

scheduler:
  auto_start: true
  distributed_lock:
    enabled: true  # Cho môi trường distributed
2. Thêm tác vụ vào hàng đợi (Producer)
// Lấy queue manager từ container
container := app.Container()
manager := container.MustMake("queue").(queue.Manager)

// Lấy client từ manager
client := manager.Client()

// Thêm tác vụ ngay lập tức với options
payload := map[string]interface{}{
    "user_id": 123,
    "email":   "user@example.com",
    "action":  "welcome",
}

taskInfo, err := client.Enqueue("email:welcome", payload,
    queue.WithQueue("emails"),       // Chỉ định queue
    queue.WithMaxRetry(5),          // Tối đa 5 lần retry
    queue.WithTimeout(2*time.Minute), // Timeout sau 2 phút
    queue.WithTaskID("welcome-123"), // Custom task ID
)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Đã thêm tác vụ: %s vào queue: %s\n", taskInfo.ID, taskInfo.Queue)

// Thêm tác vụ delayed (chạy sau 5 phút)
taskInfo, err = client.EnqueueIn("reminder:task", 5*time.Minute, payload,
    queue.WithQueue("notifications"),
)
if err != nil {
    log.Fatal(err)
}

// Thêm tác vụ scheduled (chạy vào thời điểm cụ thể)
processAt := time.Date(2025, 6, 1, 9, 0, 0, 0, time.Local)
taskInfo, err = client.EnqueueAt("report:generate", processAt, payload,
    queue.WithQueue("reports"),
    queue.WithMaxRetry(3),
)
if err != nil {
    log.Fatal(err)
}
3. Xử lý tác vụ từ hàng đợi (Consumer)
// Lấy queue manager từ container và tạo server
manager := container.MustMake("queue").(queue.Manager)
server := manager.Server()

// Đăng ký handler cho email tasks
server.RegisterHandler("email:welcome", func(ctx context.Context, task *queue.Task) error {
    var payload map[string]interface{}
    if err := task.Unmarshal(&payload); err != nil {
        return fmt.Errorf("failed to unmarshal payload: %w", err)
    }
    
    userID := int(payload["user_id"].(float64))
    email := payload["email"].(string)
    
    log.Printf("Gửi email chào mừng đến %s (ID: %d)", email, userID)
    
    // Xử lý logic gửi email ở đây...
    // Có thể return error để trigger retry mechanism
    if !sendWelcomeEmail(email) {
        return fmt.Errorf("failed to send email to %s", email)
    }
    
    return nil
})

// Đăng ký handlers cho các loại tác vụ khác với error handling
server.RegisterHandler("reminder:task", func(ctx context.Context, task *queue.Task) error {
    log.Printf("Processing reminder task: %s", task.ID)
    
    // Check context timeout
    select {
    case <-ctx.Done():
        return ctx.Err()
    default:
        // Process reminder logic
        return processReminder(task)
    }
})

server.RegisterHandler("report:generate", func(ctx context.Context, task *queue.Task) error {
    log.Printf("Generating report: %s", task.ID)
    return generateReport(task)
})

// Đăng ký nhiều handlers cùng một lúc
server.RegisterHandlers(map[string]queue.HandlerFunc{
    "notification:push": handlePushNotification,
    "order:process":     handleOrderProcessing,
    "data:cleanup":      handleDataCleanup,
})

// Server tự động start khi ứng dụng boot
// Nhưng bạn cũng có thể control thủ công:
// err := server.Start()
// if err != nil {
//     log.Fatal(err)
// }

// Graceful shutdown
// defer server.Stop()
4. Tích hợp với Scheduler (Tính năng mới)

Queue Provider hiện đã tích hợp hoàn chỉnh với Scheduler Provider để xử lý các tác vụ phức tạp:

// Lấy scheduler từ manager
scheduler := manager.Scheduler()

// Schedule tasks mà sẽ enqueue jobs vào queue
scheduler.Every(5).Minutes().Do(func() {
    // Task định kỳ mỗi 5 phút
    client.Enqueue("maintenance:cleanup", map[string]interface{}{
        "type": "temporary_files",
        "timestamp": time.Now(),
    }, queue.WithQueue("maintenance"))
})

// Daily report generation
scheduler.Every(1).Days().At("09:00").Do(func() {
    client.Enqueue("report:daily", map[string]interface{}{
        "date": time.Now().Format("2006-01-02"),
        "type": "sales_summary",
    }, queue.WithQueue("reports"))
})

// Weekly backup với cron expression
scheduler.Cron("0 2 * * 0").Do(func() { // Chủ nhật 2:00 AM
    client.Enqueue("backup:weekly", map[string]interface{}{
        "week": time.Now().Format("2006-W02"),
    }, queue.WithQueue("maintenance"))
})

// Distributed scheduling (chỉ chạy trên 1 instance trong cluster)
scheduler.Every(10).Minutes().Tag("distributed").Do(func() {
    client.Enqueue("monitor:health_check", nil, queue.WithQueue("monitoring"))
})
5. Tùy chọn nâng cao khi thêm tác vụ

Queue Provider v0.0.3 cung cấp nhiều options linh hoạt:

// Tất cả options có thể dùng khi enqueue
taskInfo, err := client.Enqueue("image:resize", payload,
    queue.WithQueue("media"),                    // Chỉ định queue
    queue.WithMaxRetry(5),                      // Số lần retry tối đa
    queue.WithTimeout(10*time.Minute),          // Timeout cho task
    queue.WithTaskID("resize-user-123-photo"),  // Custom task ID
    queue.WithDelay(30*time.Second),            // Delay trước khi xử lý
    queue.WithDeadline(time.Now().Add(1*time.Hour)), // Deadline tuyệt đối
)
if err != nil {
    log.Fatal(err)
}

// Batch enqueue cho hiệu suất cao
tasks := []map[string]interface{}{
    {"user_id": 1, "action": "welcome"},
    {"user_id": 2, "action": "welcome"},
    {"user_id": 3, "action": "welcome"},
}

for i, task := range tasks {
    client.Enqueue("email:welcome", task,
        queue.WithQueue("emails"),
        queue.WithTaskID(fmt.Sprintf("welcome-%d", i)),
        queue.WithMaxRetry(3),
    )
}

// Process trong ưu tiên queues
highPriorityTask, _ := client.Enqueue("order:urgent", orderData,
    queue.WithQueue("critical"),
    queue.WithMaxRetry(5),
    queue.WithTimeout(30*time.Second),
)

lowPriorityTask, _ := client.Enqueue("analytics:update", analyticsData,
    queue.WithQueue("low"),
    queue.WithMaxRetry(1),
    queue.WithTimeout(5*time.Minute),
)
6. Sử dụng Memory Adapter (cho môi trường phát triển)
// Memory adapter tự động được sử dụng khi cấu hình default là "memory"
// Hoặc có thể khởi tạo trực tiếp:

package main

import (
    "go.fork.vn/di"
    "go.fork.vn/queue"
    "go.fork.vn/config"
    "go.fork.vn/redis"
)

func main() {
    app := di.New()
    app.Register(config.NewServiceProvider())
    app.Register(redis.NewServiceProvider())  // Required cho Redis adapter
    app.Register(queue.NewServiceProvider())
    
    // Cấu hình sử dụng memory adapter trong config/app.yaml:
    // queue:
    //   adapter:
    //     default: "memory"
    //     memory:
    //       prefix: "test_queue:"
    
    app.Boot()
    
    // Memory adapter không cần Redis và phù hợp cho:
    // - Unit testing
    // - Development environment
    // - Prototype/demo applications
    
    container := app.Container()
    client := container.MustMake("queue.client").(queue.Client)
    
    // Sử dụng giống hệt như Redis adapter
    client.Enqueue("test:task", "test payload")
}
7. Redis Provider Integration (Tính năng v0.0.5)

Queue Provider hiện đã tích hợp hoàn chỉnh với Redis Provider để centralize Redis configuration và cung cấp advanced Redis features:

// Redis configuration được quản lý bởi Redis Provider
// config/app.yaml
redis:
  default:  # Redis instance key
    host: "localhost"
    port: 6379
    password: ""
    db: 0
    max_retries: 3
    dial_timeout: 5
    read_timeout: 3
    write_timeout: 3
    pool_size: 10

queue:
  adapter:
    default: "redis"
    redis:
      prefix: "queue:"
      provider_key: "default"  # Reference Redis provider key

// Sử dụng advanced Redis features
package main

import (
    "context"
    "time"
    "go.fork.vn/di"
    "go.fork.vn/config"
    "go.fork.vn/redis"
    "go.fork.vn/queue"
    "go.fork.vn/queue/adapter"
)

func main() {
    app := di.New()
    app.Register(config.NewServiceProvider())
    app.Register(redis.NewServiceProvider())
    app.Register(queue.NewServiceProvider())
    app.Boot()

    container := app.Container()
    manager := container.MustMake("queue").(queue.Manager)
    
    // Lấy Redis queue adapter để sử dụng advanced features
    redisAdapter := manager.RedisAdapter()
    
    // Type assertion để access Redis-specific methods
    if redisQueue, ok := redisAdapter.(adapter.QueueRedisAdapter); ok {
        ctx := context.Background()
        
        // Priority queue operations
        err := redisQueue.EnqueueWithPriority(ctx, "tasks", &task, 10) // Priority 10
        if err != nil {
            log.Fatal(err)
        }
        
        // Dequeue from priority queue
        var priorityTask queue.Task
        err = redisQueue.DequeueFromPriority(ctx, "tasks", &priorityTask)
        if err != nil {
            log.Printf("No priority tasks available: %v", err)
        }
        
        // TTL support
        err = redisQueue.EnqueueWithTTL(ctx, "temporary", &task, 1*time.Hour)
        if err != nil {
            log.Fatal(err)
        }
        
        // Batch operations với pipeline
        tasks := []*queue.Task{&task1, &task2, &task3}
        err = redisQueue.EnqueueWithPipeline(ctx, "batch", tasks)
        if err != nil {
            log.Fatal(err)
        }
        
        // Multi-dequeue
        results, err := redisQueue.MultiDequeue(ctx, "batch", 5) // Lấy tối đa 5 tasks
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("Dequeued %d tasks", len(results))
        
        // Queue monitoring
        info, err := redisQueue.GetQueueInfo(ctx, "tasks")
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("Queue info: %+v", info)
        
        // Health check
        if err := redisQueue.Ping(ctx); err != nil {
            log.Printf("Redis connection issue: %v", err)
        }
        
        // Development/testing utilities
        if err := redisQueue.FlushQueues(ctx, []string{"test", "debug"}); err != nil {
            log.Printf("Failed to flush queues: %v", err)
        }
    }
}
8. Failed Jobs và Dead Letter Queue (Tính năng nâng cao)

Queue Provider v0.0.3 có hệ thống xử lý lỗi tiên tiến:

// Failed jobs được tự động retry với exponential backoff
server.RegisterHandler("risky:task", func(ctx context.Context, task *queue.Task) error {
    // Giả lập task có thể fail
    if rand.Float32() < 0.3 { // 30% chance fail
        return fmt.Errorf("simulated failure")
    }
    
    log.Printf("Task %s completed successfully", task.ID)
    return nil
})

// Tasks sẽ được retry tối đa theo cấu hình (mặc định 3 lần)
// Delay giữa các lần retry tăng theo exponential backoff:
// - Retry 1: 1 minute
// - Retry 2: 4 minutes  
// - Retry 3: 9 minutes

// Sau khi vượt quá retry limit, task sẽ được chuyển vào Dead Letter Queue
// Dead Letter Queue có thể được monitor và xử lý thủ công

// Hệ thống maintenance tự động:
// - Cleanup dead letter tasks cũ hơn 7 ngày (chạy mỗi giờ)
// - Retry failed tasks đủ điều kiện (chạy mỗi 5 phút)
// - Xử lý delayed tasks đã đến hạn (chạy mỗi 30 giây)
8. Monitoring và Debugging
// TaskInfo cung cấp thông tin chi tiết về task
taskInfo, err := client.Enqueue("debug:task", payload)
if err == nil {
    log.Printf("Task created: %s", taskInfo.String())
    // Output: Task ID: abc-123, Name: debug:task, Queue: default, 
    //         State: pending, Created: 2025-05-28T10:30:00Z
}

// Server logging tự động theo dõi:
// - Task processing time
// - Worker performance  
// - Retry attempts
// - Failed task reasons
// - Queue sizes

// Có thể tùy chỉnh log level trong config:
// queue:
//   server:
//     log_level: 2  # 0=SILENT, 1=ERROR, 2=INFO, 3=DEBUG
9. Production Best Practices
// Cấu hình production-ready
// config/production.yaml
/*
queue:
  adapter:
    default: "redis"
    redis:
      prefix: "myapp_queue:"
      provider_key: "default"  # Reference Redis provider key
  
  server:
    concurrency: 50              # Điều chỉnh theo CPU cores
    polling_interval: 500        # Giảm cho high-throughput
    strict_priority: true        # Đảm bảo critical tasks được ưu tiên
    queues: ["critical", "high", "default", "low", "bulk"]
    shutdown_timeout: 60         # Đủ thời gian cho graceful shutdown
    retry_limit: 5              # Tăng retry cho production
    
  client:
    default_options:
      queue: "default"
      max_retry: 3
      timeout: 15               # 15 phút timeout mặc định

# Redis cấu hình riêng biệt trong Redis provider
redis:
  default:
    host: "redis-cluster.internal"
    port: 6379
    password: "${REDIS_PASSWORD}"
    db: 0
    cluster:
      enabled: true
      hosts:
        - "redis-1.internal:6379"
        - "redis-2.internal:6379"
        - "redis-3.internal:6379"

scheduler:
  auto_start: true
  distributed_lock:
    enabled: true               # Bắt buộc cho production cluster
  options:
    key_prefix: "myapp_scheduler:"
    lock_duration: 120          # 2 phút
    max_retries: 5
    retry_delay: 500
*/

// Graceful shutdown handling
func setupGracefulShutdown(server queue.Server) {
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)
    
    go func() {
        <-c
        log.Println("Shutting down gracefully...")
        
        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer cancel()
        
        if err := server.Stop(); err != nil {
            log.Printf("Error during shutdown: %v", err)
        }
        
        log.Println("Shutdown complete")
        os.Exit(0)
    }()
}

📚 Tài liệu chi tiết

Để tìm hiểu sâu hơn về Go Queue, hãy tham khảo tài liệu chi tiết:

🚀 Quick Start

1. Cài đặt
go get go.fork.vn/queue
2. Tạo file cấu hình
# config/app.yaml
queue:
  adapter:
    default: "redis"
  server:
    concurrency: 10
    queues: ["critical", "default", "low"]
3. Khởi tạo ứng dụng
package main

import (
    "go.fork.vn/di"
    "go.fork.vn/config"
    "go.fork.vn/redis"
    "go.fork.vn/scheduler"
    "go.fork.vn/queue"
)

func main() {
    app := di.New()
    
    // Đăng ký providers
    app.Register(config.NewServiceProvider())
    app.Register(redis.NewServiceProvider())
    app.Register(scheduler.NewServiceProvider())
    app.Register(queue.NewServiceProvider())
    
    app.Boot()
    
    // Đăng ký handlers
    setupHandlers(app)
    
    // Giữ ứng dụng chạy
    select {}
}

func setupHandlers(app *di.Application) {
    server := app.Container().MustMake("queue.server").(queue.Server)
    
    server.RegisterHandler("email:send", func(ctx context.Context, task *queue.Task) error {
        // Xử lý gửi email
        return nil
    })
}
4. Thêm tasks
client := app.Container().MustMake("queue.client").(queue.Client)

// Thêm task ngay lập tức
client.Enqueue("email:send", map[string]interface{}{
    "to": "user@example.com",
    "subject": "Welcome!",
})

// Thêm task delayed
client.EnqueueIn("reminder", 1*time.Hour, reminderData)

🏆 Tính năng nổi bật

🎯 Performance & Scalability
  • Multi-threading: Hỗ trợ đến 1000+ workers đồng thời
  • Memory Pool: Tối ưu hóa memory allocation với 99.5% hit rate
  • Connection Pool: Quản lý Redis connections hiệu quả
  • Batch Processing: Xử lý hàng loạt tasks với throughput cao
🔄 Advanced Task Management
  • Priority Queues: 4 mức độ ưu tiên (critical, high, default, low)
  • Delayed Tasks: Lên lịch tasks với độ chính xác millisecond
  • Retry Logic: Exponential backoff với jitter để tránh thundering herd
  • Dead Letter Queue: Tự động xử lý failed tasks
  • Task Dependencies: Support task chains và workflows
🛡️ Production Ready
  • Health Checks: Monitoring endpoints cho Kubernetes/Docker
  • Metrics: Tích hợp Prometheus metrics
  • Graceful Shutdown: Zero-downtime deployments
  • Circuit Breaker: Tự động recovery khi Redis connection fail
  • Distributed Locks: Safe trong môi trường multi-instance
🔧 Developer Experience
  • Hot Reload: Thay đổi handlers mà không restart
  • Debug Mode: Chi tiết task execution traces
  • Testing Support: Built-in test utilities và mocks
  • Type Safety: Strongly typed interfaces với generics
  • IDE Integration: Full IntelliSense support

🤝 Đóng góp

Chúng tôi hoan nghênh mọi đóng góp! Hãy xem CONTRIBUTING.md để biết thêm chi tiết.

Phát triển local
# Clone repository
git clone https://github.com/go-fork/queue.git
cd queue

# Cài đặt dependencies
go mod download

# Chạy tests
make test

# Chạy examples
go run examples/basic/main.go

📊 Benchmarks

BenchmarkEnqueue-8           1000000    1.2 μs/op    0 allocs/op
BenchmarkDequeue-8            500000    2.1 μs/op    1 allocs/op
BenchmarkWorkerPool-8        2000000    0.8 μs/op    0 allocs/op
BenchmarkRedisAdapter-8       300000    4.5 μs/op    2 allocs/op
BenchmarkMemoryAdapter-8     5000000    0.3 μs/op    0 allocs/op

📈 Roadmap

v0.1.0 (Q2 2025)
  • WebUI Dashboard cho monitoring
  • GraphQL API endpoints
  • Workflow Engine cho complex task chains
  • Built-in rate limiting
v0.2.0 (Q3 2025)
  • Kafka adapter support
  • Multi-tenant queues
  • A/B testing framework
  • Auto-scaling workers

❓ FAQ

Q: Go Queue có khác gì với Celery không? A: Go Queue được thiết kế native cho Go với type safety, performance cao hơn và tích hợp DI framework.

Q: Có thể sử dụng Go Queue mà không cần Redis không? A: Có, sử dụng Memory adapter cho development hoặc single-instance deployments.

Q: Performance như thế nào so với các solutions khác? A: Go Queue có thể xử lý 10,000+ tasks/second với latency < 1ms trên hardware thông thường.

📄 License

Dự án này được phát hành dưới MIT License.

🔗 Liên kết hữu ích


Made with ❤️ by the Go Fork team

Documentation

Overview

Package queue cung cấp một service provider để quản lý hàng đợi và xử lý tác vụ bất đồng bộ với thiết kế đơn giản, linh hoạt hỗ trợ cả Redis và bộ nhớ trong. Package đã được nâng cấp để tích hợp hoàn chỉnh với Redis Provider và cung cấp các tính năng Redis nâng cao như priority queues, TTL support, và batch operations.

Package này cung cấp APIs để đưa tác vụ vào hàng đợi, lên lịch thực thi tác vụ và xử lý tác vụ qua các handlers đăng ký. Nó hỗ trợ:

- Thực thi ngay lập tức (immediate execution) - Trì hoãn thực thi theo khoảng thời gian (delayed execution) - Lên lịch thực thi vào thời điểm cụ thể (scheduled execution) - Xử lý tác vụ song song với mức độ song song có thể cấu hình - Hệ thống thử lại tự động với chiến lược backoff - Ưu tiên giữa các hàng đợi khác nhau (queue-level priority) - Priority queues với Redis Sorted Sets (task-level priority) - TTL support cho temporary tasks - Batch operations với Redis pipelines - Queue monitoring và health checks - Hàng đợi trong bộ nhớ cho môi trường phát triển và kiểm thử

Package cung cấp ba thành phần chính:

1. Manager: Quản lý các thành phần trong queue và cung cấp cấu hình 2. Client: Cho phép đưa tác vụ vào hàng đợi (producer) 3. Server: Xử lý các tác vụ từ hàng đợi thông qua handlers (consumer)

Hỗ trợ hai adapter chính:

1. Redis adapter: Cho môi trường production với khả năng mở rộng cao

  • Tích hợp với Redis Provider để centralize configuration
  • Enhanced Redis features: priority queues, TTL, pipelines
  • Advanced monitoring và health checks

2. Memory adapter: Cho môi trường phát triển và kiểm thử

Ví dụ sử dụng với Redis Provider:

// Đăng ký service providers
app.Register(config.NewServiceProvider())
app.Register(redis.NewServiceProvider())  // Required cho Redis adapter
app.Register(scheduler.NewServiceProvider())
app.Register(queue.NewServiceProvider())

// Cấu hình trong config/app.yaml
/*
redis:
  default:
    host: "localhost"
    port: 6379
    db: 0

queue:
  adapter:
    default: "redis"
    redis:
      prefix: "queue:"
      provider_key: "default"  # Reference Redis provider
*/

// Sử dụng client để đưa tác vụ vào hàng đợi
client := app.Make("queue.client").(queue.Client)
client.Enqueue("email:send", map[string]interface{}{
    "to":      "user@example.com",
    "subject": "Hello",
}, queue.WithQueue("emails"), queue.WithMaxRetry(3))

// Sử dụng advanced Redis features
manager := app.Make("queue").(queue.Manager)
if redisAdapter, ok := manager.RedisAdapter().(adapter.QueueRedisAdapter); ok {
    // Priority queue
    redisAdapter.EnqueueWithPriority(ctx, "tasks", &task, 10)

    // TTL support
    redisAdapter.EnqueueWithTTL(ctx, "temporary", &task, 1*time.Hour)

    // Batch operations
    redisAdapter.EnqueueWithPipeline(ctx, "batch", tasks)

    // Monitoring
    info, _ := redisAdapter.GetQueueInfo(ctx, "tasks")
}

// Sử dụng server để xử lý tác vụ
server := app.Make("queue.server").(queue.Server)
server.RegisterHandler("email:send", func(ctx context.Context, task *queue.Task) error {
    var payload map[string]interface{}
    if err := task.Unmarshal(&payload); err != nil {
        return err
    }
    // Xử lý logic gửi email ở đây...
    return nil
})

// Bắt đầu xử lý tác vụ
if err := server.Start(); err != nil {
    log.Fatal(err)
}

Sử dụng bộ nhớ trong (cho phát triển và kiểm thử):

// Khởi tạo client với bộ nhớ trong
client := queue.NewMemoryClient()

// Khởi tạo server với bộ nhớ trong
server := queue.NewMemoryServer(queue.ServerOptions{
    Concurrency: 5,
})

Xử lý tác vụ theo lịch:

// Lên lịch tác vụ sau 5 phút
client.EnqueueIn("report:generate", 5*time.Minute, payload)

// Lên lịch tác vụ vào thời điểm cụ thể
processAt := time.Date(2025, 5, 24, 15, 0, 0, 0, time.Local)
client.EnqueueAt("cleanup:old-data", processAt, payload)

Các tùy chọn khác:

// Tùy chỉnh queue, số lần thử lại, timeout...
client.Enqueue("image:resize", payload,
    queue.WithQueue("media"),
    queue.WithMaxRetry(3),
    queue.WithTimeout(2*time.Minute),
    queue.WithTaskID("resize-123"),
)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewServiceProvider

func NewServiceProvider() di.ServiceProvider

NewServiceProvider tạo một provider dịch vụ queue mới với cấu hình mặc định.

Sử dụng hàm này để tạo một provider có thể được đăng ký với một instance di.Container.

Trả về:

  • di.ServiceProvider: một service provider cho queue

Ví dụ:

app := myapp.New()
app.Register(queue.NewServiceProvider())

Types

type AdapterConfig

type AdapterConfig struct {
	// Default xác định adapter mặc định sẽ được sử dụng.
	// Các giá trị hợp lệ: "memory", "redis"
	Default string `mapstructure:"default"`

	// Memory chứa cấu hình cho memory adapter.
	Memory MemoryConfig `mapstructure:"memory"`

	// Redis chứa cấu hình cho redis adapter.
	Redis RedisConfig `mapstructure:"redis"`
}

AdapterConfig chứa cấu hình cho các adapter.

type Client

type Client interface {
	// Enqueue đưa một tác vụ vào hàng đợi để xử lý ngay lập tức.
	Enqueue(taskName string, payload interface{}, opts ...Option) (*TaskInfo, error)

	// EnqueueContext tương tự Enqueue nhưng với context.
	EnqueueContext(ctx context.Context, taskName string, payload interface{}, opts ...Option) (*TaskInfo, error)

	// EnqueueIn đưa một tác vụ vào hàng đợi để xử lý sau một khoảng thời gian.
	EnqueueIn(taskName string, delay time.Duration, payload interface{}, opts ...Option) (*TaskInfo, error)

	// EnqueueAt đưa một tác vụ vào hàng đợi để xử lý vào một thời điểm cụ thể.
	EnqueueAt(taskName string, processAt time.Time, payload interface{}, opts ...Option) (*TaskInfo, error)

	// Close đóng kết nối của client.
	Close() error
}

Client là interface cho việc đưa tác vụ vào hàng đợi.

func NewClient

func NewClient(redisClient *redis.Client) Client

NewClient tạo một Client mới với Redis.

func NewClientWithAdapter

func NewClientWithAdapter(adapter adapter.QueueAdapter) Client

NewClientWithAdapter tạo một Client mới với adapter QueueAdapter.

func NewClientWithUniversalClient

func NewClientWithUniversalClient(redisClient redis.UniversalClient) Client

NewClientWithUniversalClient tạo một Client mới với Redis UniversalClient.

func NewMemoryClient

func NewMemoryClient() Client

NewMemoryClient tạo một Client mới với bộ nhớ trong.

type ClientConfig

type ClientConfig struct {
	// DefaultOptions chứa các tùy chọn mặc định cho tác vụ.
	DefaultOptions ClientDefaultOptions `mapstructure:"defaultOptions"`
}

ClientConfig chứa cấu hình cho queue client.

type ClientDefaultOptions

type ClientDefaultOptions struct {
	// Queue là tên queue mặc định cho các tác vụ.
	Queue string `mapstructure:"queue"`

	// MaxRetry là số lần thử lại tối đa cho tác vụ bị lỗi.
	MaxRetry int `mapstructure:"maxRetry"`

	// Timeout là thời gian tối đa để tác vụ hoàn thành (tính bằng phút).
	Timeout int `mapstructure:"timeout"`
}

ClientDefaultOptions chứa các tùy chọn mặc định cho tác vụ.

type Config

type Config struct {
	// Adapter chứa cấu hình cho các queue adapter.
	Adapter AdapterConfig `mapstructure:"adapter"`

	// Server chứa cấu hình cho queue server.
	Server ServerConfig `mapstructure:"server"`

	// Client chứa cấu hình cho queue client.
	Client ClientConfig `mapstructure:"client"`
}

Config chứa cấu hình cho queue package.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig trả về cấu hình mặc định cho queue.

type DeadLetterTask

type DeadLetterTask struct {
	Task     Task      `json:"task"`
	Reason   string    `json:"reason"`
	FailedAt time.Time `json:"failed_at"`
}

DeadLetterTask đại diện cho một task trong dead letter queue

type HandlerFunc

type HandlerFunc func(ctx context.Context, task *Task) error

HandlerFunc là một hàm xử lý tác vụ.

type Manager

type Manager interface {
	// RedisClient trả về Redis client.
	RedisClient() redisClient.UniversalClient

	// MemoryAdapter trả về memory queue adapter.
	MemoryAdapter() adapter.QueueAdapter

	// RedisAdapter trả về redis queue adapter.
	RedisAdapter() adapter.QueueAdapter

	// Adapter trả về queue adapter dựa trên cấu hình.
	Adapter(name string) adapter.QueueAdapter

	// Client trả về Client.
	Client() Client

	// Server trả về Server.
	Server() Server

	// Scheduler trả về Scheduler manager để lên lịch tasks.
	Scheduler() scheduler.Manager

	// SetScheduler thiết lập scheduler manager từ bên ngoài.
	SetScheduler(scheduler scheduler.Manager)
}

Manager định nghĩa interface cho việc quản lý các thành phần queue.

func NewManager

func NewManager(config Config) Manager

NewManager tạo một manager mới với cấu hình mặc định.

func NewManagerWithContainer

func NewManagerWithContainer(config Config, container di.Container) Manager

NewManagerWithContainer tạo một manager mới với container DI để truy cập Redis provider.

type MemoryConfig

type MemoryConfig struct {
	// Prefix là tiền tố cho tên của các queue trong bộ nhớ.
	Prefix string `mapstructure:"prefix"`
}

MemoryConfig chứa cấu hình cho memory adapter.

type Option

type Option func(*TaskOptions)

Option là một hàm để cấu hình tác vụ.

func WithDeadline

func WithDeadline(t time.Time) Option

WithDeadline đặt thời hạn thực hiện cho tác vụ.

func WithDelay

func WithDelay(d time.Duration) Option

WithDelay đặt thời gian trì hoãn cho tác vụ.

func WithMaxRetry

func WithMaxRetry(n int) Option

WithMaxRetry đặt số lần thử lại tối đa cho tác vụ.

func WithProcessAt

func WithProcessAt(t time.Time) Option

WithProcessAt đặt thời điểm xử lý cho tác vụ.

func WithQueue

func WithQueue(queue string) Option

WithQueue đặt tên hàng đợi cho tác vụ.

func WithTaskID

func WithTaskID(id string) Option

WithTaskID đặt ID tùy chỉnh cho tác vụ.

func WithTimeout

func WithTimeout(d time.Duration) Option

WithTimeout đặt thời gian timeout cho tác vụ.

type RedisConfig

type RedisConfig struct {
	// Prefix là tiền tố cho tên của các queue trong Redis.
	Prefix string `mapstructure:"prefix"`

	// ProviderKey là khóa để lấy Redis provider từ DI container.
	// Mặc định là "redis" nếu không được cấu hình.
	ProviderKey string `mapstructure:"provider_key"`
}

RedisConfig chứa cấu hình cho redis adapter.

type Server

type Server interface {
	// RegisterHandler đăng ký một handler cho một loại tác vụ.
	RegisterHandler(taskName string, handler HandlerFunc)

	// RegisterHandlers đăng ký nhiều handler cùng một lúc.
	RegisterHandlers(handlers map[string]HandlerFunc)

	// Start bắt đầu xử lý tác vụ (worker).
	Start() error

	// Stop dừng xử lý tác vụ.
	Stop() error

	// SetScheduler thiết lập scheduler cho server để xử lý delayed tasks.
	SetScheduler(scheduler scheduler.Manager)

	// GetScheduler trả về scheduler hiện tại.
	GetScheduler() scheduler.Manager
}

Server là interface cho việc xử lý tác vụ từ hàng đợi.

func NewServer

func NewServer(redisClient redis.UniversalClient, opts ServerOptions) Server

NewServer tạo một Server mới.

func NewServerWithAdapter

func NewServerWithAdapter(adapter adapter.QueueAdapter, opts ServerOptions) Server

NewServerWithAdapter tạo một Server mới với adapter QueueAdapter được cung cấp.

type ServerConfig

type ServerConfig struct {
	// Concurrency là số lượng worker xử lý tác vụ cùng một lúc.
	Concurrency int `mapstructure:"concurrency"`

	// PollingInterval là khoảng thời gian giữa các lần kiểm tra tác vụ mới (tính bằng mili giây).
	PollingInterval int `mapstructure:"pollingInterval"`

	// DefaultQueue là tên queue mặc định nếu không có queue nào được chỉ định.
	DefaultQueue string `mapstructure:"defaultQueue"`

	// StrictPriority xác định liệu có ưu tiên nghiêm ngặt các queue ưu tiên cao hay không.
	StrictPriority bool `mapstructure:"strictPriority"`

	// Queues là danh sách các queue cần lắng nghe, theo thứ tự ưu tiên.
	Queues []string `mapstructure:"queues"`

	// ShutdownTimeout là thời gian chờ để các worker hoàn tất tác vụ khi dừng server (tính bằng giây).
	ShutdownTimeout int `mapstructure:"shutdownTimeout"`

	// LogLevel xác định mức độ log.
	LogLevel int `mapstructure:"logLevel"`

	// RetryLimit xác định số lần thử lại tối đa cho tác vụ bị lỗi.
	RetryLimit int `mapstructure:"retryLimit"`
}

ServerConfig chứa cấu hình cho queue server.

type ServerOptions

type ServerOptions struct {
	// Concurrency xác định số lượng worker xử lý tác vụ song song.
	Concurrency int

	// PollingInterval xác định thời gian chờ giữa các lần kiểm tra tác vụ (tính bằng mili giây).
	PollingInterval int

	// DefaultQueue xác định tên queue mặc định nếu không có queue nào được chỉ định.
	DefaultQueue string

	// StrictPriority xác định liệu có nên ưu tiên nghiêm ngặt giữa các hàng đợi.
	StrictPriority bool

	// Queues xác định danh sách các queue cần lắng nghe theo thứ tự ưu tiên.
	Queues []string

	// ShutdownTimeout xác định thời gian chờ để các worker hoàn tất tác vụ khi dừng server.
	ShutdownTimeout time.Duration

	// LogLevel xác định mức log.
	LogLevel int

	// RetryLimit xác định số lần thử lại tối đa cho tác vụ bị lỗi.
	RetryLimit int
}

ServerOptions chứa các tùy chọn cấu hình cho server.

type ServiceProvider

type ServiceProvider interface {
	di.ServiceProvider
	// contains filtered or unexported methods
}

type Task

type Task struct {
	// ID là định danh duy nhất của tác vụ
	ID string

	// Name là tên của loại tác vụ
	Name string

	// Payload là dữ liệu của tác vụ dưới dạng bytes
	Payload []byte

	// Queue là tên của hàng đợi chứa tác vụ
	Queue string

	// MaxRetry là số lần thử lại tối đa nếu tác vụ thất bại
	MaxRetry int

	// RetryCount là số lần tác vụ đã được thử lại
	RetryCount int

	// CreatedAt là thời điểm tác vụ được tạo
	CreatedAt time.Time

	// ProcessAt là thời điểm tác vụ sẽ được xử lý
	ProcessAt time.Time
}

Task đại diện cho một tác vụ cần được xử lý.

func NewTask

func NewTask(name string, payload []byte) *Task

NewTask tạo một tác vụ mới với tên và payload được cung cấp.

func (*Task) Unmarshal

func (t *Task) Unmarshal(v interface{}) error

Unmarshal giải mã payload thành một struct.

type TaskInfo

type TaskInfo struct {
	// ID là định danh duy nhất của tác vụ
	ID string

	// Name là tên của loại tác vụ
	Name string

	// Queue là tên của hàng đợi chứa tác vụ
	Queue string

	// MaxRetry là số lần thử lại tối đa nếu tác vụ thất bại
	MaxRetry int

	// State là trạng thái hiện tại của tác vụ (ví dụ: "pending", "scheduled", "processing", "completed")
	State string

	// CreatedAt là thời điểm tác vụ được tạo
	CreatedAt time.Time

	// ProcessAt là thời điểm tác vụ sẽ được xử lý
	ProcessAt time.Time
}

TaskInfo chứa thông tin về một tác vụ đã được đưa vào hàng đợi.

func (*TaskInfo) String

func (info *TaskInfo) String() string

String trả về biểu diễn chuỗi của TaskInfo.

type TaskOptions

type TaskOptions struct {
	// Queue là tên hàng đợi cho tác vụ
	Queue string

	// MaxRetry là số lần thử lại tối đa nếu tác vụ thất bại
	MaxRetry int

	// Timeout là thời gian tối đa để tác vụ hoàn thành
	Timeout time.Duration

	// Deadline là thời hạn chót để tác vụ hoàn thành
	Deadline time.Time

	// Delay là thời gian trì hoãn trước khi xử lý tác vụ
	Delay time.Duration

	// ProcessAt là thời điểm cụ thể để xử lý tác vụ
	ProcessAt time.Time

	// TaskID là ID tùy chỉnh cho tác vụ
	TaskID string
}

TaskOptions chứa các tùy chọn khi đưa tác vụ vào hàng đợi.

func ApplyOptions

func ApplyOptions(opts ...Option) *TaskOptions

ApplyOptions áp dụng các tùy chọn vào TaskOptions.

func GetDefaultOptions

func GetDefaultOptions() *TaskOptions

GetDefaultOptions trả về các tùy chọn mặc định.

Directories

Path Synopsis
Package adapter cung cấp các triển khai khác nhau cho queue backend.
Package adapter cung cấp các triển khai khác nhau cho queue backend.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL