taskbus

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 1, 2025 License: MIT Imports: 15 Imported by: 0

README

TaskBus

高性能、生产可用的 Go 任务队列与事件总线库,支持 RabbitMQ/Redis、多中间件、延迟消息、幂等、重试/死信,以及本地/分布式 Cron 调度。

特性

  • MQ 适配:RabbitMQ、Redis(即时/延迟消息)
  • Jobs:重试、指数回退、死信处理
  • EventBus:发布/订阅、类型过滤
  • Cron:本地/分布式执行(基于 MQ Leader 选举)
  • 幂等:内置 Redis 幂等中间件
  • 生产就绪:日志接口、错误处理、连接恢复

安装

go get github.com/northseadl/taskbus

快速开始

package main

import (
    "context"
    "log"

    "github.com/northseadl/taskbus"
)

func main() {
    cfg := taskbus.Config{
        MQ: taskbus.MQConfig{
            Provider: taskbus.MQProviderRabbitMQ,
            RabbitMQ: taskbus.RabbitMQConfig{
                URI:      "amqp://localhost:5672",
                Exchange: "app.events",
            },
        },
    }

    ctx := context.Background()
    c, err := taskbus.New(ctx, cfg)
    if err != nil { log.Fatal(err) }
    defer c.Close(ctx)

    // 发布事件
    _ = c.Bus().Publish(ctx, taskbus.Event{
        Topic:   "user.created",
        Type:    "UserCreated",
        Subject: "uid-1",
        Payload: []byte("{}"),
    })
}

测试

集成测试矩阵

完整的端到端集成测试覆盖以下场景:

测试场景 文件 验证内容
基础消息流 integration_test.go RabbitMQ 发布/消费、Jobs 重试/死信
事件过滤 eventbus_filter_test.go FilterByType 多事件类型筛选
Redis 延时 redis_delay_test.go PublishDelay → Stream 搬运 → 延时消费
分布式 Cron cron_distributed_test.go Leader 选举与任务调度
幂等性 idempotency_test.go Jobs/EventBus Redis KV 幂等验证
并发消费 rabbitmq_concurrency_test.go ConsumerConcurrency 参数验证
连接恢复 resilience_test.go 重连后消息不丢失
运行方式

本地 Docker 环境

cd pkg/taskbus
chmod +x test/run_integration.sh
./test/run_integration.sh

手动运行(需先启动 RabbitMQ/Redis):

export TQ_RABBITMQ_URI=amqp://admin:admin123@localhost:5673/
export TQ_RABBITMQ_EXCHANGE=taskbus.events
export TQ_RABBITMQ_DELAYED_EXCHANGE=taskbus.events.delayed
export TQ_REDIS_ADDR=localhost:6380

cd pkg/taskbus
go test ./test/integration -v

环境变量门禁:未设置环境变量时测试自动跳过,适合 CI/CD 流水线。

GitHub Actions CI

项目配置了自动化 CI,在每次推送和 PR 时运行完整的集成测试矩阵。

许可

MIT License

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client interface {
	// Start 启动后台协程等资源(如延时调度器)。
	Start(ctx context.Context) error
	// Close 优雅关闭,等待在途任务,遵循 ctx 超时。
	Close(ctx context.Context) error

	// MQ 暴露统一的消息队列抽象。
	MQ() MQ
	// Jobs 暴露任务队列(基于 MQ)。
	Jobs() Jobs
	// Cron 暴露 Cron 调度。
	Cron() Cron
	// Bus 暴露事件总线。
	Bus() EventBus
}

func New

func New(ctx context.Context, cfg Config, opts ...Option) (Client, error)

New 创建 Client 实例。

type Config

type Config struct {
	MQ       MQConfig
	Job      JobConfig
	Cron     CronConfig
	EventBus EventBusConfig
	Logger   LoggerConfig

	// Idempotency 可选配置:若提供 KV/Redis,则默认启用 Jobs 与 EventBus 的幂等检查。
	Idempotency IdempotencyConfig
}

Config 为包总配置,应用通过 New 传入。

type Consumer

type Consumer interface {
	// Consume 订阅 topic,group 为消费组;返回停止函数。
	Consume(ctx context.Context, topic, group string, handler Handler, mws ...Middleware) (stop func(context.Context) error, err error)
}

Consumer 统一消费接口。

type Cron

type Cron interface {
	Add(spec string, name string, fn func(context.Context) error, mws ...CronMiddleware) (id string, err error)
	Remove(id string) error
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

Cron 提供基于 Cron 表达式的任务调度管理。

type CronConfig

type CronConfig struct {
	Timezone string
	// Distributed 开启分布式调度(Scheduler + Executor)。
	Distributed bool
	// LeaderLockKey 分布式锁键(Redis)。
	LeaderLockKey string
	// LeaderTTL 锁过期时间。
	LeaderTTL time.Duration
	// ExecutorGroup 执行器消费组。
	ExecutorGroup string
}

type CronMiddleware

type CronMiddleware func(next func(context.Context) error) func(context.Context) error

CronMiddleware 用于 Cron 任务。

type DelayMode

type DelayMode string

DelayMode 用于 RabbitMQ 延时消息兼容模式。

const (
	DelayModeStandard DelayMode = "standard" // 使用 x-delayed-message 插件(x-delay)
	DelayModeAliyun   DelayMode = "aliyun"   // 使用阿里云原生(delay)
)

type EnqueueOption

type EnqueueOption func(*enqueueOpts)

EnqueueOption 入队选项。

func WithDelay

func WithDelay(d time.Duration) EnqueueOption

WithDelay 指定延时。

func WithKey

func WithKey(k string) EnqueueOption

WithKey 指定幂等键或业务键。

type Event

type Event struct {
	Topic    string
	Type     string
	Subject  string
	Metadata map[string]string
	Payload  []byte
}

Event 领域事件。

type EventBus

type EventBus interface {
	Publish(ctx context.Context, e Event) error
	Subscribe(topic, group string, filter Filter, handler func(context.Context, Event) error, mws ...EventMiddleware) (stop func(context.Context) error, err error)
}

EventBus 提供事件发布与订阅。

type EventBusConfig

type EventBusConfig struct {
	SubscriberConcurrency int
}

type EventMiddleware

type EventMiddleware func(next func(context.Context, Event) error) func(context.Context, Event) error

EventMiddleware 用于 EventBus。

type ExponentialBackoff

type ExponentialBackoff struct {
	Base       time.Duration
	Factor     float64
	MaxRetries int
}

ExponentialBackoff 简单指数回退策略。

func (ExponentialBackoff) NextBackoff

func (e ExponentialBackoff) NextBackoff(attempt int) (time.Duration, bool)

type Filter

type Filter func(e Event) bool

func FilterByType

func FilterByType(t string) Filter

简单过滤器辅助

type Handler

type Handler func(ctx context.Context, msg Message) error

Handler 处理 MQ 消息。

type IdempotencyConfig

type IdempotencyConfig struct {
	KV KV // 可选:键值存储(生产用 RedisKV),若为 nil 且提供 Redis* 或使用 MQ.Redis,则自动创建
	// 可选 Redis 连接参数(若 KV 为空则使用这些参数自动启用)
	RedisAddr     string
	RedisUsername string
	RedisPassword string
	RedisDB       int

	Prefix  string                                               // key 前缀,如 "tq:idem"
	TTL     time.Duration                                        // 幂等键过期时间
	KeyFunc func(ctx context.Context, m Message) (string, error) // 可选:自定义业务唯一键
}

IdempotencyConfig 配置幂等中间件。 Key 计算顺序:优先 Message.Key;若为空且提供 KeyFunc,则使用 KeyFunc。 最终存储 key 为 Prefix + ":" + sha1(keyRaw)。

type Job

type Job interface {
	Name() string
	Execute(ctx context.Context, payload []byte) error
}

Job 定义业务任务。

type JobConfig

type JobConfig struct {
	Retry           RetryConfig
	DeadLetterTopic string
}

type JobHandler

type JobHandler func(ctx context.Context, jobName string, payload []byte) error

JobHandler 用于中间件包装 Job 执行。

type JobMiddleware

type JobMiddleware func(next JobHandler) JobHandler

JobMiddleware 用于 Job 执行。

func NewJobIdempotencyMiddleware

func NewJobIdempotencyMiddleware(cfg IdempotencyConfig) JobMiddleware

NewJobIdempotencyMiddleware 生成 JobMiddleware。 Job 场景下,构造 Message 计算 key(jobName+payload 或自定义)。

type Jobs

type Jobs interface {
	Register(job Job)
	Enqueue(ctx context.Context, jobName string, payload []byte, opts ...EnqueueOption) error
	StartWorkers(ctx context.Context, groups map[string]int, mws ...JobMiddleware) (stop func(context.Context) error, err error)
}

Jobs 提供任务入队与 Worker 管理。

type KV

type KV interface {
	SetNX(ctx context.Context, key string, value string, ttl time.Duration) (bool, error)
}

KV 是幂等中间件依赖的最小键值接口,便于单元测试注入 mock。

type Logger

type Logger interface {
	Info(ctx context.Context, msg string, kv ...interface{})
	Error(ctx context.Context, msg string, kv ...interface{})
}

Logger 为最小日志接口,应用可注入自定义实现。

type LoggerConfig

type LoggerConfig struct {
	Level string
}

type MQ

type MQ interface {
	Producer
	Consumer
	Close(ctx context.Context) error
}

MQ 聚合 Producer 与 Consumer,并暴露 Close 以释放资源。

type MQConfig

type MQConfig struct {
	Provider MQProvider
	RabbitMQ RabbitMQConfig
	Redis    RedisConfig
}

type MQProvider

type MQProvider string
const (
	MQProviderRabbitMQ MQProvider = "rabbitmq"
	MQProviderRedis    MQProvider = "redis"
)

type Message

type Message struct {
	Topic   string
	Key     string
	Body    []byte
	Headers map[string]string
}

Message 为统一消息结构。

type Middleware

type Middleware func(next Handler) Handler

Middleware 用于 MQ Handler。

func NewIdempotencyMiddleware

func NewIdempotencyMiddleware(cfg IdempotencyConfig) Middleware

NewIdempotencyMiddleware 生成通用 MQ Middleware。

type Option

type Option func(*client)

Option 允许注入替换默认行为(如 Logger)。

func WithLogger

func WithLogger(l Logger) Option

WithLogger 注入自定义日志实现。

type Producer

type Producer interface {
	Publish(ctx context.Context, msg Message) error
	PublishDelay(ctx context.Context, msg Message, delay time.Duration) error
}

Producer 统一发布接口。

type RabbitMQConfig

type RabbitMQConfig struct {
	URI                 string
	Exchange            string
	DelayedExchange     string
	Prefetch            int
	ConsumerConcurrency int
	// DelayMode 选择延时消息兼容模式;默认 standard。
	DelayMode DelayMode
}

type RedisConfig

type RedisConfig struct {
	Addr                string
	Username            string
	Password            string
	DB                  int
	ConsumerConcurrency int
}

type RedisKV

type RedisKV struct{ R *redis.Client }

RedisKV 适配 github.com/redis/go-redis 以满足 KV 接口。

func (RedisKV) SetNX

func (r RedisKV) SetNX(ctx context.Context, key string, value string, ttl time.Duration) (bool, error)

type RetryConfig

type RetryConfig struct {
	Base       time.Duration
	Factor     float64
	MaxRetries int
}

type RetryPolicy

type RetryPolicy interface {
	NextBackoff(attempt int) (time.Duration, bool)
}

RetryPolicy 定义重试策略。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL