queue

package module
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 28, 2025 License: MIT Imports: 14 Imported by: 1

README

Queue Provider

Queue Provider là giải pháp xử lý hàng đợi và tác vụ nền đơn giản nhưng mạnh mẽ cho ứng dụng Go, với tích hợp hoàn chỉnh scheduler và khả năng xử lý tác vụ phức tạp.

Tính năng nổi bật

  • Triển khai đơn giản: Dễ bảo trì và mở rộng với kiến trúc module hóa
  • Dual Adapter Support: Hỗ trợ Redis và Memory adapter cho mọi môi trường
  • Redis Provider Integration: Tích hợp hoàn chỉnh với Redis Provider để centralize Redis configuration
  • Enhanced Redis Features: Priority queues, TTL support, pipeline operations, monitoring
  • Scheduler Integration: Tích hợp hoàn chỉnh với Scheduler Provider để xử lý delayed/scheduled tasks
  • Advanced Task Management: Hỗ trợ retry logic, dead letter queue và task tracking
  • Configuration-driven: Cấu hình hoàn toàn qua file config với struct validation
  • Worker Model: Xử lý tác vụ đa luồng với concurrency control
  • Queue Priority: Hỗ trợ multiple queues với strict priority
  • Batch Processing: API batch processing cho hiệu suất cao
  • Maintenance Tasks: Tự động cleanup và retry failed jobs
  • DI Integration: Tích hợp hoàn chỉnh với DI container

Cài đặt

Để cài đặt Queue Provider, bạn có thể sử dụng lệnh go get:

go get github.com/go-fork/providers/queue

Cách sử dụng

1. Đăng ký Service Provider
Cách đơn giản (Auto-configuration)
package main

import (
    "github.com/go-fork/di"
    "github.com/go-fork/providers/config"
    "github.com/go-fork/providers/redis"
    "github.com/go-fork/providers/scheduler"
    "github.com/go-fork/providers/queue"
)

func main() {
    app := di.New()
    
    // Đăng ký các providers cần thiết
    app.Register(config.NewServiceProvider()) // Required cho cấu hình
    app.Register(redis.NewServiceProvider())  // Required cho Redis adapter
    app.Register(scheduler.NewServiceProvider()) // Required cho delayed tasks
    app.Register(queue.NewServiceProvider())
    
    // Boot ứng dụng - tự động cấu hình từ file config
    app.Boot()
    
    // Giữ ứng dụng chạy để worker có thể xử lý tác vụ
    select {}
}
Cấu hình thông qua file config
# config/app.yaml
queue:
  adapter:
    default: "redis"  # hoặc "memory"
    memory:
      prefix: "queue:"
    redis:
      prefix: "queue:"
      provider_key: "default"  # Sử dụng Redis provider với key "default"
  
  server:
    concurrency: 10
    polling_interval: 1000  # milliseconds
    default_queue: "default"
    strict_priority: true
    queues: ["critical", "high", "default", "low"]
    shutdown_timeout: 30  # seconds
    log_level: 1
    retry_limit: 3
  
  client:
    default_options:
      queue: "default"
      max_retry: 3
      timeout: 30  # minutes

# Cấu hình Redis trong redis section
redis:
  default:  # Redis provider key được reference từ queue config
    host: "localhost"
    port: 6379
    password: ""
    db: 0
    cluster:
      enabled: false
      hosts: ["localhost:7000", "localhost:7001"]

scheduler:
  auto_start: true
  distributed_lock:
    enabled: true  # Cho môi trường distributed
2. Thêm tác vụ vào hàng đợi (Producer)
// Lấy queue manager từ container
container := app.Container()
manager := container.MustMake("queue").(queue.Manager)

// Hoặc lấy trực tiếp client từ container
client := container.MustMake("queue.client").(queue.Client)

// Thêm tác vụ ngay lập tức với options
payload := map[string]interface{}{
    "user_id": 123,
    "email":   "user@example.com",
    "action":  "welcome",
}

taskInfo, err := client.Enqueue("email:welcome", payload,
    queue.WithQueue("emails"),       // Chỉ định queue
    queue.WithMaxRetry(5),          // Tối đa 5 lần retry
    queue.WithTimeout(2*time.Minute), // Timeout sau 2 phút
    queue.WithTaskID("welcome-123"), // Custom task ID
)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Đã thêm tác vụ: %s vào queue: %s\n", taskInfo.ID, taskInfo.Queue)

// Thêm tác vụ delayed (chạy sau 5 phút)
taskInfo, err = client.EnqueueIn("reminder:task", 5*time.Minute, payload,
    queue.WithQueue("notifications"),
)
if err != nil {
    log.Fatal(err)
}

// Thêm tác vụ scheduled (chạy vào thời điểm cụ thể)
processAt := time.Date(2025, 6, 1, 9, 0, 0, 0, time.Local)
taskInfo, err = client.EnqueueAt("report:generate", processAt, payload,
    queue.WithQueue("reports"),
    queue.WithMaxRetry(3),
)
if err != nil {
    log.Fatal(err)
}
3. Xử lý tác vụ từ hàng đợi (Consumer)
// Lấy queue server từ container
server := container.MustMake("queue.server").(queue.Server)

// Đăng ký handler cho email tasks
server.RegisterHandler("email:welcome", func(ctx context.Context, task *queue.Task) error {
    var payload map[string]interface{}
    if err := task.Unmarshal(&payload); err != nil {
        return fmt.Errorf("failed to unmarshal payload: %w", err)
    }
    
    userID := int(payload["user_id"].(float64))
    email := payload["email"].(string)
    
    log.Printf("Gửi email chào mừng đến %s (ID: %d)", email, userID)
    
    // Xử lý logic gửi email ở đây...
    // Có thể return error để trigger retry mechanism
    if !sendWelcomeEmail(email) {
        return fmt.Errorf("failed to send email to %s", email)
    }
    
    return nil
})

// Đăng ký handlers cho các loại tác vụ khác với error handling
server.RegisterHandler("reminder:task", func(ctx context.Context, task *queue.Task) error {
    log.Printf("Processing reminder task: %s", task.ID)
    
    // Check context timeout
    select {
    case <-ctx.Done():
        return ctx.Err()
    default:
        // Process reminder logic
        return processReminder(task)
    }
})

server.RegisterHandler("report:generate", func(ctx context.Context, task *queue.Task) error {
    log.Printf("Generating report: %s", task.ID)
    return generateReport(task)
})

// Đăng ký nhiều handlers cùng một lúc
server.RegisterHandlers(map[string]queue.HandlerFunc{
    "notification:push": handlePushNotification,
    "order:process":     handleOrderProcessing,
    "data:cleanup":      handleDataCleanup,
})

// Server tự động start khi ứng dụng boot
// Nhưng bạn cũng có thể control thủ công:
// err := server.Start()
// if err != nil {
//     log.Fatal(err)
// }

// Graceful shutdown
// defer server.Stop()
4. Tích hợp với Scheduler (Tính năng mới)

Queue Provider hiện đã tích hợp hoàn chỉnh với Scheduler Provider để xử lý các tác vụ phức tạp:

// Lấy scheduler từ manager
scheduler := manager.Scheduler()

// Schedule tasks mà sẽ enqueue jobs vào queue
scheduler.Every(5).Minutes().Do(func() {
    // Task định kỳ mỗi 5 phút
    client.Enqueue("maintenance:cleanup", map[string]interface{}{
        "type": "temporary_files",
        "timestamp": time.Now(),
    }, queue.WithQueue("maintenance"))
})

// Daily report generation
scheduler.Every(1).Days().At("09:00").Do(func() {
    client.Enqueue("report:daily", map[string]interface{}{
        "date": time.Now().Format("2006-01-02"),
        "type": "sales_summary",
    }, queue.WithQueue("reports"))
})

// Weekly backup với cron expression
scheduler.Cron("0 2 * * 0").Do(func() { // Chủ nhật 2:00 AM
    client.Enqueue("backup:weekly", map[string]interface{}{
        "week": time.Now().Format("2006-W02"),
    }, queue.WithQueue("maintenance"))
})

// Distributed scheduling (chỉ chạy trên 1 instance trong cluster)
scheduler.Every(10).Minutes().Tag("distributed").Do(func() {
    client.Enqueue("monitor:health_check", nil, queue.WithQueue("monitoring"))
})
5. Tùy chọn nâng cao khi thêm tác vụ

Queue Provider v0.0.3 cung cấp nhiều options linh hoạt:

// Tất cả options có thể dùng khi enqueue
taskInfo, err := client.Enqueue("image:resize", payload,
    queue.WithQueue("media"),                    // Chỉ định queue
    queue.WithMaxRetry(5),                      // Số lần retry tối đa
    queue.WithTimeout(10*time.Minute),          // Timeout cho task
    queue.WithTaskID("resize-user-123-photo"),  // Custom task ID
    queue.WithDelay(30*time.Second),            // Delay trước khi xử lý
    queue.WithDeadline(time.Now().Add(1*time.Hour)), // Deadline tuyệt đối
)
if err != nil {
    log.Fatal(err)
}

// Batch enqueue cho hiệu suất cao
tasks := []map[string]interface{}{
    {"user_id": 1, "action": "welcome"},
    {"user_id": 2, "action": "welcome"},
    {"user_id": 3, "action": "welcome"},
}

for i, task := range tasks {
    client.Enqueue("email:welcome", task,
        queue.WithQueue("emails"),
        queue.WithTaskID(fmt.Sprintf("welcome-%d", i)),
        queue.WithMaxRetry(3),
    )
}

// Process trong ưu tiên queues
highPriorityTask, _ := client.Enqueue("order:urgent", orderData,
    queue.WithQueue("critical"),
    queue.WithMaxRetry(5),
    queue.WithTimeout(30*time.Second),
)

lowPriorityTask, _ := client.Enqueue("analytics:update", analyticsData,
    queue.WithQueue("low"),
    queue.WithMaxRetry(1),
    queue.WithTimeout(5*time.Minute),
)
6. Sử dụng Memory Adapter (cho môi trường phát triển)
// Memory adapter tự động được sử dụng khi cấu hình default là "memory"
// Hoặc có thể khởi tạo trực tiếp:

package main

import (
    "github.com/go-fork/di"
    "github.com/go-fork/providers/queue"
    "github.com/go-fork/providers/config"
    "github.com/go-fork/providers/redis"
)

func main() {
    app := di.New()
    app.Register(config.NewServiceProvider())
    app.Register(redis.NewServiceProvider())  // Required cho Redis adapter
    app.Register(queue.NewServiceProvider())
    
    // Cấu hình sử dụng memory adapter trong config/app.yaml:
    // queue:
    //   adapter:
    //     default: "memory"
    //     memory:
    //       prefix: "test_queue:"
    
    app.Boot()
    
    // Memory adapter không cần Redis và phù hợp cho:
    // - Unit testing
    // - Development environment
    // - Prototype/demo applications
    
    container := app.Container()
    client := container.MustMake("queue.client").(queue.Client)
    
    // Sử dụng giống hệt như Redis adapter
    client.Enqueue("test:task", "test payload")
}
7. Redis Provider Integration (Tính năng v0.0.5)

Queue Provider hiện đã tích hợp hoàn chỉnh với Redis Provider để centralize Redis configuration và cung cấp advanced Redis features:

// Redis configuration được quản lý bởi Redis Provider
// config/app.yaml
redis:
  default:  # Redis instance key
    host: "localhost"
    port: 6379
    password: ""
    db: 0
    max_retries: 3
    dial_timeout: 5
    read_timeout: 3
    write_timeout: 3
    pool_size: 10

queue:
  adapter:
    default: "redis"
    redis:
      prefix: "queue:"
      provider_key: "default"  # Reference Redis provider key

// Sử dụng advanced Redis features
package main

import (
    "context"
    "time"
    "github.com/go-fork/di"
    "github.com/go-fork/providers/config"
    "github.com/go-fork/providers/redis"
    "github.com/go-fork/providers/queue"
    "github.com/go-fork/providers/queue/adapter"
)

func main() {
    app := di.New()
    app.Register(config.NewServiceProvider())
    app.Register(redis.NewServiceProvider())
    app.Register(queue.NewServiceProvider())
    app.Boot()

    container := app.Container()
    manager := container.MustMake("queue").(queue.Manager)
    
    // Lấy Redis queue adapter để sử dụng advanced features
    redisAdapter := manager.RedisAdapter()
    
    // Type assertion để access Redis-specific methods
    if redisQueue, ok := redisAdapter.(adapter.QueueRedisAdapter); ok {
        ctx := context.Background()
        
        // Priority queue operations
        err := redisQueue.EnqueueWithPriority(ctx, "tasks", &task, 10) // Priority 10
        if err != nil {
            log.Fatal(err)
        }
        
        // Dequeue from priority queue
        var priorityTask queue.Task
        err = redisQueue.DequeueFromPriority(ctx, "tasks", &priorityTask)
        if err != nil {
            log.Printf("No priority tasks available: %v", err)
        }
        
        // TTL support
        err = redisQueue.EnqueueWithTTL(ctx, "temporary", &task, 1*time.Hour)
        if err != nil {
            log.Fatal(err)
        }
        
        // Batch operations với pipeline
        tasks := []*queue.Task{&task1, &task2, &task3}
        err = redisQueue.EnqueueWithPipeline(ctx, "batch", tasks)
        if err != nil {
            log.Fatal(err)
        }
        
        // Multi-dequeue
        results, err := redisQueue.MultiDequeue(ctx, "batch", 5) // Lấy tối đa 5 tasks
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("Dequeued %d tasks", len(results))
        
        // Queue monitoring
        info, err := redisQueue.GetQueueInfo(ctx, "tasks")
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("Queue info: %+v", info)
        
        // Health check
        if err := redisQueue.Ping(ctx); err != nil {
            log.Printf("Redis connection issue: %v", err)
        }
        
        // Development/testing utilities
        if err := redisQueue.FlushQueues(ctx, []string{"test", "debug"}); err != nil {
            log.Printf("Failed to flush queues: %v", err)
        }
    }
}
8. Failed Jobs và Dead Letter Queue (Tính năng nâng cao)

Queue Provider v0.0.3 có hệ thống xử lý lỗi tiên tiến:

// Failed jobs được tự động retry với exponential backoff
server.RegisterHandler("risky:task", func(ctx context.Context, task *queue.Task) error {
    // Giả lập task có thể fail
    if rand.Float32() < 0.3 { // 30% chance fail
        return fmt.Errorf("simulated failure")
    }
    
    log.Printf("Task %s completed successfully", task.ID)
    return nil
})

// Tasks sẽ được retry tối đa theo cấu hình (mặc định 3 lần)
// Delay giữa các lần retry tăng theo exponential backoff:
// - Retry 1: 1 minute
// - Retry 2: 4 minutes  
// - Retry 3: 9 minutes

// Sau khi vượt quá retry limit, task sẽ được chuyển vào Dead Letter Queue
// Dead Letter Queue có thể được monitor và xử lý thủ công

// Hệ thống maintenance tự động:
// - Cleanup dead letter tasks cũ hơn 7 ngày (chạy mỗi giờ)
// - Retry failed tasks đủ điều kiện (chạy mỗi 5 phút)
// - Xử lý delayed tasks đã đến hạn (chạy mỗi 30 giây)
8. Monitoring và Debugging
// TaskInfo cung cấp thông tin chi tiết về task
taskInfo, err := client.Enqueue("debug:task", payload)
if err == nil {
    log.Printf("Task created: %s", taskInfo.String())
    // Output: Task ID: abc-123, Name: debug:task, Queue: default, 
    //         State: pending, Created: 2025-05-28T10:30:00Z
}

// Server logging tự động theo dõi:
// - Task processing time
// - Worker performance  
// - Retry attempts
// - Failed task reasons
// - Queue sizes

// Có thể tùy chỉnh log level trong config:
// queue:
//   server:
//     log_level: 2  # 0=SILENT, 1=ERROR, 2=INFO, 3=DEBUG
9. Production Best Practices
// Cấu hình production-ready
// config/production.yaml
/*
queue:
  adapter:
    default: "redis"
    redis:
      prefix: "myapp_queue:"
      provider_key: "default"  # Reference Redis provider key
  
  server:
    concurrency: 50              # Điều chỉnh theo CPU cores
    polling_interval: 500        # Giảm cho high-throughput
    strict_priority: true        # Đảm bảo critical tasks được ưu tiên
    queues: ["critical", "high", "default", "low", "bulk"]
    shutdown_timeout: 60         # Đủ thời gian cho graceful shutdown
    retry_limit: 5              # Tăng retry cho production
    
  client:
    default_options:
      queue: "default"
      max_retry: 3
      timeout: 15               # 15 phút timeout mặc định

# Redis cấu hình riêng biệt trong Redis provider
redis:
  default:
    host: "redis-cluster.internal"
    port: 6379
    password: "${REDIS_PASSWORD}"
    db: 0
    cluster:
      enabled: true
      hosts:
        - "redis-1.internal:6379"
        - "redis-2.internal:6379"
        - "redis-3.internal:6379"

scheduler:
  auto_start: true
  distributed_lock:
    enabled: true               # Bắt buộc cho production cluster
  options:
    key_prefix: "myapp_scheduler:"
    lock_duration: 120          # 2 phút
    max_retries: 5
    retry_delay: 500
*/

// Graceful shutdown handling
func setupGracefulShutdown(server queue.Server) {
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)
    
    go func() {
        <-c
        log.Println("Shutting down gracefully...")
        
        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer cancel()
        
        if err := server.Stop(); err != nil {
            log.Printf("Error during shutdown: %v", err)
        }
        
        log.Println("Shutdown complete")
        os.Exit(0)
    }()
}

Documentation

Overview

Package queue cung cấp một service provider để quản lý hàng đợi và xử lý tác vụ bất đồng bộ với thiết kế đơn giản, linh hoạt hỗ trợ cả Redis và bộ nhớ trong. Package đã được nâng cấp để tích hợp hoàn chỉnh với Redis Provider và cung cấp các tính năng Redis nâng cao như priority queues, TTL support, và batch operations.

Package này cung cấp APIs để đưa tác vụ vào hàng đợi, lên lịch thực thi tác vụ và xử lý tác vụ qua các handlers đăng ký. Nó hỗ trợ:

- Thực thi ngay lập tức (immediate execution) - Trì hoãn thực thi theo khoảng thời gian (delayed execution) - Lên lịch thực thi vào thời điểm cụ thể (scheduled execution) - Xử lý tác vụ song song với mức độ song song có thể cấu hình - Hệ thống thử lại tự động với chiến lược backoff - Ưu tiên giữa các hàng đợi khác nhau (queue-level priority) - Priority queues với Redis Sorted Sets (task-level priority) - TTL support cho temporary tasks - Batch operations với Redis pipelines - Queue monitoring và health checks - Hàng đợi trong bộ nhớ cho môi trường phát triển và kiểm thử

Package cung cấp ba thành phần chính:

1. Manager: Quản lý các thành phần trong queue và cung cấp cấu hình 2. Client: Cho phép đưa tác vụ vào hàng đợi (producer) 3. Server: Xử lý các tác vụ từ hàng đợi thông qua handlers (consumer)

Hỗ trợ hai adapter chính:

1. Redis adapter: Cho môi trường production với khả năng mở rộng cao

  • Tích hợp với Redis Provider để centralize configuration
  • Enhanced Redis features: priority queues, TTL, pipelines
  • Advanced monitoring và health checks

2. Memory adapter: Cho môi trường phát triển và kiểm thử

Ví dụ sử dụng với Redis Provider:

// Đăng ký service providers
app.Register(config.NewServiceProvider())
app.Register(redis.NewServiceProvider())  // Required cho Redis adapter
app.Register(scheduler.NewServiceProvider())
app.Register(queue.NewServiceProvider())

// Cấu hình trong config/app.yaml
/*
redis:
  default:
    host: "localhost"
    port: 6379
    db: 0

queue:
  adapter:
    default: "redis"
    redis:
      prefix: "queue:"
      provider_key: "default"  # Reference Redis provider
*/

// Sử dụng client để đưa tác vụ vào hàng đợi
client := app.Make("queue.client").(queue.Client)
client.Enqueue("email:send", map[string]interface{}{
    "to":      "user@example.com",
    "subject": "Hello",
}, queue.WithQueue("emails"), queue.WithMaxRetry(3))

// Sử dụng advanced Redis features
manager := app.Make("queue").(queue.Manager)
if redisAdapter, ok := manager.RedisAdapter().(adapter.QueueRedisAdapter); ok {
    // Priority queue
    redisAdapter.EnqueueWithPriority(ctx, "tasks", &task, 10)

    // TTL support
    redisAdapter.EnqueueWithTTL(ctx, "temporary", &task, 1*time.Hour)

    // Batch operations
    redisAdapter.EnqueueWithPipeline(ctx, "batch", tasks)

    // Monitoring
    info, _ := redisAdapter.GetQueueInfo(ctx, "tasks")
}

// Sử dụng server để xử lý tác vụ
server := app.Make("queue.server").(queue.Server)
server.RegisterHandler("email:send", func(ctx context.Context, task *queue.Task) error {
    var payload map[string]interface{}
    if err := task.Unmarshal(&payload); err != nil {
        return err
    }
    // Xử lý logic gửi email ở đây...
    return nil
})

// Bắt đầu xử lý tác vụ
if err := server.Start(); err != nil {
    log.Fatal(err)
}

Sử dụng bộ nhớ trong (cho phát triển và kiểm thử):

// Khởi tạo client với bộ nhớ trong
client := queue.NewMemoryClient()

// Khởi tạo server với bộ nhớ trong
server := queue.NewMemoryServer(queue.ServerOptions{
    Concurrency: 5,
})

Xử lý tác vụ theo lịch:

// Lên lịch tác vụ sau 5 phút
client.EnqueueIn("report:generate", 5*time.Minute, payload)

// Lên lịch tác vụ vào thời điểm cụ thể
processAt := time.Date(2025, 5, 24, 15, 0, 0, 0, time.Local)
client.EnqueueAt("cleanup:old-data", processAt, payload)

Các tùy chọn khác:

// Tùy chỉnh queue, số lần thử lại, timeout...
client.Enqueue("image:resize", payload,
    queue.WithQueue("media"),
    queue.WithMaxRetry(3),
    queue.WithTimeout(2*time.Minute),
    queue.WithTaskID("resize-123"),
)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewServiceProvider

func NewServiceProvider() di.ServiceProvider

NewServiceProvider tạo một provider dịch vụ queue mới với cấu hình mặc định.

Sử dụng hàm này để tạo một provider có thể được đăng ký với một instance di.Container.

Trả về:

  • di.ServiceProvider: một service provider cho queue

Ví dụ:

app := myapp.New()
app.Register(queue.NewServiceProvider())

Types

type AdapterConfig

type AdapterConfig struct {
	// Default xác định adapter mặc định sẽ được sử dụng.
	// Các giá trị hợp lệ: "memory", "redis"
	Default string `mapstructure:"default"`

	// Memory chứa cấu hình cho memory adapter.
	Memory MemoryConfig `mapstructure:"memory"`

	// Redis chứa cấu hình cho redis adapter.
	Redis RedisConfig `mapstructure:"redis"`
}

AdapterConfig chứa cấu hình cho các adapter.

type Client

type Client interface {
	// Enqueue đưa một tác vụ vào hàng đợi để xử lý ngay lập tức.
	Enqueue(taskName string, payload interface{}, opts ...Option) (*TaskInfo, error)

	// EnqueueContext tương tự Enqueue nhưng với context.
	EnqueueContext(ctx context.Context, taskName string, payload interface{}, opts ...Option) (*TaskInfo, error)

	// EnqueueIn đưa một tác vụ vào hàng đợi để xử lý sau một khoảng thời gian.
	EnqueueIn(taskName string, delay time.Duration, payload interface{}, opts ...Option) (*TaskInfo, error)

	// EnqueueAt đưa một tác vụ vào hàng đợi để xử lý vào một thời điểm cụ thể.
	EnqueueAt(taskName string, processAt time.Time, payload interface{}, opts ...Option) (*TaskInfo, error)

	// Close đóng kết nối của client.
	Close() error
}

Client là interface cho việc đưa tác vụ vào hàng đợi.

func NewClient

func NewClient(redisClient *redis.Client) Client

NewClient tạo một Client mới với Redis.

func NewClientWithAdapter

func NewClientWithAdapter(adapter adapter.QueueAdapter) Client

NewClientWithAdapter tạo một Client mới với adapter QueueAdapter.

func NewClientWithUniversalClient

func NewClientWithUniversalClient(redisClient redis.UniversalClient) Client

NewClientWithUniversalClient tạo một Client mới với Redis UniversalClient.

func NewMemoryClient

func NewMemoryClient() Client

NewMemoryClient tạo một Client mới với bộ nhớ trong.

type ClientConfig

type ClientConfig struct {
	// DefaultOptions chứa các tùy chọn mặc định cho tác vụ.
	DefaultOptions ClientDefaultOptions `mapstructure:"defaultOptions"`
}

ClientConfig chứa cấu hình cho queue client.

type ClientDefaultOptions

type ClientDefaultOptions struct {
	// Queue là tên queue mặc định cho các tác vụ.
	Queue string `mapstructure:"queue"`

	// MaxRetry là số lần thử lại tối đa cho tác vụ bị lỗi.
	MaxRetry int `mapstructure:"maxRetry"`

	// Timeout là thời gian tối đa để tác vụ hoàn thành (tính bằng phút).
	Timeout int `mapstructure:"timeout"`
}

ClientDefaultOptions chứa các tùy chọn mặc định cho tác vụ.

type Config

type Config struct {
	// Adapter chứa cấu hình cho các queue adapter.
	Adapter AdapterConfig `mapstructure:"adapter"`

	// Server chứa cấu hình cho queue server.
	Server ServerConfig `mapstructure:"server"`

	// Client chứa cấu hình cho queue client.
	Client ClientConfig `mapstructure:"client"`
}

Config chứa cấu hình cho queue package.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig trả về cấu hình mặc định cho queue.

type DeadLetterTask

type DeadLetterTask struct {
	Task     Task      `json:"task"`
	Reason   string    `json:"reason"`
	FailedAt time.Time `json:"failed_at"`
}

DeadLetterTask đại diện cho một task trong dead letter queue

type HandlerFunc

type HandlerFunc func(ctx context.Context, task *Task) error

HandlerFunc là một hàm xử lý tác vụ.

type Manager

type Manager interface {
	// RedisClient trả về Redis client.
	RedisClient() redisClient.UniversalClient

	// MemoryAdapter trả về memory queue adapter.
	MemoryAdapter() adapter.QueueAdapter

	// RedisAdapter trả về redis queue adapter.
	RedisAdapter() adapter.QueueAdapter

	// Adapter trả về queue adapter dựa trên cấu hình.
	Adapter(name string) adapter.QueueAdapter

	// Client trả về Client.
	Client() Client

	// Server trả về Server.
	Server() Server

	// Scheduler trả về Scheduler manager để lên lịch tasks.
	Scheduler() scheduler.Manager

	// SetScheduler thiết lập scheduler manager từ bên ngoài.
	SetScheduler(scheduler scheduler.Manager)
}

Manager định nghĩa interface cho việc quản lý các thành phần queue.

func NewManager

func NewManager(config Config) Manager

NewManager tạo một manager mới với cấu hình mặc định.

func NewManagerWithContainer added in v0.0.5

func NewManagerWithContainer(config Config, container *di.Container) Manager

NewManagerWithContainer tạo một manager mới với container DI để truy cập Redis provider.

type MemoryConfig

type MemoryConfig struct {
	// Prefix là tiền tố cho tên của các queue trong bộ nhớ.
	Prefix string `mapstructure:"prefix"`
}

MemoryConfig chứa cấu hình cho memory adapter.

type Option

type Option func(*TaskOptions)

Option là một hàm để cấu hình tác vụ.

func WithDeadline

func WithDeadline(t time.Time) Option

WithDeadline đặt thời hạn thực hiện cho tác vụ.

func WithDelay

func WithDelay(d time.Duration) Option

WithDelay đặt thời gian trì hoãn cho tác vụ.

func WithMaxRetry

func WithMaxRetry(n int) Option

WithMaxRetry đặt số lần thử lại tối đa cho tác vụ.

func WithProcessAt

func WithProcessAt(t time.Time) Option

WithProcessAt đặt thời điểm xử lý cho tác vụ.

func WithQueue

func WithQueue(queue string) Option

WithQueue đặt tên hàng đợi cho tác vụ.

func WithTaskID

func WithTaskID(id string) Option

WithTaskID đặt ID tùy chỉnh cho tác vụ.

func WithTimeout

func WithTimeout(d time.Duration) Option

WithTimeout đặt thời gian timeout cho tác vụ.

type RedisConfig

type RedisConfig struct {
	// Prefix là tiền tố cho tên của các queue trong Redis.
	Prefix string `mapstructure:"prefix"`

	// ProviderKey là khóa để lấy Redis provider từ DI container.
	// Mặc định là "redis" nếu không được cấu hình.
	ProviderKey string `mapstructure:"provider_key"`
}

RedisConfig chứa cấu hình cho redis adapter.

type Server

type Server interface {
	// RegisterHandler đăng ký một handler cho một loại tác vụ.
	RegisterHandler(taskName string, handler HandlerFunc)

	// RegisterHandlers đăng ký nhiều handler cùng một lúc.
	RegisterHandlers(handlers map[string]HandlerFunc)

	// Start bắt đầu xử lý tác vụ (worker).
	Start() error

	// Stop dừng xử lý tác vụ.
	Stop() error

	// SetScheduler thiết lập scheduler cho server để xử lý delayed tasks.
	SetScheduler(scheduler scheduler.Manager)

	// GetScheduler trả về scheduler hiện tại.
	GetScheduler() scheduler.Manager
}

Server là interface cho việc xử lý tác vụ từ hàng đợi.

func NewServer

func NewServer(redisClient redis.UniversalClient, opts ServerOptions) Server

NewServer tạo một Server mới.

func NewServerWithAdapter

func NewServerWithAdapter(adapter adapter.QueueAdapter, opts ServerOptions) Server

NewServerWithAdapter tạo một Server mới với adapter QueueAdapter được cung cấp.

type ServerConfig

type ServerConfig struct {
	// Concurrency là số lượng worker xử lý tác vụ cùng một lúc.
	Concurrency int `mapstructure:"concurrency"`

	// PollingInterval là khoảng thời gian giữa các lần kiểm tra tác vụ mới (tính bằng mili giây).
	PollingInterval int `mapstructure:"pollingInterval"`

	// DefaultQueue là tên queue mặc định nếu không có queue nào được chỉ định.
	DefaultQueue string `mapstructure:"defaultQueue"`

	// StrictPriority xác định liệu có ưu tiên nghiêm ngặt các queue ưu tiên cao hay không.
	StrictPriority bool `mapstructure:"strictPriority"`

	// Queues là danh sách các queue cần lắng nghe, theo thứ tự ưu tiên.
	Queues []string `mapstructure:"queues"`

	// ShutdownTimeout là thời gian chờ để các worker hoàn tất tác vụ khi dừng server (tính bằng giây).
	ShutdownTimeout int `mapstructure:"shutdownTimeout"`

	// LogLevel xác định mức độ log.
	LogLevel int `mapstructure:"logLevel"`

	// RetryLimit xác định số lần thử lại tối đa cho tác vụ bị lỗi.
	RetryLimit int `mapstructure:"retryLimit"`
}

ServerConfig chứa cấu hình cho queue server.

type ServerOptions

type ServerOptions struct {
	// Concurrency xác định số lượng worker xử lý tác vụ song song.
	Concurrency int

	// PollingInterval xác định thời gian chờ giữa các lần kiểm tra tác vụ (tính bằng mili giây).
	PollingInterval int

	// DefaultQueue xác định tên queue mặc định nếu không có queue nào được chỉ định.
	DefaultQueue string

	// StrictPriority xác định liệu có nên ưu tiên nghiêm ngặt giữa các hàng đợi.
	StrictPriority bool

	// Queues xác định danh sách các queue cần lắng nghe theo thứ tự ưu tiên.
	Queues []string

	// ShutdownTimeout xác định thời gian chờ để các worker hoàn tất tác vụ khi dừng server.
	ShutdownTimeout time.Duration

	// LogLevel xác định mức log.
	LogLevel int

	// RetryLimit xác định số lần thử lại tối đa cho tác vụ bị lỗi.
	RetryLimit int
}

ServerOptions chứa các tùy chọn cấu hình cho server.

type ServiceProvider

type ServiceProvider interface {
	di.ServiceProvider
	// contains filtered or unexported methods
}

type Task

type Task struct {
	// ID là định danh duy nhất của tác vụ
	ID string

	// Name là tên của loại tác vụ
	Name string

	// Payload là dữ liệu của tác vụ dưới dạng bytes
	Payload []byte

	// Queue là tên của hàng đợi chứa tác vụ
	Queue string

	// MaxRetry là số lần thử lại tối đa nếu tác vụ thất bại
	MaxRetry int

	// RetryCount là số lần tác vụ đã được thử lại
	RetryCount int

	// CreatedAt là thời điểm tác vụ được tạo
	CreatedAt time.Time

	// ProcessAt là thời điểm tác vụ sẽ được xử lý
	ProcessAt time.Time
}

Task đại diện cho một tác vụ cần được xử lý.

func NewTask

func NewTask(name string, payload []byte) *Task

NewTask tạo một tác vụ mới với tên và payload được cung cấp.

func (*Task) Unmarshal

func (t *Task) Unmarshal(v interface{}) error

Unmarshal giải mã payload thành một struct.

type TaskInfo

type TaskInfo struct {
	// ID là định danh duy nhất của tác vụ
	ID string

	// Name là tên của loại tác vụ
	Name string

	// Queue là tên của hàng đợi chứa tác vụ
	Queue string

	// MaxRetry là số lần thử lại tối đa nếu tác vụ thất bại
	MaxRetry int

	// State là trạng thái hiện tại của tác vụ (ví dụ: "pending", "scheduled", "processing", "completed")
	State string

	// CreatedAt là thời điểm tác vụ được tạo
	CreatedAt time.Time

	// ProcessAt là thời điểm tác vụ sẽ được xử lý
	ProcessAt time.Time
}

TaskInfo chứa thông tin về một tác vụ đã được đưa vào hàng đợi.

func (*TaskInfo) String

func (info *TaskInfo) String() string

String trả về biểu diễn chuỗi của TaskInfo.

type TaskOptions

type TaskOptions struct {
	// Queue là tên hàng đợi cho tác vụ
	Queue string

	// MaxRetry là số lần thử lại tối đa nếu tác vụ thất bại
	MaxRetry int

	// Timeout là thời gian tối đa để tác vụ hoàn thành
	Timeout time.Duration

	// Deadline là thời hạn chót để tác vụ hoàn thành
	Deadline time.Time

	// Delay là thời gian trì hoãn trước khi xử lý tác vụ
	Delay time.Duration

	// ProcessAt là thời điểm cụ thể để xử lý tác vụ
	ProcessAt time.Time

	// TaskID là ID tùy chỉnh cho tác vụ
	TaskID string
}

TaskOptions chứa các tùy chọn khi đưa tác vụ vào hàng đợi.

func ApplyOptions

func ApplyOptions(opts ...Option) *TaskOptions

ApplyOptions áp dụng các tùy chọn vào TaskOptions.

func GetDefaultOptions

func GetDefaultOptions() *TaskOptions

GetDefaultOptions trả về các tùy chọn mặc định.

Directories

Path Synopsis
Package adapter cung cấp các triển khai khác nhau cho queue backend.
Package adapter cung cấp các triển khai khác nhau cho queue backend.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL