Go Routine Queue
一个基于Golang和PostgreSQL的高性能分布式队列系统,每个worker是一个独立的goroutine,支持动态扩缩容和任务处理。
特性
- 基于PostgreSQL的分布式协调,确保任务不会重复处理
- 根据队列名称动态调整worker数量,实现资源的高效利用
- 自动清理已完成任务,避免数据库膨胀
- 提供完整的RESTful监控API接口,方便系统状态查询
- 支持自定义任务处理器,灵活适应不同业务场景
- 优雅的启动和关闭机制,确保任务不会丢失
- 高效的任务分发和处理,最大化系统吞吐量
架构
系统由以下核心组件组成:
- 队列管理器(QueueManager): 负责队列的创建、删除和worker数量的动态调整,同时管理任务处理器的注册
- 工作器池(WorkerPool): 管理goroutine工作器的生命周期,包括启动、停止和任务处理
- 数据库连接层(DBConnector): 处理与PostgreSQL的交互,包括任务的入队、出队和状态更新
- 任务处理器(TaskProcessor): 执行具体的任务逻辑,支持自定义实现
- 监控接口(Monitor): 提供RESTful API,用于查询系统状态和管理队列
安装
前置条件
安装步骤
- 克隆仓库
git clone https://github.com/Aliciahan/go-routine-queue.git
cd go-routine-queue
- 安装依赖
go mod download
- 编译
go build -o queue-server ./cmd/server
使用方法
启动服务器
# 设置环境变量
export DB_CONNECTION_STRING="postgres://postgres:postgres@localhost:5432/queue_db?sslmode=disable"
export DEFAULT_WORKER_COUNT=5
export CLEANUP_INTERVAL=3600
export MONITOR_ADDR=":8080"
# 启动服务器
./queue-server
客户端示例
package main
import (
"encoding/json"
"fmt"
"log"
"github.com/Aliciahan/go-routine-queue/pkg/queue"
)
func main() {
// 连接数据库
dbConnStr := "postgres://postgres:postgres@localhost:5432/queue_db?sslmode=disable"
db, err := queue.NewDBConnector(dbConnStr)
if err != nil {
log.Fatalf("Failed to connect to database: %v", err)
}
defer db.Close()
// 创建任务负载
payload := map[string]interface{}{
"id": "task-1",
"name": "Example Task",
"data": "This is task data",
}
// 序列化任务负载
payloadBytes, err := json.Marshal(payload)
if err != nil {
log.Fatalf("Failed to marshal task payload: %v", err)
}
// 将任务加入队列
taskID, err := db.EnqueueTask("default", payloadBytes)
if err != nil {
log.Fatalf("Failed to enqueue task: %v", err)
}
fmt.Printf("Task enqueued with ID: %d\n", taskID)
}
自定义任务处理器
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/Aliciahan/go-routine-queue/pkg/queue"
)
// 自定义任务处理器
type CustomTaskProcessor struct{}
// Process 实现TaskProcessor接口
func (p *CustomTaskProcessor) Process(ctx context.Context, task *queue.Task) error {
// 解析任务负载
var payload map[string]interface{}
if err := json.Unmarshal(task.Payload, &payload); err != nil {
return err
}
// 处理任务
fmt.Printf("Processing task: %v\n", payload)
// 返回nil表示处理成功
return nil
}
func main() {
// 连接数据库
dbConnStr := "postgres://postgres:postgres@localhost:5432/queue_db?sslmode=disable"
db, err := queue.NewDBConnector(dbConnStr)
if err != nil {
log.Fatalf("Failed to connect to database: %v", err)
}
defer db.Close()
// 创建队列管理器
qm := queue.NewQueueManager(db)
// 注册自定义任务处理器
qm.RegisterTaskProcessor("default", &CustomTaskProcessor{})
// 启动队列管理器
if err := qm.Start(); err != nil {
log.Fatalf("Failed to start queue manager: %v", err)
}
defer qm.Stop()
// 创建队列
if err := qm.CreateQueue("default", 5); err != nil {
log.Printf("Queue already exists: %v", err)
}
// 应用程序逻辑...
}
API文档
监控API
端点 |
方法 |
描述 |
/api/queues |
GET |
获取所有队列状态 |
/api/queues/{queue_name} |
GET |
获取指定队列状态 |
/api/queues/{queue_name} |
POST |
创建新队列 |
/api/queues/{queue_name} |
PUT |
更新队列配置 |
/api/queues/{queue_name} |
DELETE |
删除队列 |
/api/stats |
GET |
获取系统统计信息 |
/health |
GET |
健康检查 |
环境变量
变量名 |
描述 |
默认值 |
DB_CONNECTION_STRING |
PostgreSQL连接字符串 |
postgres://postgres:postgres@localhost:5432/queue_db?sslmode=disable |
DEFAULT_WORKER_COUNT |
默认worker数量 |
5 |
CLEANUP_INTERVAL |
清理间隔时间(秒) |
3600 |
MONITOR_ADDR |
监控服务器地址 |
:8080 |
性能优化
- 使用连接池管理数据库连接
- 批量处理任务以减少数据库操作
- 定期清理已完成任务以优化数据库性能
- 使用原子操作和互斥锁确保并发安全
贡献
欢迎提交问题和拉取请求!
许可证
Apache License 2.0