Documentation ¶
Index ¶
- Variables
- type Option
- func WithInstanceID(id string) Option
- func WithLockTTL(d time.Duration) Option
- func WithMaxTopics(n int) Option
- func WithPopTimeout(d time.Duration) Option
- func WithReadyQueueSize(n int) Option
- func WithRedis(addr string) Option
- func WithRedisClient(client redis.UniversalClient) Option
- func WithRedisCluster(addrs []string) Option
- func WithRedisOptions(opts *redis.Options) Option
- func WithRedisSentinel(addrs []string, master string) Option
- func WithTickInterval(d time.Duration) Option
- func WithWheelCapacity(n uint32) Option
- type Queue
- func (q *Queue) Add(ctx context.Context, task *Task) error
- func (q *Queue) Cancel(ctx context.Context, topic, id string) error
- func (q *Queue) Finish(ctx context.Context, topic, id string) error
- func (q *Queue) Get(ctx context.Context, topic, id string) (*Task, error)
- func (q *Queue) OnExpire(topic string, fn func(context.Context, *Task) error) error
- func (q *Queue) Pop(ctx context.Context, topic string) (*Task, error)
- func (q *Queue) Shutdown(ctx context.Context) error
- func (q *Queue) Start(ctx context.Context) error
- type Server
- type ServerOption
- type Task
- type TaskState
Constants ¶
This section is empty.
Variables ¶
var (
	ErrDuplicateTask = errors.New("seqdelay: task already exists")
	ErrTaskNotFound  = errors.New("seqdelay: task not found")
	ErrInvalidState  = errors.New("seqdelay: invalid task state for this operation")
	ErrTopicConflict = errors.New("seqdelay: topic already has a callback registered")
	ErrTooManyTopics = errors.New("seqdelay: max topic count exceeded")
	ErrQueueFull     = errors.New("seqdelay: ready queue is full")
	ErrClosed        = errors.New("seqdelay: queue is closed")
	ErrRedisRequired = errors.New("seqdelay: redis connection is required")
	ErrInvalidTask   = errors.New("seqdelay: task ID and Topic are required")
	ErrInvalidDelay  = errors.New("seqdelay: delay must be positive")
)
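Because these are sentinel values created with errors.New, callers would normally match them with errors.Is rather than by comparing strings. The sketch below uses a local stand-in for ErrTaskNotFound so it compiles without the library; the lookup helper is hypothetical and only exists to show a wrapped error.

```go
package main

import (
	"errors"
	"fmt"
)

// errTaskNotFound is a local stand-in for seqdelay.ErrTaskNotFound so that
// this sketch compiles without the library.
var errTaskNotFound = errors.New("seqdelay: task not found")

// lookup is a hypothetical caller-side helper that wraps the sentinel with
// extra context, as application code often does.
func lookup(topic, id string) error {
	return fmt.Errorf("lookup %s/%s: %w", topic, id, errTaskNotFound)
}

// isNotFound reports whether err is, or wraps, the not-found sentinel.
func isNotFound(err error) bool {
	return errors.Is(err, errTaskNotFound)
}

func main() {
	err := lookup("orders", "42")
	fmt.Println(isNotFound(err))        // true: errors.Is follows the %w chain
	fmt.Println(err == errTaskNotFound) // false: direct comparison misses wrapped errors
}
```

With the real package, the same check would presumably read errors.Is(err, seqdelay.ErrTaskNotFound).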
Functions ¶
This section is empty.
Types ¶
type Option ¶
type Option func(*config)
Option configures a Queue.
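The declaration above is the functional-options pattern: each With... function returns a closure that mutates the unexported config. The following is a minimal sketch of how such options are typically applied. The config fields and the lowercase helper names are assumptions made for illustration, since the real struct is not shown on this page; only the two default values are taken from the option descriptions.

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical stand-in for the library's unexported config;
// the real field names are not shown in this documentation.
type config struct {
	popTimeout     time.Duration
	readyQueueSize int
}

// Option mirrors the documented declaration: a function that mutates config.
type Option func(*config)

// withPopTimeout and withReadyQueueSize imitate the documented options.
func withPopTimeout(d time.Duration) Option {
	return func(c *config) { c.popTimeout = d }
}

func withReadyQueueSize(n int) Option {
	return func(c *config) { c.readyQueueSize = n }
}

// newConfig starts from the documented defaults and applies options in order,
// so a later option overrides an earlier one.
func newConfig(opts ...Option) config {
	c := config{popTimeout: 30 * time.Second, readyQueueSize: 1024}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := newConfig(withPopTimeout(5 * time.Second))
	fmt.Println(c.popTimeout, c.readyQueueSize) // 5s 1024
}
```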
func WithPopTimeout ¶
WithPopTimeout sets the default timeout for HTTP Pop (default 30s).
func WithReadyQueueSize ¶
WithReadyQueueSize sets the ready-queue buffer size for each topic (default 1024).
func WithRedisClient ¶
func WithRedisClient(client redis.UniversalClient) Option
WithRedisClient sets an existing Redis client.
func WithRedisCluster ¶
WithRedisCluster sets up a Redis Cluster connection.
func WithRedisOptions ¶
WithRedisOptions sets up a Redis connection with full options.
func WithRedisSentinel ¶
WithRedisSentinel sets up a Redis Sentinel connection.
func WithTickInterval ¶
WithTickInterval sets the timing-wheel precision (default 1ms).
func WithWheelCapacity ¶
WithWheelCapacity sets the timing-wheel capacity (default 4096; must be a power of 2).
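A power-of-two capacity is a common requirement for timing wheels because the slot for a tick can then be computed with a bit mask instead of a modulo. Whether seqdelay does exactly this is not stated here, so the sketch below is only an illustration of the general technique; isPowerOfTwo and slotIndex are hypothetical helper names.

```go
package main

import "fmt"

// isPowerOfTwo reports whether n has exactly one bit set.
func isPowerOfTwo(n uint32) bool {
	return n != 0 && n&(n-1) == 0
}

// slotIndex maps a monotonically increasing tick counter onto a wheel slot.
// For power-of-two capacities, tick & (capacity-1) equals tick % capacity.
func slotIndex(tick uint64, capacity uint32) uint64 {
	return tick & uint64(capacity-1)
}

func main() {
	const capacity = 4096 // documented default

	fmt.Println(isPowerOfTwo(capacity))    // true
	fmt.Println(isPowerOfTwo(1000))        // false
	fmt.Println(slotIndex(4097, capacity)) // 1: the wheel has wrapped once
}
```

With the default 1ms tick and 4096 slots, one full rotation of such a wheel spans 4.096 seconds.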
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is the public entry point for the seqdelay delay-queue library. It wires together the Redis store, timing wheel, ready queue, and distributed lock into a single coherent unit.
func New ¶
New creates a new Queue with the given options. Returns ErrRedisRequired when no Redis client is configured.
func (*Queue) Add ¶
Add validates and persists a new delayed task, then schedules it in the timing wheel.
func (*Queue) Cancel ¶
Cancel transitions a task to StateCancelled and removes it from the timing wheel.
func (*Queue) OnExpire ¶
OnExpire registers fn as the callback invoked when a task on topic becomes ready. The drain goroutine will pop tasks and call fn automatically, finishing the task on success or leaving it for TTR re-delivery on failure.
func (*Queue) Pop ¶
Pop blocks until a task becomes available on the given topic's ready list or the configured pop timeout elapses. On success the task is in StateActive and a TTR re-delivery timer is armed.
Returns nil, nil when the timeout elapses with no available task.
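Since a timeout is reported as nil, nil rather than as an error, a consumer loop has to check the task for nil before using it. The sketch below shows one way to structure such a loop against the documented Pop and Finish signatures. Task is trimmed to two fields, and popFinisher, fakeQueue, consume, and demo are hypothetical names introduced so the example runs without Redis.

```go
package main

import (
	"context"
	"fmt"
)

// Task is a trimmed local stand-in for the library's Task type.
type Task struct {
	ID    string
	Topic string
}

// popFinisher lists the two documented Queue methods the loop relies on.
type popFinisher interface {
	Pop(ctx context.Context, topic string) (*Task, error)
	Finish(ctx context.Context, topic, id string) error
}

// fakeQueue is a hypothetical in-memory stub: it hands out its tasks in
// order, then reports timeouts (nil, nil) forever.
type fakeQueue struct {
	pending  []*Task
	finished []string
}

func (f *fakeQueue) Pop(ctx context.Context, topic string) (*Task, error) {
	if len(f.pending) == 0 {
		return nil, nil // timeout with no task available
	}
	t := f.pending[0]
	f.pending = f.pending[1:]
	return t, nil
}

func (f *fakeQueue) Finish(ctx context.Context, topic, id string) error {
	f.finished = append(f.finished, id)
	return nil
}

// consume pops until maxIdle consecutive timeouts occur, finishing each task
// that handle accepts. It returns the number of tasks finished.
func consume(ctx context.Context, q popFinisher, topic string, maxIdle int, handle func(*Task) error) (int, error) {
	done, idle := 0, 0
	for idle < maxIdle {
		task, err := q.Pop(ctx, topic)
		if err != nil {
			return done, err
		}
		if task == nil { // timeout, not an error: poll again
			idle++
			continue
		}
		idle = 0
		if err := handle(task); err != nil {
			continue // not finished, so it is left for TTR re-delivery
		}
		if err := q.Finish(ctx, topic, task.ID); err != nil {
			return done, err
		}
		done++
	}
	return done, nil
}

// demo runs the loop against the stub with two pending tasks.
func demo() (int, error) {
	q := &fakeQueue{pending: []*Task{{ID: "a", Topic: "orders"}, {ID: "b", Topic: "orders"}}}
	return consume(context.Background(), q, "orders", 1, func(*Task) error { return nil })
}

func main() {
	n, err := demo()
	fmt.Println(n, err) // 2 <nil>
}
```

Skipping Finish when the handler fails relies on the TTR timer described above to bring the task back.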
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server exposes the Queue over HTTP using a plain net/http ServeMux. All request/response bodies are JSON.
func NewServer ¶
func NewServer(q *Queue, opts ...ServerOption) *Server
NewServer creates a Server that wraps the given Queue.
func (*Server) ListenAndServe ¶
ListenAndServe starts the HTTP server. It blocks until the server stops.
type ServerOption ¶
type ServerOption func(*Server)
ServerOption configures a Server.
func WithServerAddr ¶
func WithServerAddr(addr string) ServerOption
WithServerAddr sets the TCP listen address (default ":8080").
type Task ¶
type Task struct {
ID string `msgpack:"id" json:"id"`
Topic string `msgpack:"topic" json:"topic"`
Body []byte `msgpack:"body" json:"body"`
Delay time.Duration `msgpack:"delay" json:"delay_ms"`
TTR time.Duration `msgpack:"ttr" json:"ttr_ms"`
MaxRetries int `msgpack:"max_retries" json:"max_retries"`
State TaskState `msgpack:"state" json:"state"`
Retries int `msgpack:"retries" json:"retries"`
CreatedAt time.Time `msgpack:"created_at" json:"created_at"`
ActiveAt time.Time `msgpack:"active_at" json:"active_at"`
}
Task represents a delayed task.
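The delay_ms and ttr_ms tag names suggest that durations travel as milliseconds in JSON, but this page does not show how Task is marshaled. Two encoding/json defaults are worth keeping in mind: a time.Duration is encoded as an integer number of nanoseconds unless custom marshaling is provided, and a []byte is encoded as a base64 string. The sketch below builds a payload with an explicit millisecond field; wireTask, encodeTask, and demo are hypothetical and are not part of the library.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// wireTask is a hypothetical request payload, not the library's Task type.
// It carries the delay as integer milliseconds to match the "delay_ms" name.
type wireTask struct {
	ID      string `json:"id"`
	Topic   string `json:"topic"`
	Body    []byte `json:"body"`
	DelayMS int64  `json:"delay_ms"`
}

// encodeTask converts the delay to milliseconds and marshals the payload.
func encodeTask(id, topic string, body []byte, delay time.Duration) (string, error) {
	b, err := json.Marshal(wireTask{
		ID:      id,
		Topic:   topic,
		Body:    body,
		DelayMS: delay.Milliseconds(),
	})
	return string(b), err
}

// demo encodes a small task with a 1.5 second delay.
func demo() string {
	s, err := encodeTask("a", "orders", []byte("hi"), 1500*time.Millisecond)
	if err != nil {
		return err.Error()
	}
	return s
}

func main() {
	fmt.Println(demo())
	// {"id":"a","topic":"orders","body":"aGk=","delay_ms":1500}
}
```

Note that the body "hi" appears as "aGk=" on the wire, so an HTTP client written in another language would need to base64-encode it.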
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| example | |
| batch-add (command) | Batch add example: add many tasks with different delays. |
| cancel (command) | Cancel example: add a task, then cancel it before it fires. |
| distributed (command) | Distributed deployment: multiple instances share the same Redis. |
| embedded (command) | Embedded mode: callback-driven delay queue within your Go process |
| httpserver (command) | HTTP server mode: standalone delay queue service |
| stats-monitor (command) | Stats monitor: add tasks and watch queue statistics via HTTP. |
| ttr-retry (command) | TTR retry example: task is redelivered if not Finished within TTR timeout. |