queue

package
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 8, 2026 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MessageMetadataPattern        = "queue.pattern"
	MessageMetadataQueue          = "queue.queue"
	MessageMetadataMaxRetry       = "queue.max_retry"
	MessageMetadataTimeout        = "queue.timeout"
	MessageMetadataDelay          = "queue.delay"
	MessageMetadataPriority       = "queue.priority"
	MessageMetadataTrace          = "queue.trace"
	MessageMetadataLockKey        = "queue.lock.key"
	MessageMetadataLockTTL        = "queue.lock.ttl"
	MessageMetadataWorker         = "queue.worker"
	MessageMetadataConcurrencyKey = "queue.concurrency.key"
)
View Source
const (
	TaskLogLevelInfo  = "info"
	TaskLogLevelWarn  = "warn"
	TaskLogLevelError = "error"
)
View Source
const DefaultQueueName = "default"
View Source
const RuntimeCapabilityName = "queue.runtime"

Variables

View Source
var (
	ErrNotInitialized        = errors.New("queue: not initialized")
	ErrCapabilityUnsupported = errors.New("queue capability unsupported")
	ErrTaskDuplicated        = errors.New("queue task duplicated")
	ErrTaskNotFound          = errors.New("queue task not found")
	ErrQueueNotFound         = errors.New("queue not found")
	ErrQueuePaused           = errors.New("queue paused")
	ErrRateLimited           = errors.New("queue rate limited")
	ErrTaskCanceled          = errors.New("queue task canceled")
	ErrInvalidPayload        = errors.New("queue invalid payload")
	ErrHandlerNotFound       = errors.New("queue handler not found")
	ErrLockNotAcquired       = errors.New("queue lock not acquired")
	ErrIdempotentDone        = errors.New("queue idempotent done")
)

Functions

func CanTransitionTaskState

func CanTransitionTaskState(from TaskState, to TaskState) bool

func CloneCapabilities

func CloneCapabilities(in map[Capability]bool) map[Capability]bool

func ContextFromCorrelationHeaders

func ContextFromCorrelationHeaders(ctx context.Context, headers map[string]string) context.Context

func ContextWithMessage

func ContextWithMessage(ctx context.Context, msg *Message) context.Context

func ContextWithMetadata

func ContextWithMetadata(ctx context.Context, metadata ContextMetadata) context.Context

func ContextWithRuntime

func ContextWithRuntime(ctx context.Context, runtime *RuntimeInstance) context.Context

func ContextWithRuntimeContext

func ContextWithRuntimeContext(ctx context.Context, runtime *RuntimeContext) context.Context

func CorrelationHeaderValue

func CorrelationHeaderValue(headers map[string]string, key string) string

func CorrelationHeadersFromContext

func CorrelationHeadersFromContext(ctx context.Context) map[string]string

func DecodePayload

func DecodePayload[T any](msg *Message) (T, error)

func DefaultRetryDelay

func DefaultRetryDelay(retryCount int, err error, _ *Message) time.Duration

func DispatchAutoRetryTasks added in v0.1.1

func DispatchAutoRetryTasks(ctx context.Context, limit int) (int, error)

func EnsureMessageRuntimeContext

func EnsureMessageRuntimeContext(ctx context.Context, msg *Message) context.Context

func IsDeadLetterError

func IsDeadLetterError(err error) bool

func IsFatalError

func IsFatalError(err error) bool

func IsIgnoredError

func IsIgnoredError(err error) bool

func IsLockNotAcquired

func IsLockNotAcquired(err error) bool

func IsRateLimitError

func IsRateLimitError(err error) bool

func IsRetryableError

func IsRetryableError(err error) bool

func IsTimeoutError

func IsTimeoutError(err error) bool

func MarshalPayload

func MarshalPayload(payload any) ([]byte, error)

func MessageMetadataDuration

func MessageMetadataDuration(msg *Message, key string) (time.Duration, bool)

func MessageMetadataString

func MessageMetadataString(msg *Message, key string) (string, bool)

func MessageMetadataValue

func MessageMetadataValue(msg *Message, key string) (any, bool)

func NewDeadLetterError

func NewDeadLetterError(err error) error

func NewFatalError

func NewFatalError(err error) error

func NewIgnoredError

func NewIgnoredError(err error) error

func NewRetryableError

func NewRetryableError(err error) error

func NewRuntimeError

func NewRuntimeError(code string, err error) error

func NewTimeoutError

func NewTimeoutError(err error) error

func RateLimited

func RateLimited(retryIn time.Duration, err error) error

func RecordTaskDispatchFailed

func RecordTaskDispatchFailed(ctx context.Context, store TaskStore, record TaskRecord) error

func RecordTaskDispatched

func RecordTaskDispatched(ctx context.Context, store TaskStore, record TaskRecord) error

func RecordTaskEnqueued

func RecordTaskEnqueued(ctx context.Context, store TaskStore, record TaskRecord) error

func RegisterDriver

func RegisterDriver(driver RunnerDriver) error

func RequestIDFromHeaders

func RequestIDFromHeaders(headers map[string]string) string

func RetryableAfter

func RetryableAfter(retryIn time.Duration, err error) error

func SetMessageMetadata

func SetMessageMetadata(msg *Message, key string, value any)

func SetSpanCorrelationAttributes

func SetSpanCorrelationAttributes(ctx context.Context, span tracing.Span)

func SetTaskState

func SetTaskState(msg *Message, state TaskState)

SetTaskState 按状态机规则推进任务状态。非法流转会被忽略。

新代码优先使用 TransitionTaskState 获取是否流转成功。

func SpanIDFromHeaders

func SpanIDFromHeaders(headers map[string]string) string

func TraceIDFromHeaders

func TraceIDFromHeaders(headers map[string]string) string

func TrackIDFromHeaders

func TrackIDFromHeaders(headers map[string]string) string

func TransitionTaskState

func TransitionTaskState(msg *Message, next TaskState) bool

func UpdateStoredTaskStatus

func UpdateStoredTaskStatus(ctx context.Context, store TaskStore, update TaskStatusUpdate) error

func WithTaskLogger

func WithTaskLogger(ctx context.Context, logger TaskLogger) context.Context

Types

type ActionLogger

type ActionLogger interface {
	LogQueueAction(ctx context.Context, action QueueAction) error
}

type BackoffStrategy

type BackoffStrategy interface {
	NextDelay(retry int, err error) time.Duration
}

type Capability

type Capability string
const (
	CapEnqueue     Capability = "enqueue"
	CapConsume     Capability = "consume"
	CapRetry       Capability = "retry"
	CapTimeout     Capability = "timeout"
	CapDeadline    Capability = "deadline"
	CapDelay       Capability = "delay"
	CapUnique      Capability = "unique"
	CapPriority    Capability = "priority"
	CapRateLimit   Capability = "rate_limit"
	CapCancel      Capability = "cancel"
	CapPauseResume Capability = "pause_resume"
	CapDLQ         Capability = "dlq"
	CapInspector   Capability = "inspector"
	CapBatch       Capability = "batch"
	CapChain       Capability = "chain"
	CapProgress    Capability = "progress"
	CapHeartbeat   Capability = "heartbeat"
	CapLock        Capability = "lock"
	CapIdempotency Capability = "idempotency"
	CapMetrics     Capability = "metrics"
	CapLog         Capability = "log"
	CapTrace       Capability = "trace"
	CapActionLog   Capability = "action_log"
	CapOutbox      Capability = "outbox"
)

type Client

type Client interface {
	Enqueue(ctx context.Context, task Task, opts ...Option) (*TaskInfo, error)
	BatchEnqueue(ctx context.Context, tasks []Task, opts ...Option) ([]*TaskInfo, error)
	Close() error
}

func NewClient

func NewClient(cfg Config) (Client, error)

type ConcurrencyLimiter

type ConcurrencyLimiter interface {
	Acquire(ctx context.Context, key string) error
	Release(ctx context.Context, key string)
}

type Config

type Config struct {
	Driver string      `mapstructure:"driver" yaml:"driver"`
	Redis  RedisConfig `mapstructure:"redis" yaml:"redis"`
	NATS   NATSConfig  `mapstructure:"nats" yaml:"nats"`

	Addr     string `mapstructure:"addr" yaml:"addr"`
	Password string `mapstructure:"password" yaml:"password"`
	DB       int    `mapstructure:"db" yaml:"db"`

	Concurrency    int            `mapstructure:"concurrency" yaml:"concurrency"`
	Queues         map[string]int `mapstructure:"queues" yaml:"queues"`
	StrictPriority bool           `mapstructure:"strict_priority" yaml:"strict_priority"`

	Workers     map[string]WorkerProfile `mapstructure:"workers" yaml:"workers"`
	RateLimit   RateLimitConfig          `mapstructure:"rate_limit" yaml:"rate_limit"`
	Lock        LockConfig               `mapstructure:"lock" yaml:"lock"`
	Idempotency IdempotencyConfig        `mapstructure:"idempotency" yaml:"idempotency"`
	Outbox      OutboxConfig             `mapstructure:"outbox" yaml:"outbox"`
	Schedule    ScheduleConfig           `mapstructure:"schedule" yaml:"schedule"`
}

func FromConfig

func FromConfig(cfg *Config) Config

func (Config) Normalize

func (c Config) Normalize() Config

func (Config) WorkerProfile

func (c Config) WorkerProfile(name string) WorkerProfile

type ContextHandler

type ContextHandler func(*HandlerContext) error

type ContextMetadata

type ContextMetadata struct {
	TaskID     string
	TaskType   string
	Queue      string
	TaskState  TaskState
	RetryCount int
	MaxRetry   int
	WorkerID   string
	TrackID    string
	RequestID  string
	TraceID    string
	SpanID     string
	TenantID   string
	UserID     string
	Event      map[string]string
}

func ContextMetadataFromMessage

func ContextMetadataFromMessage(msg *Message) ContextMetadata

func MetadataFromContext

func MetadataFromContext(ctx context.Context) (ContextMetadata, bool)

type DeadLetter

type DeadLetter interface {
	Push(ctx context.Context, msg *Message, err error) error
}

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

func NewDispatcher

func NewDispatcher() *Dispatcher

func (*Dispatcher) AddHook

func (d *Dispatcher) AddHook(h Hook)

func (*Dispatcher) Dispatch

func (d *Dispatcher) Dispatch(ctx context.Context, pattern string, msg *Message) error

func (*Dispatcher) Entries

func (d *Dispatcher) Entries() []HandlerMetadata

func (*Dispatcher) Handler

func (d *Dispatcher) Handler(pattern string) HandlerFunc

func (*Dispatcher) HandlerFor

func (d *Dispatcher) HandlerFor(pattern string) (HandlerFunc, bool)

func (*Dispatcher) Metadata

func (d *Dispatcher) Metadata() RegistryMetadata

func (*Dispatcher) Orchestrator

func (d *Dispatcher) Orchestrator() *Orchestrator

func (*Dispatcher) Register

func (d *Dispatcher) Register(pattern string, handlers ...any) error

func (*Dispatcher) SetOrchestrator

func (d *Dispatcher) SetOrchestrator(orchestrator *Orchestrator)

func (*Dispatcher) Use

func (d *Dispatcher) Use(middlewares ...Middleware)

func (*Dispatcher) UseRuntime

func (d *Dispatcher) UseRuntime(middlewares ...RuntimeMiddleware)

type Driver

type Driver interface {
	Name() string
	Capabilities() map[Capability]bool
	Supports(cap Capability) bool

	NewClient(cfg Config) (Client, error)
	NewWorker(cfg Config, profile WorkerProfile) (Worker, error)
	NewManager(cfg Config) (Manager, error)
}

type EnqueueOptions

type EnqueueOptions struct {
	Queue        string
	TaskID       string
	MaxRetry     *int
	Timeout      time.Duration
	Deadline     time.Time
	ProcessAt    time.Time
	ProcessIn    time.Duration
	UniqueTTL    time.Duration
	Retention    time.Duration
	Group        string
	Priority     int
	RateLimitKey string
	Trace        bool

	AutoRetryEnabled bool
	AutoRetryMax     int
	AutoRetryDelay   time.Duration
}

func ApplyOptions

func ApplyOptions(opts []Option) EnqueueOptions

type ErrorKind

type ErrorKind string
const (
	ErrorRetryable  ErrorKind = "retryable"
	ErrorFatal      ErrorKind = "fatal"
	ErrorTimeout    ErrorKind = "timeout"
	ErrorDeadLetter ErrorKind = "deadletter"
	ErrorIgnored    ErrorKind = "ignored"
)

type EventPublisher

type EventPublisher interface {
	Publish(ctx context.Context, event RuntimeEvent)
}

type HandlerContext

type HandlerContext struct {
	Message *Message
	// contains filtered or unexported fields
}

func (*HandlerContext) Context

func (c *HandlerContext) Context() context.Context

func (*HandlerContext) Next

func (c *HandlerContext) Next() error

func (*HandlerContext) SetContext

func (c *HandlerContext) SetContext(ctx context.Context)

type HandlerFunc

type HandlerFunc func(context.Context, *Message) error

func BuildHandlerChain

func BuildHandlerChain(handlers ...any) (HandlerFunc, error)

func Chain

func Chain(handler HandlerFunc, middlewares ...Middleware) HandlerFunc

func ChainRuntime

func ChainRuntime(handler HandlerFunc, middlewares ...RuntimeMiddleware) HandlerFunc

type HandlerMetadata

type HandlerMetadata struct {
	Pattern         string
	Handler         string
	Payload         string
	MiddlewareCount int
	Queue           string
	MaxRetry        *int
	Timeout         time.Duration
	Delay           time.Duration
	Priority        int
	Trace           bool
}

type Hook

type Hook interface {
	BeforeProcess(ctx context.Context, msg *Message) error
	AfterProcess(ctx context.Context, msg *Message, err error)
	OnSuccess(ctx context.Context, msg *Message)
	OnFailure(ctx context.Context, msg *Message, err error)
}

type HookFunc

type HookFunc struct {
	Before  func(context.Context, *Message) error
	After   func(context.Context, *Message, error)
	Success func(context.Context, *Message)
	Failure func(context.Context, *Message, error)
}

func (HookFunc) AfterProcess

func (h HookFunc) AfterProcess(ctx context.Context, msg *Message, err error)

func (HookFunc) BeforeProcess

func (h HookFunc) BeforeProcess(ctx context.Context, msg *Message) error

func (HookFunc) OnFailure

func (h HookFunc) OnFailure(ctx context.Context, msg *Message, err error)

func (HookFunc) OnSuccess

func (h HookFunc) OnSuccess(ctx context.Context, msg *Message)

type Idempotency

type Idempotency interface {
	Done(ctx context.Context, key string) (bool, error)
	MarkDone(ctx context.Context, key string, ttl time.Duration) error
}

type IdempotencyConfig

type IdempotencyConfig struct {
	Enabled bool   `mapstructure:"enabled" yaml:"enabled"`
	Prefix  string `mapstructure:"prefix" yaml:"prefix"`
}

type IsFailureFunc

type IsFailureFunc func(error) bool

type LockConfig

type LockConfig struct {
	Enabled bool   `mapstructure:"enabled" yaml:"enabled"`
	Prefix  string `mapstructure:"prefix" yaml:"prefix"`
}

type Locker

type Locker interface {
	Lock(ctx context.Context, key string, ttl time.Duration) (unlock func(context.Context) error, ok bool, err error)
}

type Manager

type Manager interface {
	Supports(cap Capability) bool
	Capabilities() map[Capability]bool

	ListQueues(ctx context.Context) ([]*QueueInfo, error)
	GetQueue(ctx context.Context, queue string) (*QueueInfo, error)

	ListTasks(ctx context.Context, query TaskQuery) ([]*TaskInfo, error)
	GetTask(ctx context.Context, queue, taskID string) (*TaskInfo, error)

	DeleteTask(ctx context.Context, queue, taskID string) error
	RetryTask(ctx context.Context, queue, taskID string) error
	ArchiveTask(ctx context.Context, queue, taskID string) error
	CancelTask(ctx context.Context, queue, taskID string) error

	PauseQueue(ctx context.Context, queue string) error
	ResumeQueue(ctx context.Context, queue string) error
}

func NewManager

func NewManager(cfg Config) (Manager, error)

type Message

type Message struct {
	ID         string
	Type       string
	Payload    []byte
	Queue      string
	State      TaskState
	RetryCount int
	MaxRetry   int
	Headers    map[string]string
	Metadata   map[string]any
	Runtime    *RuntimeContext
}

func MessageFromContext

func MessageFromContext(ctx context.Context) (*Message, bool)

type MetricsLabels

type MetricsLabels struct {
	Queue    string
	TaskType string
	Status   string
	Worker   string
}

type MetricsRecorder

type MetricsRecorder interface {
	IncCounter(ctx context.Context, name string, labels MetricsLabels, value int64)
	ObserveDuration(ctx context.Context, name string, labels MetricsLabels, value time.Duration)
}

type Middleware

type Middleware func(HandlerFunc) HandlerFunc

func ContextChain

func ContextChain(handlers ...ContextHandler) Middleware

func TaskStoreMiddleware

func TaskStoreMiddleware(store TaskStore, opts TaskStoreOptions) Middleware

type MiddlewareMetadata

type MiddlewareMetadata struct {
	Count int
	Names []string
}

type MiddlewareStage

type MiddlewareStage int
const (
	// MiddlewareStage 已冻结为 runtime governance 的固定顺序,maintenance mode 下不新增 stage。
	//
	// 新治理能力应优先归入已有 stage,只有改变执行语义时才考虑调整 runtime kernel。
	// RecoverStage 用于捕获后续 middleware 或 handler 的 panic,并转换为 error。
	RecoverStage MiddlewareStage = iota

	// TraceStage 用于在观测和业务逻辑执行前创建或恢复 tracing 上下文。
	TraceStage

	// MetricsStage 用于统计任务次数、成功失败和执行耗时。
	MetricsStage

	// LoggingStage 用于记录任务开始、完成和失败日志。
	LoggingStage

	// TimeoutStage 用于任务级超时控制,通常由 queue.WithTimeout 在注册期自动生成。
	TimeoutStage

	// RateLimitStage 用于在占用 worker 并发和业务锁前进行限流。
	RateLimitStage

	// ConcurrencyStage 用于按任务类型、业务 key、租户等维度限制并发执行。
	ConcurrencyStage

	// LockStage 用于在进入业务逻辑前获取业务锁。
	LockStage

	// RetryStage 用于判断错误是否需要重试,并附加重试延迟。
	RetryStage

	// DeadLetterStage 用于在重试策略完成分类后记录终态失败任务。
	DeadLetterStage

	// BusinessStage 是普通 middleware 的默认阶段,也是业务 handler 的执行边界。
	BusinessStage
)

type NATSConfig

type NATSConfig struct {
	Stream        string        `mapstructure:"stream" yaml:"stream"`
	SubjectPrefix string        `mapstructure:"subject_prefix" yaml:"subject_prefix"`
	DurablePrefix string        `mapstructure:"durable_prefix" yaml:"durable_prefix"`
	AckWait       time.Duration `mapstructure:"ack_wait" yaml:"ack_wait"`
	MaxDeliver    int           `mapstructure:"max_deliver" yaml:"max_deliver"`
	MaxAge        time.Duration `mapstructure:"max_age" yaml:"max_age"`
	Duplicates    time.Duration `mapstructure:"duplicates" yaml:"duplicates"`
	Storage       string        `mapstructure:"storage" yaml:"storage"`
	Replicas      int           `mapstructure:"replicas" yaml:"replicas"`
	FetchBatch    int           `mapstructure:"fetch_batch" yaml:"fetch_batch"`
	FetchWait     time.Duration `mapstructure:"fetch_wait" yaml:"fetch_wait"`
	RetryDelay    time.Duration `mapstructure:"retry_delay" yaml:"retry_delay"`
}

type NoopTaskLogger

type NoopTaskLogger struct{}

func (NoopTaskLogger) Error

func (NoopTaskLogger) Error(string, ...any)

func (NoopTaskLogger) Info

func (NoopTaskLogger) Info(string, ...any)

func (NoopTaskLogger) Warn

func (NoopTaskLogger) Warn(string, ...any)

type Observer

type Observer interface {
	OnTaskStart(ctx context.Context, msg *Message)
	OnTaskFinish(ctx context.Context, msg *Message, err error)
	OnTaskRetry(ctx context.Context, msg *Message, err error)
	OnTaskFailure(ctx context.Context, msg *Message, err error)
}

type ObserverFunc

type ObserverFunc struct {
	Start   func(context.Context, *Message)
	Finish  func(context.Context, *Message, error)
	Retry   func(context.Context, *Message, error)
	Failure func(context.Context, *Message, error)
}

func (ObserverFunc) OnTaskFailure

