Documentation
¶
Index ¶
- Variables
- func ExtractQueueName(key string) string
- func SetProgress(ctx context.Context, p int)
- func SetResult(ctx context.Context, v any) error
- func SetResultBytes(ctx context.Context, b []byte)
- type Client
- func (c *Client) DeleteTask(ctx context.Context, queue string, id string, opts ...Option) error
- func (c *Client) Enqueue(ctx context.Context, queue, taskType string, payload any, opts ...Option) error
- func (c *Client) ListTasks(ctx context.Context, queue string, state State, filter TaskFilter) ([]*Task, error)
- func (c *Client) RetryDead(ctx context.Context, queue string, id string, opts ...Option) error
- type Encoder
- type FmtLogger
- type HandlerFunc
- type JSONEncoder
- type Logger
- type Middleware
- type Mux
- type Option
- type Server
- type ServerConfig
- type State
- type Task
- type TaskFilter
Constants ¶
This section is empty.
Variables ¶
var AllStates = []State{StatePending, StateActive, StateDelayed, StateSucceeded, StateDead}
AllStates lists every valid queue state in a stable order.
var ErrActiveState = errors.New("uniqw: operation not allowed on active state")
ErrActiveState is returned when an operation is not allowed on the active state.
var ErrDuplicateTask = errors.New("uniqw: duplicate task id")
ErrDuplicateTask is returned when Enqueue is called with an ID that already exists for the queue.
var ErrTaskNotFound = errors.New("uniqw: task not found")
ErrTaskNotFound is returned when a task with the specified ID is not found.
var ErrUnknownState = errors.New("uniqw: unknown state")
ErrUnknownState is returned when an invalid state is used.
Functions ¶
func ExtractQueueName ¶
ExtractQueueName parses a queue name from a raw Redis key (e.g. "uniqw:{default}:pending"). It returns an empty string if the format is invalid.
func SetProgress ¶
SetProgress allows a handler to report progress (0..100) for the current task. It is a no-op if the context is not provided by the UniQw runtime.
func SetResult ¶
SetResult encodes the provided value using the default JSON encoder and attaches it as the handler result. It is safe to call multiple times; last wins. It is a no-op if the context is not provided by the UniQw runtime.
func SetResultBytes ¶
SetResultBytes attaches raw bytes as the handler result without encoding. It is a no-op if the context is not provided by the UniQw runtime.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client provides APIs to enqueue and manage tasks in Redis.
func NewClient ¶
func NewClient(rdb redis.UniversalClient) *Client
NewClient creates a new UniQw client.
func (*Client) DeleteTask ¶
DeleteTask removes a task from the specified queue by its ID. It searches across all states (Pending, Delayed, Succeeded, Dead) and removes the first match. It returns ErrTaskNotFound if the ID is not found in any of those states.
func (*Client) Enqueue ¶
func (c *Client) Enqueue(ctx context.Context, queue, taskType string, payload any, opts ...Option) error
Enqueue adds a new task to the specified queue. It returns ErrDuplicateTask if the task ID (explicit or generated) already exists in the queue.
type Encoder ¶
type Encoder interface {
// Encode serializes a value to bytes.
Encode(any) ([]byte, error)
// Decode deserializes bytes to a value.
Decode([]byte, any) error
}
Encoder defines the interface for task payload serialization.
type FmtLogger ¶
type FmtLogger struct{}
FmtLogger is a minimal logger that prints messages with level prefixes. Debug/Info go to stdout; Warn/Error go to stderr.
type HandlerFunc ¶
HandlerFunc is the function signature for processing a task.
type JSONEncoder ¶
type JSONEncoder struct{}
JSONEncoder is the default implementation of Encoder using JSON. It uses standard library for encoding and sonic for decoding.
type Logger ¶
type Logger interface {
Debugf(format string, args ...any)
Infof(format string, args ...any)
Warnf(format string, args ...any)
Errorf(format string, args ...any)
}
Logger defines logging methods used by the library. Implementations should be cheap. Default is FmtLogger which writes to stdout/stderr using fmt.
type Middleware ¶
type Middleware func(HandlerFunc) HandlerFunc
Middleware is a function that wraps a HandlerFunc to provide cross-cutting concerns.
type Mux ¶
type Mux struct {
// contains filtered or unexported fields
}
Mux routes tasks to their respective handlers based on task type.
func (*Mux) Use ¶
func (m *Mux) Use(mw Middleware)
Use adds middleware(s) to the mux. Middlewares are executed in the order they are added.
type Option ¶
type Option func(*options)
Option is a function that configures task behavior during Enqueue or RetryDead.
func Deadline ¶
Deadline sets an absolute deadline for the task. The task will not be processed if the current time is past the deadline.
func ExpireIn ¶
ExpireIn sets a relative deadline for the task. The task will not be processed if the current time is past the deadline.
func RetentionError ¶
RetentionError sets how long (in seconds) the task is kept in the Dead state. If d is 0, the task will be dropped immediately after final failure. If d is negative, the task will be kept forever (default).
func TaskID ¶
TaskID sets a custom ID for the task. If not provided, a random UUID will be generated.
func WithKeepUniqueLock ¶
func WithKeepUniqueLock() Option
WithKeepUniqueLock ensures that the uniqueness lock for the task ID is NOT released even after the task is deleted (unless it reached Succeeded state).
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server processes tasks from Redis queues using workers.
func NewServer ¶
func NewServer(rdb *redis.Client, cfg ServerConfig, mux *Mux) *Server
NewServer creates a new UniQw server.
type ServerConfig ¶
type ServerConfig struct {
// Queues defines the queues to process and their relative weights.
Queues map[string]int
// Concurrency is the number of worker goroutines.
Concurrency int
// VisibilityTTL is the duration for which a task is leased by a worker.
// If the worker fails or crashes, the task will be reclaimed after this TTL.
VisibilityTTL time.Duration
// Logger is the logger used for server events.
Logger Logger
}
ServerConfig defines the configuration for a UniQw server.
type State ¶
type State string
State represents a queue state used to store and inspect tasks. Use the exported constants (StatePending, StateActive, etc.) instead of raw strings to avoid typos.
const ( // StatePending contains tasks ready for execution (LIST). StatePending State = "pending" // StateActive contains tasks currently being processed by workers (ZSET). StateActive State = "active" // StateDelayed contains scheduled tasks or tasks in backoff retry (ZSET). StateDelayed State = "delayed" // StateSucceeded contains successfully completed tasks (ZSET). StateSucceeded State = "succeeded" // StateDead contains permanently failed tasks (LIST). StateDead State = "dead" )
func ParseState ¶
ParseState converts a string into a State, returning an error for unknown values.
type Task ¶
type Task struct {
// ID is the unique identifier for the task.
ID string `json:"id"`
// Type defines the task category, used by Mux to route to the correct handler.
Type string `json:"type"`
// Queue is the name of the queue this task belongs to.
Queue string `json:"queue"`
// Payload is the raw task data.
Payload []byte `json:"payload"`
// Retry is the current number of retry attempts made.
Retry int `json:"retry"`
// MaxRetry is the maximum number of retries allowed before moving to Dead state.
MaxRetry int `json:"max_retry"`
// Retention is the duration (in seconds) to keep the task after successful completion.
Retention int64 `json:"retention"`
// ErrRetention is the duration (in seconds) to keep the task after it has permanently failed.
ErrRetention int64 `json:"err_retention,omitempty"`
// CreatedAt is the timestamp (ms) when the task was enqueued.
CreatedAt int64 `json:"created_at,omitempty"`
// DeadlineMs is the absolute timestamp (ms) after which the task should not be processed.
DeadlineMs int64 `json:"deadline_ms,omitempty"`
// StartedAt is the timestamp (ms) when the worker started processing the task.
StartedAt int64 `json:"started_at,omitempty"`
// CompletedAt is the timestamp (ms) when the task was finished (success or final failure).
CompletedAt int64 `json:"completed_at,omitempty"`
// LastError is the error message from the last failed attempt.
LastError string `json:"last_error,omitempty"`
// LastErrorAt is the timestamp (ms) of the last failed attempt.
LastErrorAt int64 `json:"last_error_at,omitempty"`
// Progress is the current task progress (0..100).
Progress int `json:"progress,omitempty"`
// Result is the execution result stored as JSON.
Result []byte `json:"result,omitempty"`
}
Task represents a unit of work to be processed by a worker. It is serialized to JSON and stored in Redis.
type TaskFilter ¶
TaskFilter is a function used to filter tasks during ListTasks.