Documentation
¶
Overview ¶
Package queue cung cấp một service provider để quản lý hàng đợi và xử lý tác vụ bất đồng bộ với thiết kế đơn giản, linh hoạt hỗ trợ cả Redis và bộ nhớ trong. Package đã được nâng cấp để tích hợp hoàn chỉnh với Redis Provider và cung cấp các tính năng Redis nâng cao như priority queues, TTL support, và batch operations.
Package này cung cấp APIs để đưa tác vụ vào hàng đợi, lên lịch thực thi tác vụ và xử lý tác vụ qua các handlers đăng ký. Nó hỗ trợ:
- Thực thi ngay lập tức (immediate execution) - Trì hoãn thực thi theo khoảng thời gian (delayed execution) - Lên lịch thực thi vào thời điểm cụ thể (scheduled execution) - Xử lý tác vụ song song với mức độ song song có thể cấu hình - Hệ thống thử lại tự động với chiến lược backoff - Ưu tiên giữa các hàng đợi khác nhau (queue-level priority) - Priority queues với Redis Sorted Sets (task-level priority) - TTL support cho temporary tasks - Batch operations với Redis pipelines - Queue monitoring và health checks - Hàng đợi trong bộ nhớ cho môi trường phát triển và kiểm thử
Package cung cấp ba thành phần chính:
1. Manager: Quản lý các thành phần trong queue và cung cấp cấu hình 2. Client: Cho phép đưa tác vụ vào hàng đợi (producer) 3. Server: Xử lý các tác vụ từ hàng đợi thông qua handlers (consumer)
Hỗ trợ hai adapter chính:
1. Redis adapter: Cho môi trường production với khả năng mở rộng cao
- Tích hợp với Redis Provider để centralize configuration
- Enhanced Redis features: priority queues, TTL, pipelines
- Advanced monitoring và health checks
2. Memory adapter: Cho môi trường phát triển và kiểm thử
Ví dụ sử dụng với Redis Provider:
// Đăng ký service providers app.Register(config.NewServiceProvider()) app.Register(redis.NewServiceProvider()) // Required cho Redis adapter app.Register(scheduler.NewServiceProvider()) app.Register(queue.NewServiceProvider()) // Cấu hình trong config/app.yaml /* redis: default: host: "localhost" port: 6379 db: 0 queue: adapter: default: "redis" redis: prefix: "queue:" provider_key: "default" # Reference Redis provider */ // Sử dụng client để đưa tác vụ vào hàng đợi client := app.Make("queue.client").(queue.Client) client.Enqueue("email:send", map[string]interface{}{ "to": "user@example.com", "subject": "Hello", }, queue.WithQueue("emails"), queue.WithMaxRetry(3)) // Sử dụng advanced Redis features manager := app.Make("queue").(queue.Manager) if redisAdapter, ok := manager.RedisAdapter().(adapter.QueueRedisAdapter); ok { // Priority queue redisAdapter.EnqueueWithPriority(ctx, "tasks", &task, 10) // TTL support redisAdapter.EnqueueWithTTL(ctx, "temporary", &task, 1*time.Hour) // Batch operations redisAdapter.EnqueueWithPipeline(ctx, "batch", tasks) // Monitoring info, _ := redisAdapter.GetQueueInfo(ctx, "tasks") } // Sử dụng server để xử lý tác vụ server := app.Make("queue.server").(queue.Server) server.RegisterHandler("email:send", func(ctx context.Context, task *queue.Task) error { var payload map[string]interface{} if err := task.Unmarshal(&payload); err != nil { return err } // Xử lý logic gửi email ở đây... return nil }) // Bắt đầu xử lý tác vụ if err := server.Start(); err != nil { log.Fatal(err) }
Sử dụng bộ nhớ trong (cho phát triển và kiểm thử):
// Khởi tạo client với bộ nhớ trong client := queue.NewMemoryClient() // Khởi tạo server với bộ nhớ trong server := queue.NewMemoryServer(queue.ServerOptions{ Concurrency: 5, })
Xử lý tác vụ theo lịch:
// Lên lịch tác vụ sau 5 phút client.EnqueueIn("report:generate", 5*time.Minute, payload) // Lên lịch tác vụ vào thời điểm cụ thể processAt := time.Date(2025, 5, 24, 15, 0, 0, 0, time.Local) client.EnqueueAt("cleanup:old-data", processAt, payload)
Các tùy chọn khác:
// Tùy chỉnh queue, số lần thử lại, timeout... client.Enqueue("image:resize", payload, queue.WithQueue("media"), queue.WithMaxRetry(3), queue.WithTimeout(2*time.Minute), queue.WithTaskID("resize-123"), )
Index ¶
- func NewServiceProvider() di.ServiceProvider
- type AdapterConfig
- type Client
- type ClientConfig
- type ClientDefaultOptions
- type Config
- type DeadLetterTask
- type HandlerFunc
- type Manager
- type MemoryConfig
- type Option
- type RedisConfig
- type Server
- type ServerConfig
- type ServerOptions
- type ServiceProvider
- type Task
- type TaskInfo
- type TaskOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewServiceProvider ¶
func NewServiceProvider() di.ServiceProvider
NewServiceProvider tạo một provider dịch vụ queue mới với cấu hình mặc định.
Sử dụng hàm này để tạo một provider có thể được đăng ký với một instance di.Container.
Trả về:
- di.ServiceProvider: một service provider cho queue
Ví dụ:
app := myapp.New() app.Register(queue.NewServiceProvider())
Types ¶
type AdapterConfig ¶
type AdapterConfig struct { // Default xác định adapter mặc định sẽ được sử dụng. // Các giá trị hợp lệ: "memory", "redis" Default string `mapstructure:"default"` // Memory chứa cấu hình cho memory adapter. Memory MemoryConfig `mapstructure:"memory"` // Redis chứa cấu hình cho redis adapter. Redis RedisConfig `mapstructure:"redis"` }
AdapterConfig chứa cấu hình cho các adapter.
type Client ¶
type Client interface { // Enqueue đưa một tác vụ vào hàng đợi để xử lý ngay lập tức. Enqueue(taskName string, payload interface{}, opts ...Option) (*TaskInfo, error) // EnqueueContext tương tự Enqueue nhưng với context. EnqueueContext(ctx context.Context, taskName string, payload interface{}, opts ...Option) (*TaskInfo, error) // EnqueueIn đưa một tác vụ vào hàng đợi để xử lý sau một khoảng thời gian. EnqueueIn(taskName string, delay time.Duration, payload interface{}, opts ...Option) (*TaskInfo, error) // EnqueueAt đưa một tác vụ vào hàng đợi để xử lý vào một thời điểm cụ thể. EnqueueAt(taskName string, processAt time.Time, payload interface{}, opts ...Option) (*TaskInfo, error) // Close đóng kết nối của client. Close() error }
Client là interface cho việc đưa tác vụ vào hàng đợi.
func NewClient ¶
func NewClient(redisClient *redis.Client) Client
NewClient tạo một Client mới với Redis.
func NewClientWithAdapter ¶
func NewClientWithAdapter(adapter adapter.QueueAdapter) Client
NewClientWithAdapter tạo một Client mới với adapter QueueAdapter.
func NewClientWithUniversalClient ¶
func NewClientWithUniversalClient(redisClient redis.UniversalClient) Client
NewClientWithUniversalClient tạo một Client mới với Redis UniversalClient.
func NewMemoryClient ¶
func NewMemoryClient() Client
NewMemoryClient tạo một Client mới với bộ nhớ trong.
type ClientConfig ¶
type ClientConfig struct { // DefaultOptions chứa các tùy chọn mặc định cho tác vụ. DefaultOptions ClientDefaultOptions `mapstructure:"defaultOptions"` }
ClientConfig chứa cấu hình cho queue client.
type ClientDefaultOptions ¶
type ClientDefaultOptions struct { // Queue là tên queue mặc định cho các tác vụ. Queue string `mapstructure:"queue"` // MaxRetry là số lần thử lại tối đa cho tác vụ bị lỗi. MaxRetry int `mapstructure:"maxRetry"` // Timeout là thời gian tối đa để tác vụ hoàn thành (tính bằng phút). Timeout int `mapstructure:"timeout"` }
ClientDefaultOptions chứa các tùy chọn mặc định cho tác vụ.
type Config ¶
type Config struct { // Adapter chứa cấu hình cho các queue adapter. Adapter AdapterConfig `mapstructure:"adapter"` // Server chứa cấu hình cho queue server. Server ServerConfig `mapstructure:"server"` // Client chứa cấu hình cho queue client. Client ClientConfig `mapstructure:"client"` }
Config chứa cấu hình cho queue package.
type DeadLetterTask ¶
type DeadLetterTask struct { Task Task `json:"task"` Reason string `json:"reason"` FailedAt time.Time `json:"failed_at"` }
DeadLetterTask đại diện cho một task trong dead letter queue
type HandlerFunc ¶
HandlerFunc là một hàm xử lý tác vụ.
type Manager ¶
type Manager interface { // RedisClient trả về Redis client. RedisClient() redisClient.UniversalClient // MemoryAdapter trả về memory queue adapter. MemoryAdapter() adapter.QueueAdapter // RedisAdapter trả về redis queue adapter. RedisAdapter() adapter.QueueAdapter // Adapter trả về queue adapter dựa trên cấu hình. Adapter(name string) adapter.QueueAdapter // Client trả về Client. Client() Client // Server trả về Server. Server() Server // Scheduler trả về Scheduler manager để lên lịch tasks. Scheduler() scheduler.Manager // SetScheduler thiết lập scheduler manager từ bên ngoài. SetScheduler(scheduler scheduler.Manager) }
Manager định nghĩa interface cho việc quản lý các thành phần queue.
func NewManager ¶
NewManager tạo một manager mới với cấu hình mặc định.
type MemoryConfig ¶
type MemoryConfig struct { // Prefix là tiền tố cho tên của các queue trong bộ nhớ. Prefix string `mapstructure:"prefix"` }
MemoryConfig chứa cấu hình cho memory adapter.
type Option ¶
type Option func(*TaskOptions)
Option là một hàm để cấu hình tác vụ.
func WithDeadline ¶
WithDeadline đặt thời hạn thực hiện cho tác vụ.
func WithMaxRetry ¶
WithMaxRetry đặt số lần thử lại tối đa cho tác vụ.
func WithProcessAt ¶
WithProcessAt đặt thời điểm xử lý cho tác vụ.
func WithTimeout ¶
WithTimeout đặt thời gian timeout cho tác vụ.
type RedisConfig ¶
type RedisConfig struct { // Prefix là tiền tố cho tên của các queue trong Redis. Prefix string `mapstructure:"prefix"` // ProviderKey là khóa để lấy Redis provider từ DI container. // Mặc định là "redis" nếu không được cấu hình. ProviderKey string `mapstructure:"provider_key"` }
RedisConfig chứa cấu hình cho redis adapter.
type Server ¶
type Server interface { // RegisterHandler đăng ký một handler cho một loại tác vụ. RegisterHandler(taskName string, handler HandlerFunc) // RegisterHandlers đăng ký nhiều handler cùng một lúc. RegisterHandlers(handlers map[string]HandlerFunc) // Start bắt đầu xử lý tác vụ (worker). Start() error // Stop dừng xử lý tác vụ. Stop() error // SetScheduler thiết lập scheduler cho server để xử lý delayed tasks. SetScheduler(scheduler scheduler.Manager) // GetScheduler trả về scheduler hiện tại. GetScheduler() scheduler.Manager }
Server là interface cho việc xử lý tác vụ từ hàng đợi.
func NewServer ¶
func NewServer(redisClient redis.UniversalClient, opts ServerOptions) Server
NewServer tạo một Server mới.
func NewServerWithAdapter ¶
func NewServerWithAdapter(adapter adapter.QueueAdapter, opts ServerOptions) Server
NewServerWithAdapter tạo một Server mới với adapter QueueAdapter được cung cấp.
type ServerConfig ¶
type ServerConfig struct { // Concurrency là số lượng worker xử lý tác vụ cùng một lúc. Concurrency int `mapstructure:"concurrency"` // PollingInterval là khoảng thời gian giữa các lần kiểm tra tác vụ mới (tính bằng mili giây). PollingInterval int `mapstructure:"pollingInterval"` // DefaultQueue là tên queue mặc định nếu không có queue nào được chỉ định. DefaultQueue string `mapstructure:"defaultQueue"` // StrictPriority xác định liệu có ưu tiên nghiêm ngặt các queue ưu tiên cao hay không. StrictPriority bool `mapstructure:"strictPriority"` // Queues là danh sách các queue cần lắng nghe, theo thứ tự ưu tiên. Queues []string `mapstructure:"queues"` // ShutdownTimeout là thời gian chờ để các worker hoàn tất tác vụ khi dừng server (tính bằng giây). ShutdownTimeout int `mapstructure:"shutdownTimeout"` // LogLevel xác định mức độ log. LogLevel int `mapstructure:"logLevel"` // RetryLimit xác định số lần thử lại tối đa cho tác vụ bị lỗi. RetryLimit int `mapstructure:"retryLimit"` }
ServerConfig chứa cấu hình cho queue server.
type ServerOptions ¶
type ServerOptions struct { // Concurrency xác định số lượng worker xử lý tác vụ song song. Concurrency int // PollingInterval xác định thời gian chờ giữa các lần kiểm tra tác vụ (tính bằng mili giây). PollingInterval int // DefaultQueue xác định tên queue mặc định nếu không có queue nào được chỉ định. DefaultQueue string // StrictPriority xác định liệu có nên ưu tiên nghiêm ngặt giữa các hàng đợi. StrictPriority bool // Queues xác định danh sách các queue cần lắng nghe theo thứ tự ưu tiên. Queues []string // ShutdownTimeout xác định thời gian chờ để các worker hoàn tất tác vụ khi dừng server. ShutdownTimeout time.Duration // LogLevel xác định mức log. LogLevel int // RetryLimit xác định số lần thử lại tối đa cho tác vụ bị lỗi. RetryLimit int }
ServerOptions chứa các tùy chọn cấu hình cho server.
type ServiceProvider ¶
type ServiceProvider interface { di.ServiceProvider // contains filtered or unexported methods }
type Task ¶
type Task struct { // ID là định danh duy nhất của tác vụ ID string // Name là tên của loại tác vụ Name string // Payload là dữ liệu của tác vụ dưới dạng bytes Payload []byte // Queue là tên của hàng đợi chứa tác vụ Queue string // MaxRetry là số lần thử lại tối đa nếu tác vụ thất bại MaxRetry int // RetryCount là số lần tác vụ đã được thử lại RetryCount int // CreatedAt là thời điểm tác vụ được tạo CreatedAt time.Time // ProcessAt là thời điểm tác vụ sẽ được xử lý ProcessAt time.Time }
Task đại diện cho một tác vụ cần được xử lý.
type TaskInfo ¶
type TaskInfo struct { // ID là định danh duy nhất của tác vụ ID string // Name là tên của loại tác vụ Name string // Queue là tên của hàng đợi chứa tác vụ Queue string // MaxRetry là số lần thử lại tối đa nếu tác vụ thất bại MaxRetry int // State là trạng thái hiện tại của tác vụ (ví dụ: "pending", "scheduled", "processing", "completed") State string // CreatedAt là thời điểm tác vụ được tạo CreatedAt time.Time // ProcessAt là thời điểm tác vụ sẽ được xử lý ProcessAt time.Time }
TaskInfo chứa thông tin về một tác vụ đã được đưa vào hàng đợi.
type TaskOptions ¶
type TaskOptions struct { // Queue là tên hàng đợi cho tác vụ Queue string // MaxRetry là số lần thử lại tối đa nếu tác vụ thất bại MaxRetry int // Timeout là thời gian tối đa để tác vụ hoàn thành Timeout time.Duration // Deadline là thời hạn chót để tác vụ hoàn thành Deadline time.Time // Delay là thời gian trì hoãn trước khi xử lý tác vụ Delay time.Duration // ProcessAt là thời điểm cụ thể để xử lý tác vụ ProcessAt time.Time // TaskID là ID tùy chỉnh cho tác vụ TaskID string }
TaskOptions chứa các tùy chọn khi đưa tác vụ vào hàng đợi.
func ApplyOptions ¶
func ApplyOptions(opts ...Option) *TaskOptions
ApplyOptions áp dụng các tùy chọn vào TaskOptions.
func GetDefaultOptions ¶
func GetDefaultOptions() *TaskOptions
GetDefaultOptions trả về các tùy chọn mặc định.