func (o ObserverFunc) OnTaskFailure(ctx context.Context, msg *Message, err error)

func (ObserverFunc) OnTaskFinish

func (o ObserverFunc) OnTaskFinish(ctx context.Context, msg *Message, err error)

func (ObserverFunc) OnTaskRetry

func (o ObserverFunc) OnTaskRetry(ctx context.Context, msg *Message, err error)

func (ObserverFunc) OnTaskStart

func (o ObserverFunc) OnTaskStart(ctx context.Context, msg *Message)

type OperationsRuntime

type OperationsRuntime struct {
	// contains filtered or unexported fields
}

func NewOperationsRuntime

func NewOperationsRuntime(manager Manager) *OperationsRuntime

func (*OperationsRuntime) ArchiveTask

func (o *OperationsRuntime) ArchiveTask(ctx context.Context, queueName string, taskID string) error

func (*OperationsRuntime) CancelTask

func (o *OperationsRuntime) CancelTask(ctx context.Context, queueName string, taskID string) error

func (*OperationsRuntime) Capabilities

func (o *OperationsRuntime) Capabilities() map[Capability]bool

func (*OperationsRuntime) CleanTasks

func (o *OperationsRuntime) CleanTasks(ctx context.Context, query TaskQuery) (int, error)

func (*OperationsRuntime) DeleteTask

func (o *OperationsRuntime) DeleteTask(ctx context.Context, queueName string, taskID string) error

func (*OperationsRuntime) Drain

func (o *OperationsRuntime) Drain(ctx context.Context) error

func (*OperationsRuntime) DrainQueue

func (o *OperationsRuntime) DrainQueue(ctx context.Context, queueName string) error

func (*OperationsRuntime) GetQueue

func (o *OperationsRuntime) GetQueue(ctx context.Context, queueName string) (*QueueInfo, error)

func (*OperationsRuntime) GetTask

func (o *OperationsRuntime) GetTask(ctx context.Context, queueName string, taskID string) (*TaskInfo, error)

func (*OperationsRuntime) ListFailedTasks

func (o *OperationsRuntime) ListFailedTasks(ctx context.Context, query TaskQuery) ([]*TaskInfo, error)

func (*OperationsRuntime) ListQueues

func (o *OperationsRuntime) ListQueues(ctx context.Context) ([]*QueueInfo, error)

func (*OperationsRuntime) ListTasks

func (o *OperationsRuntime) ListTasks(ctx context.Context, query TaskQuery) ([]*TaskInfo, error)

func (*OperationsRuntime) Manager

func (o *OperationsRuntime) Manager() Manager

func (*OperationsRuntime) Metadata

func (o *OperationsRuntime) Metadata() RuntimeMetadata

func (*OperationsRuntime) Metrics

func (*OperationsRuntime) PauseQueue

func (o *OperationsRuntime) PauseQueue(ctx context.Context, queueName string) error

func (*OperationsRuntime) QueueStatus

func (o *OperationsRuntime) QueueStatus(ctx context.Context) ([]QueueRuntimeStatus, error)

func (*OperationsRuntime) ResumeQueue

func (o *OperationsRuntime) ResumeQueue(ctx context.Context, queueName string) error

func (*OperationsRuntime) RetryTask

func (o *OperationsRuntime) RetryTask(ctx context.Context, queueName string, taskID string) error

func (*OperationsRuntime) RuntimeStatus

func (o *OperationsRuntime) RuntimeStatus(ctx context.Context) (*RuntimeStatusInfo, error)

func (*OperationsRuntime) SetManager

func (o *OperationsRuntime) SetManager(manager Manager)

func (*OperationsRuntime) SetMetadata

func (o *OperationsRuntime) SetMetadata(metadata RuntimeMetadata)

func (*OperationsRuntime) SetTaskStore added in v0.1.1

func (o *OperationsRuntime) SetTaskStore(store TaskStore)

func (*OperationsRuntime) Status

func (o *OperationsRuntime) Status(ctx context.Context) ([]*QueueInfo, error)

func (*OperationsRuntime) Supports

func (o *OperationsRuntime) Supports(cap Capability) bool

func (*OperationsRuntime) TaskStore added in v0.1.1

func (o *OperationsRuntime) TaskStore() TaskStore

func (*OperationsRuntime) WorkerStatus

type Option

type Option func(*EnqueueOptions)

func AutoRetry

func AutoRetry(max int, delay time.Duration) Option

func Deadline

func Deadline(t time.Time) Option

func Group

func Group(name string) Option

func MaxRetry

func MaxRetry(n int) Option

func ProcessAt

func ProcessAt(t time.Time) Option

func ProcessIn

func ProcessIn(d time.Duration) Option

func Queue

func Queue(name string) Option

func Retention

func Retention(d time.Duration) Option

func TaskID

func TaskID(id string) Option

func Timeout

func Timeout(d time.Duration) Option

func Unique

func Unique(ttl time.Duration) Option

func WithAutoRetry

func WithAutoRetry(max int, delay time.Duration) Option

func WithDeadline

func WithDeadline(t time.Time) Option

func WithDelay

func WithDelay(d time.Duration) Option

func WithGroup

func WithGroup(name string) Option

func WithMaxRetry

func WithMaxRetry(n int) Option

func WithPriority

func WithPriority(priority int) Option

func WithProcessAt

func WithProcessAt(t time.Time) Option

func WithProcessIn

func WithProcessIn(d time.Duration) Option

func WithQueue

func WithQueue(name string) Option

func WithRateLimitKey

func WithRateLimitKey(key string) Option

func WithRetention

func WithRetention(d time.Duration) Option

func WithRetry

func WithRetry(n int) Option

func WithTaskID

func WithTaskID(id string) Option

func WithTimeout

func WithTimeout(d time.Duration) Option

func WithTrace

func WithTrace(enabled bool) Option

func WithUnique

func WithUnique(ttl time.Duration) Option

type Orchestrator

type Orchestrator struct {
	// contains filtered or unexported fields
}

func NewOrchestrator

func NewOrchestrator(opts ...OrchestratorOption) *Orchestrator

func (*Orchestrator) AddEventPublisher

func (o *Orchestrator) AddEventPublisher(publisher EventPublisher)

func (*Orchestrator) AddObserver

func (o *Orchestrator) AddObserver(observer Observer)

func (*Orchestrator) Execute

func (o *Orchestrator) Execute(ctx context.Context, exec RuntimeExecution) error

type OrchestratorOption

type OrchestratorOption func(*Orchestrator)

func WithEventPublisher

func WithEventPublisher(publisher EventPublisher) OrchestratorOption

func WithObserver

func WithObserver(observer Observer) OrchestratorOption

type Outbox

type Outbox interface {
	Save(ctx context.Context, task Task, opts ...Option) error
	Flush(ctx context.Context, limit int) error
}

type OutboxConfig

type OutboxConfig struct {
	Enabled       bool          `mapstructure:"enabled" yaml:"enabled"`
	FlushInterval time.Duration `mapstructure:"flush_interval" yaml:"flush_interval"`
	BatchSize     int           `mapstructure:"batch_size" yaml:"batch_size"`
}

