seqdelay

v1.1.0
Published: Mar 15, 2026 License: MIT Imports: 14 Imported by: 0

README

seqdelay

High-performance delay queue powered by seqflow time wheel + Redis


What is seqdelay

A delay queue that schedules tasks with configurable precision (default 1ms), backed by Redis for persistence and distributed coordination. Use it as an embedded Go library or a standalone HTTP service.

Built on seqflow's time wheel — tasks are scheduled via O(1) slot insertion instead of Redis Sorted Set polling.

Use Cases

  • Order auto-cancel — unpaid after 15 minutes, auto close and release inventory
  • Order auto-review — no review after 5 days, auto submit positive review
  • Membership reminder — send SMS 15 days and 3 days before expiration
  • Payment callback retry — Alipay/WeChat async notification with progressive intervals (2m, 10m, 1h, 6h...)
  • Coupon expiration — notify user before coupon expires, then invalidate
  • Scheduled push — marketing messages at a specific future time
  • Rate limit cooldown — unlock user account after a temporary ban expires

Install

go get github.com/gocronx/seqdelay

Quick Start

Embedded mode (callback)

seqdelay runs inside your Go process. The callback is your business logic — no network call, no serialization overhead.

q, err := seqdelay.New(seqdelay.WithRedis("localhost:6379"))
if err != nil {
    log.Fatal(err) // e.g. ErrRedisRequired when no Redis is configured
}

q.OnExpire("order-timeout", func(ctx context.Context, task *seqdelay.Task) error {
    fmt.Printf("order %s expired\n", task.ID)
    return nil // return nil → auto Finish; return error → redeliver after TTR
})

q.Start(ctx)

q.Add(ctx, &seqdelay.Task{
    ID:    "order-123",
    Topic: "order-timeout",
    Body:  []byte(`{"orderId":"123"}`),
    Delay: 15 * time.Minute,
    TTR:   30 * time.Second,
})

HTTP mode (pull)

seqdelay runs as a standalone service. Any language can interact via HTTP.

q, err := seqdelay.New(seqdelay.WithRedis("localhost:6379"))
if err != nil {
    log.Fatal(err)
}
q.Start(ctx)

srv := seqdelay.NewServer(q, seqdelay.WithServerAddr(":9280"))
log.Fatal(srv.ListenAndServe())

# Add task
curl -X POST localhost:9280/add \
  -d '{"topic":"notify","id":"msg-1","delay_ms":5000,"ttr_ms":30000}'

# Pop ready task (long-poll, waits up to 30s)
curl -X POST localhost:9280/pop -d '{"topic":"notify"}'

# Finish
curl -X POST localhost:9280/finish -d '{"topic":"notify","id":"msg-1"}'

Real-World Example: Auto-Cancel Unpaid Orders After 15 Minutes

Order created → if user doesn't pay within 15 minutes → auto cancel and release inventory.

Approach 1: Embedded (Go service)

Best when your service is written in Go. Direct function call, lowest latency.

// === In your order service ===

// On startup: register callback
q.OnExpire("order-auto-cancel", func(ctx context.Context, task *seqdelay.Task) error {
    var data struct{ OrderID string `json:"order_id"` }
    if err := json.Unmarshal(task.Body, &data); err != nil {
        return err // malformed body; the task is redelivered after TTR
    }

    if isOrderPaid(data.OrderID) {
        return nil // already paid, skip
    }
    return cancelOrderAndReleaseStock(data.OrderID)
})

// On order created: schedule auto-cancel
q.Add(ctx, &seqdelay.Task{
    ID:    "cancel-" + orderID,
    Topic: "order-auto-cancel",
    Body:  []byte(`{"order_id":"` + orderID + `"}`),
    Delay: 15 * time.Minute,    // 15 minutes
    TTR:   30 * time.Second,    // retry after 30s on failure
})

// User pays: cancel the auto-cancel task
q.Cancel(ctx, "order-auto-cancel", "cancel-" + orderID)

flowchart LR
    A[Order Created] -->|Add delay=15min| B{15 min passed?}
    B -->|User pays at min 8| C[Cancel - task removed]
    B -->|Not paid| D[OnExpire fires]
    D -->|return nil| E[Order cancelled]
    D -->|return err| F[Retry in 30s]
    F --> D

Approach 2: HTTP (any language)

Best when your service is not Go, or seqdelay runs as a shared service for multiple teams.

sequenceDiagram
    participant S as Your Service
    participant D as seqdelay :9280
    participant R as Redis

    S->>D: POST /add (delay=15min)
    D->>R: Save task + schedule

    alt User pays within 15 min
        S->>D: POST /cancel
        D->>R: Set CANCELLED
    else 15 min expires
        D->>R: DELAYED → READY
        S->>D: POST /pop
        D-->>S: task payload
        S->>S: cancel order + release stock
        S->>D: POST /finish
        D->>R: Set FINISHED
    end

Your service (any language):

import json
import requests

# On order created
requests.post("http://seqdelay:9280/add", json={
    "topic": "order-auto-cancel",
    "id": f"cancel-{order_id}",
    "body": json.dumps({"order_id": order_id}),
    "delay_ms": 15 * 60 * 1000,  # 15 minutes in ms
    "ttr_ms": 30000
})

# User pays successfully
requests.post("http://seqdelay:9280/cancel", json={
    "topic": "order-auto-cancel",
    "id": f"cancel-{order_id}"
})

Worker (runs alongside your service):

import requests

while True:
    resp = requests.post("http://seqdelay:9280/pop", json={
        "topic": "order-auto-cancel",
        "timeout_ms": 30000  # long-poll 30s
    })
    task = resp.json()["data"]  # parse the response once
    if task:  # skip when no task was returned
        cancel_order(task["body"])
        requests.post("http://seqdelay:9280/finish", json={
            "topic": "order-auto-cancel",
            "id": task["id"]
        })

API

Go SDK

| Method | Description |
| --- | --- |
| Add(ctx, *Task) | Add a delayed task |
| Pop(ctx, topic) | Pull a ready task (blocking) |
| Finish(ctx, topic, id) | Mark task complete |
| Cancel(ctx, topic, id) | Cancel a pending task |
| Get(ctx, topic, id) | Query task state |
| OnExpire(topic, fn) | Register callback (embedded mode) |
| Shutdown(ctx) | Graceful shutdown |

HTTP Endpoints

| Endpoint | Method | Description |
| --- | --- | --- |
| /add | POST | Add delayed task |
| /pop | POST | Pull ready task (long-poll) |
| /finish | POST | Ack completion |
| /cancel | POST | Cancel task |
| /get | GET | Query task |
| /stats | GET | Queue statistics |

Task Lifecycle

flowchart TD
    S((Start)) -->|Add| DELAYED
    DELAYED -->|delay expires| READY
    READY -->|Pop / Callback| ACTIVE
    ACTIVE -->|Finish| FINISHED
    ACTIVE -->|TTR expires| READY
    DELAYED -->|Cancel| CANCELLED
    READY -->|Cancel| CANCELLED
    ACTIVE -->|Cancel| CANCELLED

    style DELAYED fill:#3b82f6,color:#fff
    style READY fill:#f59e0b,color:#fff
    style ACTIVE fill:#8b5cf6,color:#fff
    style FINISHED fill:#22c55e,color:#fff
    style CANCELLED fill:#64748b,color:#fff
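The diagram can also be read as a transition table. The sketch below encodes its edges in plain Go; the `state` type and `canTransition` helper are illustrative only and are not seqdelay's internal representation (the library enforces transitions through Redis Lua scripts, per the Design section).

```go
package main

import "fmt"

type state string

const (
	delayed   state = "DELAYED"
	ready     state = "READY"
	active    state = "ACTIVE"
	finished  state = "FINISHED"
	cancelled state = "CANCELLED"
)

// transitions lists the edges of the lifecycle diagram above.
// FINISHED and CANCELLED have no outgoing edges.
var transitions = map[state][]state{
	delayed: {ready, cancelled},
	ready:   {active, cancelled},
	active:  {finished, ready, cancelled},
}

// canTransition reports whether the diagram has an edge from -> to.
func canTransition(from, to state) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(active, ready))    // TTR expiry
	fmt.Println(canTransition(finished, ready))  // terminal state
	fmt.Println(canTransition(delayed, finished)) // must pass through READY and ACTIVE
}
```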

Examples

| Example | Description |
| --- | --- |
| embedded | Callback mode inside your Go process |
| httpserver | Standalone HTTP service with curl usage |
| distributed | Multi-instance with leader election |
| ttr-retry | TTR timeout and automatic redelivery |
| batch-add | Add 1000 tasks with varying delays |
| cancel | Cancel a task before it fires |
| stats-monitor | Monitor queue stats via HTTP /stats |

Configuration

| Option | Default | Description |
| --- | --- | --- |
| WithRedis(addr) | required | Redis standalone |
| WithRedisSentinel(addrs, master) | | Redis Sentinel |
| WithRedisCluster(addrs) | | Redis Cluster |
| WithTickInterval(d) | 1ms | Time wheel precision |
| WithWheelCapacity(n) | 4096 | Wheel slot count (power of 2) |
| WithMaxTopics(n) | 1024 | Max topic count |
| WithPopTimeout(d) | 30s | Default HTTP pop timeout |
| WithLockTTL(d) | 500ms | Distributed lock TTL |
| WithInstanceID(id) | auto | Instance ID for distributed lock |

Distributed Deployment

Multiple instances against the same Redis. Only one advances the time wheel (leader). All serve HTTP.

graph TB
    subgraph Instances
        A["Instance A - leader<br/>time wheel + HTTP"]
        B["Instance B<br/>standby + HTTP"]
        C["Instance C<br/>standby + HTTP"]
    end
    A & B & C <--> R[(Redis)]
    style A fill:#2563eb,color:#fff,stroke:#1d4ed8
    style B fill:#64748b,color:#fff,stroke:#475569
    style C fill:#64748b,color:#fff,stroke:#475569
    style R fill:#dc382d,color:#fff,stroke:#b91c1c

Leader crash → lock expires (500ms) → standby takes over automatically.

Performance

Apple M4 / arm64

Time wheel (pure scheduling, no Redis):
  1M Add:           225ms (4.4M tasks/sec)
  1M Add + Fire:    1.2s (all 1M fired, zero loss)
  4 writers × 250K: 187ms (5.3M tasks/sec)
  1M Cancel:        268ms (290 ns/op)

Design

  • seqflow time wheel — O(1) add/fire, replaces Redis Sorted Set polling
  • Redis Lua scripts — atomic state transitions, no race conditions
  • {topic} hash tag — all keys for a topic on same Redis Cluster slot
  • Distributed lock — SetNX + heartbeat renewal for multi-instance
  • Recovery — full task recovery from Redis on restart
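The hash tag point relies on a Redis Cluster rule: when a key contains `{...}` with a non-empty body, only that body is hashed (CRC16 mod 16384) to choose the slot. The sketch below reproduces the rule; the key names are made up for illustration, since the actual key layout used by seqdelay is not shown here.

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 is CRC-16/XMODEM (polynomial 0x1021, initial value 0), the
// variant Redis Cluster uses for key slots.
func crc16(data []byte) uint16 {
	var crc uint16
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = crc<<1 ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashTag returns the part of key that is hashed: the text between the
// first '{' and the next '}', or the whole key if that text is empty
// or the braces are missing.
func hashTag(key string) string {
	start := strings.IndexByte(key, '{')
	if start < 0 {
		return key
	}
	end := strings.IndexByte(key[start+1:], '}')
	if end <= 0 {
		return key
	}
	return key[start+1 : start+1+end]
}

// keySlot maps a key to one of the 16384 cluster slots.
func keySlot(key string) uint16 {
	return crc16([]byte(hashTag(key))) % 16384
}

func main() {
	// Hypothetical key names sharing the {orders} tag.
	for _, k := range []string{"seqdelay:{orders}:task:1", "seqdelay:{orders}:ready"} {
		fmt.Println(k, keySlot(k))
	}
}
```

Because both keys hash to the same slot, a single Lua script can touch them atomically on a cluster.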

Documentation

Variables

var (
	ErrDuplicateTask = errors.New("seqdelay: task already exists")
	ErrTaskNotFound  = errors.New("seqdelay: task not found")
	ErrInvalidState  = errors.New("seqdelay: invalid task state for this operation")
	ErrTopicConflict = errors.New("seqdelay: topic already has a callback registered")
	ErrTooManyTopics = errors.New("seqdelay: max topic count exceeded")
	ErrQueueFull     = errors.New("seqdelay: ready queue is full")
	ErrClosed        = errors.New("seqdelay: queue is closed")
	ErrRedisRequired = errors.New("seqdelay: redis connection is required")
	ErrInvalidTask   = errors.New("seqdelay: task ID and Topic are required")
	ErrInvalidDelay  = errors.New("seqdelay: delay must be positive")
)

Types

type Option

type Option func(*config)

Option configures a Queue.

func WithInstanceID

func WithInstanceID(id string) Option

WithInstanceID sets the instance ID (used by the distributed lock; auto-generated by default).

func WithLockTTL

func WithLockTTL(d time.Duration) Option

WithLockTTL sets the distributed lock TTL (default 500ms, renewed by heartbeat).

func WithMaxTopics

func WithMaxTopics(n int) Option

WithMaxTopics sets the maximum number of topics (default 1024).

func WithPopTimeout

func WithPopTimeout(d time.Duration) Option

WithPopTimeout sets the default HTTP pop timeout (default 30s).

func WithReadyQueueSize

func WithReadyQueueSize(n int) Option

WithReadyQueueSize sets the buffer size of each topic's ready queue (default 1024).

func WithRedis

func WithRedis(addr string) Option

WithRedis configures a standalone Redis connection.

func WithRedisClient

func WithRedisClient(client redis.UniversalClient) Option

WithRedisClient uses an existing Redis client.

func WithRedisCluster

func WithRedisCluster(addrs []string) Option

WithRedisCluster configures a Redis Cluster connection.

func WithRedisOptions

func WithRedisOptions(opts *redis.Options) Option

WithRedisOptions configures a Redis connection with the full set of options.

func WithRedisSentinel

func WithRedisSentinel(addrs []string, master string) Option

WithRedisSentinel configures a Redis Sentinel connection.

func WithTickInterval

func WithTickInterval(d time.Duration) Option

WithTickInterval sets the time wheel precision (default 1ms).

func WithWheelCapacity

func WithWheelCapacity(n uint32) Option

WithWheelCapacity sets the time wheel capacity (default 4096; must be a power of 2).

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is the public entry point for the seqdelay delay-queue library. It wires together the Redis store, timing wheel, ready queue, and distributed lock into a single coherent unit.

func New

func New(opts ...Option) (*Queue, error)

New creates a new Queue with the given options. Returns ErrRedisRequired when no Redis client is configured.

func (*Queue) Add

func (q *Queue) Add(ctx context.Context, task *Task) error

Add validates and persists a new delayed task, then schedules it in the timing wheel.

func (*Queue) Cancel

func (q *Queue) Cancel(ctx context.Context, topic, id string) error

Cancel transitions a task to StateCancelled and removes it from the timing wheel.

func (*Queue) Finish

func (q *Queue) Finish(ctx context.Context, topic, id string) error

Finish marks a task as completed and cancels its TTR timer.

func (*Queue) Get

func (q *Queue) Get(ctx context.Context, topic, id string) (*Task, error)

Get retrieves the current state of a task without altering it.

func (*Queue) OnExpire

func (q *Queue) OnExpire(topic string, fn func(context.Context, *Task) error) error

OnExpire registers fn as the callback invoked when a task on topic becomes ready. The drain goroutine will pop tasks and call fn automatically, finishing the task on success or leaving it for TTR re-delivery on failure.

func (*Queue) Pop

func (q *Queue) Pop(ctx context.Context, topic string) (*Task, error)

Pop blocks until a task becomes available on the given topic's ready list or the configured pop timeout elapses. On success the task is in StateActive and a TTR re-delivery timer is armed.

Returns nil, nil when the timeout elapses with no available task.

func (*Queue) Shutdown

func (q *Queue) Shutdown(ctx context.Context) error

Shutdown gracefully stops the queue. It marks the queue closed, drains the timing wheel (bounded by ctx), stops all drain goroutines, and releases the distributed lock.

func (*Queue) Start

func (q *Queue) Start(ctx context.Context) error

Start begins background processing: task recovery from Redis, the timing wheel, and the leader-election heartbeat loop.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server exposes the Queue over HTTP using a plain net/http ServeMux. All request/response bodies are JSON.

func NewServer

func NewServer(q *Queue, opts ...ServerOption) *Server

NewServer creates a Server that wraps the given Queue.

func (*Server) ListenAndServe

func (s *Server) ListenAndServe() error

ListenAndServe starts the HTTP server. It blocks until the server stops.

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the HTTP server.

type ServerOption

type ServerOption func(*Server)

ServerOption configures a Server.

func WithServerAddr

func WithServerAddr(addr string) ServerOption

WithServerAddr sets the TCP listen address (default ":8080").

type Task

type Task struct {
	ID         string        `msgpack:"id" json:"id"`
	Topic      string        `msgpack:"topic" json:"topic"`
	Body       []byte        `msgpack:"body" json:"body"`
	Delay      time.Duration `msgpack:"delay" json:"delay_ms"`
	TTR        time.Duration `msgpack:"ttr" json:"ttr_ms"`
	MaxRetries int           `msgpack:"max_retries" json:"max_retries"`
	State      TaskState     `msgpack:"state" json:"state"`
	Retries    int           `msgpack:"retries" json:"retries"`
	CreatedAt  time.Time     `msgpack:"created_at" json:"created_at"`
	ActiveAt   time.Time     `msgpack:"active_at" json:"active_at"`
}

Task represents a delayed task.

func (*Task) Validate

func (t *Task) Validate() error

Validate checks the required fields.

type TaskState

type TaskState int

TaskState represents the lifecycle state of a task.

const (
	StateDelayed   TaskState = iota // waiting in the time wheel
	StateReady                      // in the ready queue
	StateActive                     // consumed; TTR countdown running
	StateFinished                   // completed
	StateCancelled                  // cancelled
)

func (TaskState) String

func (s TaskState) String() string

Directories

| Path | Synopsis |
| --- | --- |
| example/batch-add (command) | Batch add example: add many tasks with different delays. |
| example/cancel (command) | Cancel example: add a task, then cancel it before it fires. |
| example/distributed (command) | Distributed deployment: multiple instances share the same Redis. |
| example/embedded (command) | Embedded mode: callback-driven delay queue within your Go process |
| example/httpserver (command) | HTTP server mode: standalone delay queue service |
| example/stats-monitor (command) | Stats monitor: add tasks and watch queue statistics via HTTP. |
| example/ttr-retry (command) | TTR retry example: task is redelivered if not Finished within TTR timeout. |