type OutboxFactory

type OutboxFactory func(QueueRunner) (Outbox, error)

type OutboxPoller

type OutboxPoller struct {
	// contains filtered or unexported fields
}

func NewOutboxPoller

func NewOutboxPoller(outbox Outbox, cfg OutboxPollerConfig) *OutboxPoller

func (*OutboxPoller) Run

func (p *OutboxPoller) Run(ctx context.Context) error

func (*OutboxPoller) Start

type OutboxPollerConfig

type OutboxPollerConfig struct {
	BatchSize     int
	FlushInterval time.Duration
}

type OutboxTask

type OutboxTask struct {
	Task    Task
	Options []Option
}

func NewOutboxTask

func NewOutboxTask(task Task, opts ...Option) OutboxTask

type ProgressReporter

type ProgressReporter interface {
	Report(ctx context.Context, taskID string, percent int, message string) error
}

type QueueAction

type QueueAction struct {
	OperatorID      string
	OperatorName    string
	Action          string
	Queue           string
	TaskID          string
	TaskType        string
	BeforeState     TaskState
	AfterState      TaskState
	PayloadSnapshot string
	CreatedAt       time.Time
}

type QueueDrainer

type QueueDrainer interface {
	DrainQueue(ctx context.Context, queue string) error
}

type QueueInfo

type QueueInfo struct {
	Name     string
	State    QueueState
	Priority int

	Pending   int64
	Active    int64
	Scheduled int64
	Retry     int64
	Archived  int64
	Succeeded int64
	Failed    int64
	Canceled  int64

	Processed int64
	FailedAll int64

	PausedAt  *time.Time
	UpdatedAt time.Time
}

type QueueMetrics

type QueueMetrics struct {
	Pending   int64
	Active    int64
	Scheduled int64
	Retry     int64
	Archived  int64
	Succeeded int64
	Failed    int64
	Canceled  int64
	Processed int64
	FailedAll int64
}

type QueueRunner

type QueueRunner interface {
	Client
	Worker
	Manager
}

func New

func New(cfg Config, opts ...RuntimeOption) QueueRunner

func NewRunner

func NewRunner(cfg Config, opts ...RuntimeOption) (QueueRunner, error)

type QueueRuntimeStatus

type QueueRuntimeStatus struct {
	Name      string
	State     QueueState
	Priority  int
	Metrics   QueueMetrics
	PausedAt  *time.Time
	UpdatedAt time.Time
}

type QueueState

type QueueState string
const (
	QueueRunning  QueueState = "running"
	QueuePaused   QueueState = "paused"
	QueueDraining QueueState = "draining"
	QueueStopped  QueueState = "stopped"
	QueueFailed   QueueState = "failed"
	QueueUnknown  QueueState = "unknown"
)

type RateLimitConfig

type RateLimitConfig struct {
	Enabled       bool          `mapstructure:"enabled" yaml:"enabled"`
	DefaultLimit  int           `mapstructure:"default_limit" yaml:"default_limit"`
	DefaultWindow time.Duration `mapstructure:"default_window" yaml:"default_window"`
}

type RateLimitError

type RateLimitError struct {
	RetryIn time.Duration
	Err     error
}

func (*RateLimitError) Error

func (e *RateLimitError) Error() string

func (*RateLimitError) Unwrap

func (e *RateLimitError) Unwrap() error

type RateLimiter

type RateLimiter interface {
	Allow(ctx context.Context, key string, limit int, window time.Duration) (bool, time.Duration, error)
}

type RedisConfig

type RedisConfig struct {
	Addr     string `mapstructure:"addr" yaml:"addr"`
	Password string `mapstructure:"password" yaml:"password"`
	DB       int    `mapstructure:"db" yaml:"db"`
}

type Registration

type Registration struct {
	Pattern  string
	Handlers []any
}

func Register

func Register(pattern string, handlers ...any) Registration

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

func NewRegistry

func NewRegistry(worker Worker) *Registry

func NewRegistryWithDispatcher

func NewRegistryWithDispatcher(worker Worker, dispatcher *Dispatcher) *Registry

func NewRegistryWithRuntime

func NewRegistryWithRuntime(worker Worker, runtime *RegistryRuntime) *Registry

func (*Registry) AddHook

func (r *Registry) AddHook(h Hook)

func (*Registry) Dispatcher

func (r *Registry) Dispatcher() *Dispatcher

func (*Registry) Entries

func (r *Registry) Entries() []HandlerMetadata

func (*Registry) Metadata

func (r *Registry) Metadata() RegistryMetadata

func (*Registry) Register

func (r *Registry) Register(pattern string, handlers ...any) error

func (*Registry) RegisterAll

func (r *Registry) RegisterAll(registrations ...Registration) error

func (*Registry) Runtime

func (r *Registry) Runtime() *RegistryRuntime

func (*Registry) Use

func (r *Registry) Use(middlewares ...Middleware)

func (*Registry) UseRuntime

func (r *Registry) UseRuntime(middlewares ...RuntimeMiddleware)

type RegistryMetadata

type RegistryMetadata struct {
	Handlers   []HandlerMetadata
	Middleware MiddlewareMetadata
}

type RegistryRuntime

type RegistryRuntime struct {
	// contains filtered or unexported fields
}

func NewRegistryRuntime

func NewRegistryRuntime(dispatcher *Dispatcher) *RegistryRuntime

func (*RegistryRuntime) AddHook

func (r *RegistryRuntime) AddHook(h Hook)

func (*RegistryRuntime) Dispatch

func (r *RegistryRuntime) Dispatch(ctx context.Context, pattern string, msg *Message) error

func (*RegistryRuntime) Dispatcher

func (r *RegistryRuntime) Dispatcher() *Dispatcher

func (*RegistryRuntime) Entries

func (r *RegistryRuntime) Entries() []HandlerMetadata

func (*RegistryRuntime) Handler

func (r *RegistryRuntime) Handler(pattern string) HandlerFunc

func (*RegistryRuntime) Metadata

func (r *RegistryRuntime) Metadata() RegistryMetadata

func (*RegistryRuntime) Register

func (r *RegistryRuntime) Register(pattern string, handlers ...any) error

func (*RegistryRuntime) SetOrchestrator

func (r *RegistryRuntime) SetOrchestrator(orchestrator *Orchestrator)

func (*RegistryRuntime) Use

func (r *RegistryRuntime) Use(middlewares ...Middleware)

func (*RegistryRuntime) UseRuntime

func (r *RegistryRuntime) UseRuntime(middlewares ...RuntimeMiddleware)

type RetryDelayFunc

type RetryDelayFunc func(retryCount int, err error, msg *Message) time.Duration

type RetryStrategy

type RetryStrategy interface {
	NextRetry(ctx context.Context, msg *Message, retryCount int, err error) (time.Duration, bool)
}

func RetryDelayStrategy

func RetryDelayStrategy(fn RetryDelayFunc) RetryStrategy

type RetryStrategyFunc

type RetryStrategyFunc func(ctx context.Context, msg *Message, retryCount int, err error) (time.Duration, bool)

func (RetryStrategyFunc) NextRetry

func (fn RetryStrategyFunc) NextRetry(ctx context.Context, msg *Message, retryCount int, err error) (time.Duration, bool)

type RunnerDriver

type RunnerDriver interface {
	Driver
	NewRunner(cfg Config, opts ...RuntimeOption) (QueueRunner, error)
}

type RuntimeContext

type RuntimeContext struct {
	TraceID   string
	WorkerID  string
	QueueName string
	TaskState TaskState
}

RuntimeContext 只保存 runtime metadata、runtime resources 标识和 runtime state。

不要把业务 payload、业务对象或外部 SDK 客户端放入 RuntimeContext。

func RuntimeContextFromContext

func RuntimeContextFromContext(ctx context.Context) (*RuntimeContext, bool)

func RuntimeContextFromMessage

func RuntimeContextFromMessage(msg *Message) *RuntimeContext

type RuntimeError

type RuntimeError struct {
	Kind      ErrorKind
	Code      string
	Retryable bool
	Fatal     bool
	Ignore    bool
	RetryIn   time.Duration
	Err       error
}

func RuntimeErrorFrom

func RuntimeErrorFrom(err error) (*RuntimeError, bool)

func (*RuntimeError) Error

func (e *RuntimeError) Error() string

func (*RuntimeError) Unwrap

func (e *RuntimeError) Unwrap() error

type RuntimeEvent

type RuntimeEvent struct {
	Type      RuntimeEventType
	Message   *Message
	Error     error
	Timestamp time.Time
}

type RuntimeEventType

type RuntimeEventType string
const (
	RuntimeEventTaskStarted    RuntimeEventType = "task.started"
	RuntimeEventTaskSuccess    RuntimeEventType = "task.success"
	RuntimeEventTaskFailed     RuntimeEventType = "task.failed"
	RuntimeEventTaskRetry      RuntimeEventType = "task.retry"
	RuntimeEventTaskDeadLetter RuntimeEventType = "task.deadletter"
	RuntimeEventTaskTimeout    RuntimeEventType = "task.timeout"
)

type RuntimeExecution

type RuntimeExecution struct {
	Pattern     string
	Handler     HandlerFunc
	Middlewares []RuntimeMiddleware
	Hooks       []Hook
	Message     *Message
	Metadata    HandlerMetadata
}

type RuntimeInstance

type RuntimeInstance struct {
	// contains filtered or unexported fields
}

func From

func From(source any) *RuntimeInstance

From resolves a runtime instance from runtime wiring objects. Business code should prefer Runtime(ctx), package helpers, or explicit Client/Manager values.

func InitRuntimeInstance

func InitRuntimeInstance(ctx context.Context, cfg Config, runtimeCfg RuntimeKernelConfig, opts ...RuntimeInstanceOption) (*RuntimeInstance, error)

func NewRuntimeInstance

func NewRuntimeInstance(runner QueueRunner, opts ...RuntimeInstanceOption) *RuntimeInstance

func NewRuntimeInstanceFromParts

func NewRuntimeInstanceFromParts(parts RuntimeParts, opts ...RuntimeInstanceOption) *RuntimeInstance

func Runtime

func Runtime(ctx context.Context) *RuntimeInstance

func (*RuntimeInstance) AddHook

func (rt *RuntimeInstance) AddHook(h Hook)

func (*RuntimeInstance) ArchiveTask

func (rt *RuntimeInstance) ArchiveTask(ctx context.Context, queueName string, taskID string) error

func (*RuntimeInstance) BatchEnqueue

func (rt *RuntimeInstance) BatchEnqueue(ctx context.Context, tasks []Task, opts ...Option) ([]*TaskInfo, error)

func (*RuntimeInstance) CancelTask

func (rt *RuntimeInstance) CancelTask(ctx context.Context, queueName string, taskID string) error

func (*RuntimeInstance) Capabilities

func (rt *RuntimeInstance) Capabilities() map[Capability]bool

func (*RuntimeInstance) Client

func (rt *RuntimeInstance) Client() Client

func (*RuntimeInstance) Close

func (rt *RuntimeInstance) Close() error

func (*RuntimeInstance) DeleteTask

func (rt *RuntimeInstance) DeleteTask(ctx context.Context, queueName string, taskID string) error

func (*RuntimeInstance) DispatchAutoRetryTasks

func (rt *RuntimeInstance) DispatchAutoRetryTasks(ctx context.Context, limit int) (int, error)

func (*RuntimeInstance) Enqueue

func (rt *RuntimeInstance) Enqueue(ctx context.Context, task Task, opts ...Option) (*TaskInfo, error)

func (*RuntimeInstance) GetQueue

func (rt *RuntimeInstance) GetQueue(ctx context.Context, queueName string) (*QueueInfo, error)

func (*RuntimeInstance) GetTask

func (rt *RuntimeInstance) GetTask(ctx context.Context, queueName string, taskID string) (*TaskInfo, error)

func (*RuntimeInstance) Handle

func (rt *RuntimeInstance) Handle(pattern string, handler HandlerFunc)

func (*RuntimeInstance) Kernel

func (rt *RuntimeInstance) Kernel() *RuntimeKernel

func (*RuntimeInstance) ListQueues

func (rt *RuntimeInstance) ListQueues(ctx context.Context) ([]*QueueInfo, error)

func (*RuntimeInstance) ListTasks

func (rt *RuntimeInstance) ListTasks(ctx context.Context, query TaskQuery) ([]*TaskInfo, error)

func (*RuntimeInstance) Manager

func (rt *RuntimeInstance) Manager() Manager

func (*RuntimeInstance) Metadata

func (rt *RuntimeInstance) Metadata() RuntimeMetadata

func (*RuntimeInstance) NewRegistry

func (rt *RuntimeInstance) NewRegistry() *Registry

func (*RuntimeInstance) Operations

func (rt *RuntimeInstance) Operations() *OperationsRuntime

func (*RuntimeInstance) PauseQueue

func (rt *RuntimeInstance) PauseQueue(ctx context.Context, queueName string) error

func (*RuntimeInstance) Registry

func (rt *RuntimeInstance) Registry() *RegistryRuntime

func (*RuntimeInstance) RequeueTaskRecord

func (rt *RuntimeInstance) RequeueTaskRecord(ctx context.Context, record TaskRecord) (*TaskInfo, error)

func (*RuntimeInstance) ResumeQueue

func (rt *RuntimeInstance) ResumeQueue(ctx context.Context, queueName string) error

func (*RuntimeInstance) RetryTask

func (rt *RuntimeInstance) RetryTask(ctx context.Context, queueName string, taskID string) error

func (*RuntimeInstance) Run

func (rt *RuntimeInstance) Run(ctx context.Context) error

func (*RuntimeInstance) Runner

func (rt *RuntimeInstance) Runner() QueueRunner

func (*RuntimeInstance) Shutdown

func (rt *RuntimeInstance) Shutdown(ctx context.Context) error

func (*RuntimeInstance) StartSchedulePoller

func (rt *RuntimeInstance) StartSchedulePoller(ctx context.Context, cfg ScheduleConfig) context.CancelFunc

func (*RuntimeInstance) Supports

func (rt *RuntimeInstance) Supports(cap Capability) bool

func (*RuntimeInstance) Use

func (rt *RuntimeInstance) Use(middlewares ...Middleware)

func (*RuntimeInstance) UseRuntime

func (rt *RuntimeInstance) UseRuntime(middlewares ...RuntimeMiddleware)

func (*RuntimeInstance) Worker

func (rt *RuntimeInstance) Worker() Worker

type RuntimeInstanceOption

type RuntimeInstanceOption func(*RuntimeInstance)

func WithRuntimeKernel

func WithRuntimeKernel(kernel *RuntimeKernel) RuntimeInstanceOption

func WithRuntimeMetadata

func WithRuntimeMetadata(metadata RuntimeMetadata) RuntimeInstanceOption

func WithRuntimeOperations

func WithRuntimeOperations(operations *OperationsRuntime) RuntimeInstanceOption

func WithRuntimeRegistry

func WithRuntimeRegistry(registry *RegistryRuntime) RuntimeInstanceOption

func WithRuntimeTaskStore

func WithRuntimeTaskStore(store TaskStore) RuntimeInstanceOption

type RuntimeKernel

type RuntimeKernel struct {
	// contains filtered or unexported fields
}

func InitRuntimeKernel

func InitRuntimeKernel(ctx context.Context, cfg Config, runtimeCfg RuntimeKernelConfig) (*RuntimeKernel, error)

func (*RuntimeKernel) Close

func (k *RuntimeKernel) Close()

func (*RuntimeKernel) Orchestrator

func (k *RuntimeKernel) Orchestrator() *Orchestrator

func (*RuntimeKernel) RateLimiter

func (k *RuntimeKernel) RateLimiter() (RateLimiter, RateLimitConfig, bool)

func (*RuntimeKernel) Runner

func (k *RuntimeKernel) Runner() QueueRunner

type RuntimeKernelConfig

type RuntimeKernelConfig struct {
	IsFailure IsFailureFunc

	RateLimiter     RateLimiter
	RateLimitConfig RateLimitConfig

	OutboxFactory OutboxFactory
	OutboxMigrate func(context.Context) error
	OutboxPoller  OutboxPollerConfig

	EventPublishers []EventPublisher
	Observers       []Observer
}

type RuntimeMetadata

type RuntimeMetadata struct {
	Name           string
	Service        string
	Driver         string
	Worker         string
	Queues         map[string]int
	DefaultQueue   string
	StrictPriority bool
	Concurrency    int
	RetryCount     int
	MaxRetry       *int
	Timeout        time.Duration
	Delay          time.Duration
	Priority       int
	Trace          TraceMetadata
	WorkerMetadata WorkerMetadata
	Middleware     MiddlewareMetadata
	RateLimit      RateLimitConfig
}

func RuntimeMetadataFromConfig

func RuntimeMetadataFromConfig(name string, service string, cfg Config) RuntimeMetadata

func RuntimeMetadataFromWorkerProfile

func RuntimeMetadataFromWorkerProfile(name string, service string, cfg Config, profile WorkerProfile) RuntimeMetadata

type RuntimeMetrics

type RuntimeMetrics struct {
	Queues map[string]QueueMetrics
	Total  QueueMetrics
}

type RuntimeMiddleware

type RuntimeMiddleware struct {
	Stage      MiddlewareStage
	Middleware Middleware
}

func BusinessMiddleware

func BusinessMiddleware(middleware Middleware) RuntimeMiddleware

func RuntimeMiddlewares

func RuntimeMiddlewares(stage MiddlewareStage, middlewares ...Middleware) []RuntimeMiddleware

func StageMiddleware

func StageMiddleware(stage MiddlewareStage, middleware Middleware) RuntimeMiddleware

type RuntimeOption

type RuntimeOption func(*RuntimeOptions)

func WithIsFailure

func WithIsFailure(fn IsFailureFunc) RuntimeOption

type RuntimeOptions

type RuntimeOptions struct {
	IsFailure IsFailureFunc
}

func ApplyRuntimeOptions

func ApplyRuntimeOptions(opts []RuntimeOption) RuntimeOptions

func DefaultRuntimeOptions

func DefaultRuntimeOptions() RuntimeOptions

type RuntimeParts

type RuntimeParts struct {
	Runner  QueueRunner
	Client  Client
	Worker  Worker
	Manager Manager
}

type RuntimeState

type RuntimeState string
const (
	RuntimeRunning  RuntimeState = "running"
	RuntimePaused   RuntimeState = "paused"
	RuntimeDraining RuntimeState = "draining"
	RuntimeStopped  RuntimeState = "stopped"
	RuntimeFailed   RuntimeState = "failed"
)

type RuntimeStatusInfo

type RuntimeStatusInfo struct {
	State     RuntimeState
	Queues    []QueueRuntimeStatus
	Worker    WorkerRuntimeStatus
	UpdatedAt time.Time
	Error     string
}

type ScheduleConfig

type ScheduleConfig struct {
	Enabled      bool          `mapstructure:"enabled" yaml:"enabled"`
	PollInterval time.Duration `mapstructure:"poll_interval" yaml:"poll_interval"`
	BatchSize    int           `mapstructure:"batch_size" yaml:"batch_size"`
}

type Task

type Task struct {
	ID      string
	Type    string
	Queue   string
	Payload any
	Headers map[string]string
}

func NewTask

func NewTask(taskType string, payload any) Task

type TaskAutoRetryStore

type TaskAutoRetryStore interface {
	ClaimAutoRetryTasks(ctx context.Context, now time.Time, limit int, workerID string, driver string) ([]TaskRecord, error)
	MarkAutoRetryFailed(ctx context.Context, record TaskRecord, err error) error
}

type TaskDeletionStore added in v0.1.1

type TaskDeletionStore interface {
	DeleteTaskRecord(ctx context.Context, queue string, taskID string) error
}

type TaskInfo

type TaskInfo struct {
	ID      string
	Type    string
	Queue   string
	State   TaskState
	Payload []byte
	Headers map[string]string

	MaxRetry  int
	Retried   int
	LastError string

	Timeout   time.Duration
	Deadline  *time.Time
	NextRunAt *time.Time
	LastRunAt *time.Time

	CreatedAt time.Time
	UpdatedAt time.Time

	TrackID   string
	RequestID string
	TraceID   string
	SpanID    string
	TenantID  string
	UserID    string

	Result string
}

func BatchEnqueue

func BatchEnqueue(ctx context.Context, tasks []Task, opts ...Option) ([]*TaskInfo, error)

func Delay

func Delay(ctx context.Context, taskType string, payload any, delay time.Duration, opts ...Option) (*TaskInfo, error)

func Enqueue

func Enqueue(ctx context.Context, task Task, opts ...Option) (*TaskInfo, error)

func Push

func Push(ctx context.Context, taskType string, payload any, opts ...Option) (*TaskInfo, error)

func RequeueTaskRecord added in v0.1.1

func RequeueTaskRecord(ctx context.Context, record TaskRecord) (*TaskInfo, error)

type TaskLogger

type TaskLogger interface {
	Info(message string, fields ...any)
	Warn(message string, fields ...any)
	Error(message string, fields ...any)
}

func TaskLoggerFromContext

func TaskLoggerFromContext(ctx context.Context) TaskLogger

type TaskMeta

type TaskMeta struct {
	TrackID   string `json:"track_id"`
	RequestID string `json:"request_id"`
	TraceID   string `json:"trace_id"`
	SpanID    string `json:"span_id"`
	TenantID  string `json:"tenant_id"`
	UserID    string `json:"user_id"`
}

type TaskPayload

type TaskPayload struct {
	Version int             `json:"version"`
	Data    json.RawMessage `json:"data"`
}

type TaskQuery

type TaskQuery struct {
	Queue  string
	State  TaskState
	Type   string
	TaskID string

	Limit  int
	Offset int
	Cursor string
}

type TaskRecord

type TaskRecord struct {
	RecordID              int64
	Driver                string
	TaskID                string
	Queue                 string
	Type                  string
	Payload               []byte
	State                 TaskState
	MaxRetry              int
	Attempts              int
	TimeoutSeconds        int
	UniqueSeconds         int
	DelaySeconds          int
	AutoRetryEnabled      bool
	AutoRetryCount        int
	AutoRetryMax          int
	AutoRetryDelaySeconds int
	NextRetryAt           *time.Time
	LastRetryError        string
	ScheduledAt           *time.Time
	StartedAt             *time.Time
	FinishedAt            *time.Time
	LastError             string
	WorkerID              string
	TrackID               string
	RequestID             string
	TraceID               string
	SpanID                string
	CreatedAt             time.Time
	UpdatedAt             time.Time
	Headers               map[string]string
}

func RecordTaskScheduled

func RecordTaskScheduled(ctx context.Context, store TaskStore, record TaskRecord) (TaskRecord, error)

func RecordTaskSubmitting

func RecordTaskSubmitting(ctx context.Context, store TaskStore, record TaskRecord) (TaskRecord, error)

type TaskRunLogRecord

type TaskRunLogRecord struct {
	QueueTaskID    int64
	QueueTaskRunID int64
	RunID          string
	TaskID         string
	Level          string
	Message        string
	Fields         map[string]any
	TrackID        string
	RequestID      string
	TraceID        string
	SpanID         string
	At             time.Time
	Headers        map[string]string
}

type TaskRunRecord

type TaskRunRecord struct {
	RunDBID     int64
	RunID       string
	QueueTaskID int64
	TaskID      string
	Queue       string
	Type        string
	Attempt     int
	Status      TaskState
	WorkerID    string
	StartedAt   time.Time
	FinishedAt  time.Time
	DurationMS  int64
	Error       string
	TrackID     string
	RequestID   string
	TraceID     string
	SpanID      string
	Headers     map[string]string
}

type TaskScheduleStore

type TaskScheduleStore interface {
	RecordScheduled(ctx context.Context, record TaskRecord) (TaskRecord, error)
	ClaimScheduled(ctx context.Context, now time.Time, limit int, workerID string, driver string) ([]TaskRecord, error)
}

type TaskState

type TaskState string
const (
	TaskPending    TaskState = "pending"
	TaskRunning    TaskState = "running"
	TaskRetrying   TaskState = "retrying"
	TaskFailed     TaskState = "failed"
	TaskDeadLetter TaskState = "deadletter"
	TaskSuccess    TaskState = "success"

	StatePending      TaskState = "pending"
	StateSubmitting   TaskState = "submitting"
	StateSubmitFailed TaskState = "submit_failed"
	StateActive       TaskState = "active"
	StateScheduled    TaskState = "scheduled"
	StateRetry        TaskState = "retry"
	StateSucceeded    TaskState = "succeeded"
	StateFailed       TaskState = "failed"
	StateArchived     TaskState = "archived"
	StateCanceled     TaskState = "canceled"
	StateUnknown      TaskState = "unknown"
)

type TaskStatusUpdate

type TaskStatusUpdate struct {
	Queue     string
	TaskID    string
	Status    TaskState
	LastError string
	UpdatedAt time.Time
}

type TaskStore

type TaskStore interface {
	RecordEnqueued(ctx context.Context, record TaskRecord) error
	EnsureRunning(ctx context.Context, record TaskRecord) (TaskRecord, error)
	StartRun(ctx context.Context, run TaskRunRecord) (TaskRunRecord, error)
	FinishRun(ctx context.Context, run TaskRunRecord) error
	AppendRunLog(ctx context.Context, log TaskRunLogRecord) error
	UpdateTaskStatus(ctx context.Context, update TaskStatusUpdate) error
}

type TaskStoreOptions

type TaskStoreOptions struct {
	Driver   string
	WorkerID string
	RunID    func() string
	Now      func() time.Time
}

type TaskSubmissionStore

type TaskSubmissionStore interface {
	RecordSubmitting(ctx context.Context, record TaskRecord) (TaskRecord, error)
	RecordDispatched(ctx context.Context, record TaskRecord) error
	RecordDispatchFailed(ctx context.Context, record TaskRecord) error
}

type TraceMetadata

type TraceMetadata struct {
	Enabled   bool
	TrackID   string
	RequestID string
	TraceID   string
	SpanID    string
}

type Worker

type Worker interface {
	Handle(taskType string, handler HandlerFunc)
	Use(middlewares ...Middleware)
	Run(ctx context.Context) error
	Shutdown(ctx context.Context) error
}

type WorkerHeartbeat

type WorkerHeartbeat struct {
	WorkerName      string
	Hostname        string
	PID             int
	Queues          map[string]int
	Concurrency     int
	StartedAt       time.Time
	LastHeartbeatAt time.Time
}

type WorkerMetadata

type WorkerMetadata struct {
	ID             string
	Name           string
	Service        string
	Queues         map[string]int
	StrictPriority bool
	Concurrency    int
}

type WorkerProfile

type WorkerProfile struct {
	Name            string         `mapstructure:"name" yaml:"name"`
	Concurrency     int            `mapstructure:"concurrency" yaml:"concurrency"`
	Queues          map[string]int `mapstructure:"queues" yaml:"queues"`
	StrictPriority  bool           `mapstructure:"strict_priority" yaml:"strict_priority"`
	ShutdownTimeout time.Duration  `mapstructure:"shutdown_timeout" yaml:"shutdown_timeout"`
}

func (WorkerProfile) Normalize

func (p WorkerProfile) Normalize(name string, cfg Config) WorkerProfile

type WorkerRuntimeStatus

type WorkerRuntimeStatus struct {
	ID             string
	Name           string
	Service        string
	State          RuntimeState
	Queues         map[string]int
	StrictPriority bool
	Concurrency    int
	UpdatedAt      time.Time
}

Directories

Path Synopsis
facade
runtime

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL