Documentation
¶
Index ¶
- Constants
- Variables
- func CanTransitionTaskState(from TaskState, to TaskState) bool
- func CloneCapabilities(in map[Capability]bool) map[Capability]bool
- func ContextFromCorrelationHeaders(ctx context.Context, headers map[string]string) context.Context
- func ContextWithMessage(ctx context.Context, msg *Message) context.Context
- func ContextWithMetadata(ctx context.Context, metadata ContextMetadata) context.Context
- func ContextWithRuntime(ctx context.Context, runtime *RuntimeInstance) context.Context
- func ContextWithRuntimeContext(ctx context.Context, runtime *RuntimeContext) context.Context
- func CorrelationHeaderValue(headers map[string]string, key string) string
- func CorrelationHeadersFromContext(ctx context.Context) map[string]string
- func DecodePayload[T any](msg *Message) (T, error)
- func DefaultRetryDelay(retryCount int, err error, _ *Message) time.Duration
- func DispatchAutoRetryTasks(ctx context.Context, limit int) (int, error)
- func EnsureMessageRuntimeContext(ctx context.Context, msg *Message) context.Context
- func IsDeadLetterError(err error) bool
- func IsFatalError(err error) bool
- func IsIgnoredError(err error) bool
- func IsLockNotAcquired(err error) bool
- func IsRateLimitError(err error) bool
- func IsRetryableError(err error) bool
- func IsTimeoutError(err error) bool
- func MarshalPayload(payload any) ([]byte, error)
- func MessageMetadataDuration(msg *Message, key string) (time.Duration, bool)
- func MessageMetadataString(msg *Message, key string) (string, bool)
- func MessageMetadataValue(msg *Message, key string) (any, bool)
- func NewDeadLetterError(err error) error
- func NewFatalError(err error) error
- func NewIgnoredError(err error) error
- func NewRetryableError(err error) error
- func NewRuntimeError(code string, err error) error
- func NewTimeoutError(err error) error
- func RateLimited(retryIn time.Duration, err error) error
- func RecordTaskDispatchFailed(ctx context.Context, store TaskStore, record TaskRecord) error
- func RecordTaskDispatched(ctx context.Context, store TaskStore, record TaskRecord) error
- func RecordTaskEnqueued(ctx context.Context, store TaskStore, record TaskRecord) error
- func RegisterDriver(driver RunnerDriver) error
- func RequestIDFromHeaders(headers map[string]string) string
- func RetryableAfter(retryIn time.Duration, err error) error
- func SetMessageMetadata(msg *Message, key string, value any)
- func SetSpanCorrelationAttributes(ctx context.Context, span oteltrace.Span)
- func SetTaskState(msg *Message, state TaskState)
- func SpanIDFromHeaders(headers map[string]string) string
- func TraceIDFromHeaders(headers map[string]string) string
- func TrackIDFromHeaders(headers map[string]string) string
- func TransitionTaskState(msg *Message, next TaskState) bool
- func UpdateStoredTaskStatus(ctx context.Context, store TaskStore, update TaskStatusUpdate) error
- func WithTaskLogger(ctx context.Context, logger TaskLogger) context.Context
- type ActionLogger
- type BackoffStrategy
- type Capability
- type Client
- type ConcurrencyLimiter
- type Config
- type ContextHandler
- type ContextMetadata
- type DeadLetter
- type Dispatcher
- func (d *Dispatcher) AddHook(h Hook)
- func (d *Dispatcher) Dispatch(ctx context.Context, pattern string, msg *Message) error
- func (d *Dispatcher) Entries() []HandlerMetadata
- func (d *Dispatcher) Handler(pattern string) HandlerFunc
- func (d *Dispatcher) HandlerFor(pattern string) (HandlerFunc, bool)
- func (d *Dispatcher) Metadata() RegistryMetadata
- func (d *Dispatcher) Orchestrator() *Orchestrator
- func (d *Dispatcher) Register(pattern string, handlers ...any) error
- func (d *Dispatcher) SetOrchestrator(orchestrator *Orchestrator)
- func (d *Dispatcher) Use(middlewares ...Middleware)
- func (d *Dispatcher) UseRuntime(middlewares ...RuntimeMiddleware)
- type Driver
- type EnqueueOptions
- type ErrorKind
- type EventPublisher
- type HandlerContext
- type HandlerFunc
- type HandlerMetadata
- type Hook
- type HookFunc
- type Idempotency
- type IdempotencyConfig
- type IsFailureFunc
- type LockConfig
- type Locker
- type Manager
- type Message
- type MetricsLabels
- type MetricsRecorder
- type Middleware
- type MiddlewareMetadata
- type MiddlewareStage
- type NATSConfig
- type NoopTaskLogger
- type Observer
- type ObserverFunc
- func (o ObserverFunc) OnTaskFailure(ctx context.Context, msg *Message, err error)
- func (o ObserverFunc) OnTaskFinish(ctx context.Context, msg *Message, err error)
- func (o ObserverFunc) OnTaskRetry(ctx context.Context, msg *Message, err error)
- func (o ObserverFunc) OnTaskStart(ctx context.Context, msg *Message)
- type OperationsRuntime
- func (o *OperationsRuntime) ArchiveTask(ctx context.Context, queueName string, taskID string) error
- func (o *OperationsRuntime) CancelTask(ctx context.Context, queueName string, taskID string) error
- func (o *OperationsRuntime) Capabilities() map[Capability]bool
- func (o *OperationsRuntime) CleanTasks(ctx context.Context, query TaskQuery) (int, error)
- func (o *OperationsRuntime) DeleteTask(ctx context.Context, queueName string, taskID string) error
- func (o *OperationsRuntime) Drain(ctx context.Context) error
- func (o *OperationsRuntime) DrainQueue(ctx context.Context, queueName string) error
- func (o *OperationsRuntime) GetQueue(ctx context.Context, queueName string) (*QueueInfo, error)
- func (o *OperationsRuntime) GetTask(ctx context.Context, queueName string, taskID string) (*TaskInfo, error)
- func (o *OperationsRuntime) ListFailedTasks(ctx context.Context, query TaskQuery) ([]*TaskInfo, error)
- func (o *OperationsRuntime) ListQueues(ctx context.Context) ([]*QueueInfo, error)
- func (o *OperationsRuntime) ListTasks(ctx context.Context, query TaskQuery) ([]*TaskInfo, error)
- func (o *OperationsRuntime) Manager() Manager
- func (o *OperationsRuntime) Metadata() RuntimeMetadata
- func (o *OperationsRuntime) Metrics(ctx context.Context) (*RuntimeMetrics, error)
- func (o *OperationsRuntime) PauseQueue(ctx context.Context, queueName string) error
- func (o *OperationsRuntime) QueueStatus(ctx context.Context) ([]QueueRuntimeStatus, error)
- func (o *OperationsRuntime) ResumeQueue(ctx context.Context, queueName string) error
- func (o *OperationsRuntime) RetryTask(ctx context.Context, queueName string, taskID string) error
- func (o *OperationsRuntime) RuntimeStatus(ctx context.Context) (*RuntimeStatusInfo, error)
- func (o *OperationsRuntime) SetManager(manager Manager)
- func (o *OperationsRuntime) SetMetadata(metadata RuntimeMetadata)
- func (o *OperationsRuntime) SetTaskStore(store TaskStore)
- func (o *OperationsRuntime) Status(ctx context.Context) ([]*QueueInfo, error)
- func (o *OperationsRuntime) Supports(cap Capability) bool
- func (o *OperationsRuntime) TaskStore() TaskStore
- func (o *OperationsRuntime) WorkerStatus(ctx context.Context) WorkerRuntimeStatus
- type Option
- func AutoRetry(max int, delay time.Duration) Option
- func Deadline(t time.Time) Option
- func Group(name string) Option
- func MaxRetry(n int) Option
- func ProcessAt(t time.Time) Option
- func ProcessIn(d time.Duration) Option
- func Queue(name string) Option
- func Retention(d time.Duration) Option
- func TaskID(id string) Option
- func Timeout(d time.Duration) Option
- func Unique(ttl time.Duration) Option
- func WithAutoRetry(max int, delay time.Duration) Option
- func WithDeadline(t time.Time) Option
- func WithDelay(d time.Duration) Option
- func WithGroup(name string) Option
- func WithMaxRetry(n int) Option
- func WithPriority(priority int) Option
- func WithProcessAt(t time.Time) Option
- func WithProcessIn(d time.Duration) Option
- func WithQueue(name string) Option
- func WithRateLimitKey(key string) Option
- func WithRetention(d time.Duration) Option
- func WithRetry(n int) Option
- func WithTaskID(id string) Option
- func WithTimeout(d time.Duration) Option
- func WithTrace(enabled bool) Option
- func WithUnique(ttl time.Duration) Option
- type Orchestrator
- type OrchestratorOption
- type Outbox
- type OutboxConfig
- type OutboxFactory
- type OutboxPoller
- type OutboxPollerConfig
- type OutboxTask
- type ProgressReporter
- type QueueAction
- type QueueDrainer
- type QueueInfo
- type QueueMetrics
- type QueueRunner
- type QueueRuntimeStatus
- type QueueState
- type RateLimitConfig
- type RateLimitError
- type RateLimiter
- type RedisConfig
- type Registration
- type Registry
- func (r *Registry) AddHook(h Hook)
- func (r *Registry) Dispatcher() *Dispatcher
- func (r *Registry) Entries() []HandlerMetadata
- func (r *Registry) Metadata() RegistryMetadata
- func (r *Registry) Register(pattern string, handlers ...any) error
- func (r *Registry) RegisterAll(registrations ...Registration) error
- func (r *Registry) Runtime() *RegistryRuntime
- func (r *Registry) Use(middlewares ...Middleware)
- func (r *Registry) UseRuntime(middlewares ...RuntimeMiddleware)
- type RegistryMetadata
- type RegistryRuntime
- func (r *RegistryRuntime) AddHook(h Hook)
- func (r *RegistryRuntime) Dispatch(ctx context.Context, pattern string, msg *Message) error
- func (r *RegistryRuntime) Dispatcher() *Dispatcher
- func (r *RegistryRuntime) Entries() []HandlerMetadata
- func (r *RegistryRuntime) Handler(pattern string) HandlerFunc
- func (r *RegistryRuntime) Metadata() RegistryMetadata
- func (r *RegistryRuntime) Register(pattern string, handlers ...any) error
- func (r *RegistryRuntime) SetOrchestrator(orchestrator *Orchestrator)
- func (r *RegistryRuntime) Use(middlewares ...Middleware)
- func (r *RegistryRuntime) UseRuntime(middlewares ...RuntimeMiddleware)
- type RetryDelayFunc
- type RetryStrategy
- type RetryStrategyFunc
- type RunnerDriver
- type RuntimeContext
- type RuntimeError
- type RuntimeEvent
- type RuntimeEventType
- type RuntimeExecution
- type RuntimeInstance
- func From(source any) *RuntimeInstance
- func InitRuntimeInstance(ctx context.Context, cfg Config, runtimeCfg RuntimeKernelConfig, ...) (*RuntimeInstance, error)
- func NewRuntimeInstance(runner QueueRunner, opts ...RuntimeInstanceOption) *RuntimeInstance
- func NewRuntimeInstanceFromParts(parts RuntimeParts, opts ...RuntimeInstanceOption) *RuntimeInstance
- func Runtime(ctx context.Context) *RuntimeInstance
- func (rt *RuntimeInstance) AddHook(h Hook)
- func (rt *RuntimeInstance) ArchiveTask(ctx context.Context, queueName string, taskID string) error
- func (rt *RuntimeInstance) BatchEnqueue(ctx context.Context, tasks []Task, opts ...Option) ([]*TaskInfo, error)
- func (rt *RuntimeInstance) CancelTask(ctx context.Context, queueName string, taskID string) error
- func (rt *RuntimeInstance) Capabilities() map[Capability]bool
- func (rt *RuntimeInstance) Client() Client
- func (rt *RuntimeInstance) Close() error
- func (rt *RuntimeInstance) DeleteTask(ctx context.Context, queueName string, taskID string) error
- func (rt *RuntimeInstance) DispatchAutoRetryTasks(ctx context.Context, limit int) (int, error)
- func (rt *RuntimeInstance) Enqueue(ctx context.Context, task Task, opts ...Option) (*TaskInfo, error)
- func (rt *RuntimeInstance) GetQueue(ctx context.Context, queueName string) (*QueueInfo, error)
- func (rt *RuntimeInstance) GetTask(ctx context.Context, queueName string, taskID string) (*TaskInfo, error)
- func (rt *RuntimeInstance) Handle(pattern string, handler HandlerFunc)
- func (rt *RuntimeInstance) Kernel() *RuntimeKernel
- func (rt *RuntimeInstance) ListQueues(ctx context.Context) ([]*QueueInfo, error)
- func (rt *RuntimeInstance) ListTasks(ctx context.Context, query TaskQuery) ([]*TaskInfo, error)
- func (rt *RuntimeInstance) Manager() Manager
- func (rt *RuntimeInstance) Metadata() RuntimeMetadata
- func (rt *RuntimeInstance) NewRegistry() *Registry
- func (rt *RuntimeInstance) Operations() *OperationsRuntime
- func (rt *RuntimeInstance) PauseQueue(ctx context.Context, queueName string) error
- func (rt *RuntimeInstance) Registry() *RegistryRuntime
- func (rt *RuntimeInstance) RequeueTaskRecord(ctx context.Context, record TaskRecord) (*TaskInfo, error)
- func (rt *RuntimeInstance) ResumeQueue(ctx context.Context, queueName string) error
- func (rt *RuntimeInstance) RetryTask(ctx context.Context, queueName string, taskID string) error
- func (rt *RuntimeInstance) Run(ctx context.Context) error
- func (rt *RuntimeInstance) Runner() QueueRunner
- func (rt *RuntimeInstance) Shutdown(ctx context.Context) error
- func (rt *RuntimeInstance) StartSchedulePoller(ctx context.Context, cfg ScheduleConfig) context.CancelFunc
- func (rt *RuntimeInstance) Supports(cap Capability) bool
- func (rt *RuntimeInstance) Use(middlewares ...Middleware)
- func (rt *RuntimeInstance) UseRuntime(middlewares ...RuntimeMiddleware)
- func (rt *RuntimeInstance) Worker() Worker
- type RuntimeInstanceOption
- func WithRuntimeKernel(kernel *RuntimeKernel) RuntimeInstanceOption
- func WithRuntimeMetadata(metadata RuntimeMetadata) RuntimeInstanceOption
- func WithRuntimeOperations(operations *OperationsRuntime) RuntimeInstanceOption
- func WithRuntimeRegistry(registry *RegistryRuntime) RuntimeInstanceOption
- func WithRuntimeTaskStore(store TaskStore) RuntimeInstanceOption
- type RuntimeKernel
- type RuntimeKernelConfig
- type RuntimeMetadata
- type RuntimeMetrics
- type RuntimeMiddleware
- type RuntimeOption
- type RuntimeOptions
- type RuntimeParts
- type RuntimeState
- type RuntimeStatusInfo
- type ScheduleConfig
- type Task
- type TaskAutoRetryStore
- type TaskDeletionStore
- type TaskInfo
- func BatchEnqueue(ctx context.Context, tasks []Task, opts ...Option) ([]*TaskInfo, error)
- func Delay(ctx context.Context, taskType string, payload any, delay time.Duration, ...) (*TaskInfo, error)
- func Enqueue(ctx context.Context, task Task, opts ...Option) (*TaskInfo, error)
- func Push(ctx context.Context, taskType string, payload any, opts ...Option) (*TaskInfo, error)
- func RequeueTaskRecord(ctx context.Context, record TaskRecord) (*TaskInfo, error)
- type TaskLogger
- type TaskMeta
- type TaskPayload
- type TaskQuery
- type TaskRecord
- type TaskRunLogRecord
- type TaskRunRecord
- type TaskScheduleStore
- type TaskState
- type TaskStatusUpdate
- type TaskStore
- type TaskStoreOptions
- type TaskSubmissionStore
- type TraceMetadata
- type Worker
- type WorkerHeartbeat
- type WorkerMetadata
- type WorkerProfile
- type WorkerRuntimeStatus
Constants ¶
View Source
const ( MessageMetadataPattern = "queue.pattern" MessageMetadataQueue = "queue.queue" MessageMetadataMaxRetry = "queue.max_retry" MessageMetadataTimeout = "queue.timeout" MessageMetadataDelay = "queue.delay" MessageMetadataPriority = "queue.priority" MessageMetadataTrace = "queue.trace" MessageMetadataLockKey = "queue.lock.key" MessageMetadataLockTTL = "queue.lock.ttl" MessageMetadataWorker = "queue.worker" MessageMetadataConcurrencyKey = "queue.concurrency.key" )
View Source
const ( TaskLogLevelInfo = "info" TaskLogLevelWarn = "warn" TaskLogLevelError = "error" )
View Source
const DefaultQueueName = "default"
View Source
const RuntimeCapabilityName = "queue.runtime"
Variables ¶
View Source
var ( ErrNotInitialized = errors.New("queue: not initialized") ErrCapabilityUnsupported = errors.New("queue capability unsupported") ErrTaskDuplicated = errors.New("queue task duplicated") ErrTaskNotFound = errors.New("queue task not found") ErrQueueNotFound = errors.New("queue not found") ErrQueuePaused = errors.New("queue paused") ErrRateLimited = errors.New("queue rate limited") ErrTaskCanceled = errors.New("queue task canceled") ErrInvalidPayload = errors.New("queue invalid payload") ErrHandlerNotFound = errors.New("queue handler not found") ErrLockNotAcquired = errors.New("queue lock not acquired") ErrIdempotentDone = errors.New("queue idempotent done") )
Functions ¶
func CanTransitionTaskState ¶
func CloneCapabilities ¶
func CloneCapabilities(in map[Capability]bool) map[Capability]bool
func ContextWithMessage ¶
func ContextWithMetadata ¶
func ContextWithMetadata(ctx context.Context, metadata ContextMetadata) context.Context
func ContextWithRuntime ¶
func ContextWithRuntime(ctx context.Context, runtime *RuntimeInstance) context.Context
func ContextWithRuntimeContext ¶
func ContextWithRuntimeContext(ctx context.Context, runtime *RuntimeContext) context.Context
func CorrelationHeaderValue ¶
func DecodePayload ¶
func DefaultRetryDelay ¶
func DispatchAutoRetryTasks ¶ added in v0.1.1
func IsDeadLetterError ¶
func IsFatalError ¶
func IsIgnoredError ¶
func IsLockNotAcquired ¶
func IsRateLimitError ¶
func IsRetryableError ¶
func IsTimeoutError ¶
func MarshalPayload ¶
func MessageMetadataDuration ¶
func NewDeadLetterError ¶
func NewFatalError ¶
func NewIgnoredError ¶
func NewRetryableError ¶
func NewRuntimeError ¶
func NewTimeoutError ¶
func RecordTaskDispatchFailed ¶
func RecordTaskDispatchFailed(ctx context.Context, store TaskStore, record TaskRecord) error
func RecordTaskDispatched ¶
func RecordTaskDispatched(ctx context.Context, store TaskStore, record TaskRecord) error
func RecordTaskEnqueued ¶
func RecordTaskEnqueued(ctx context.Context, store TaskStore, record TaskRecord) error
func RegisterDriver ¶
func RegisterDriver(driver RunnerDriver) error
func RequestIDFromHeaders ¶
func SetMessageMetadata ¶
func SpanIDFromHeaders ¶
func TraceIDFromHeaders ¶
func TrackIDFromHeaders ¶
func TransitionTaskState ¶
func UpdateStoredTaskStatus ¶
func UpdateStoredTaskStatus(ctx context.Context, store TaskStore, update TaskStatusUpdate) error
func WithTaskLogger ¶
func WithTaskLogger(ctx context.Context, logger TaskLogger) context.Context
Types ¶
type ActionLogger ¶
type ActionLogger interface {
LogQueueAction(ctx context.Context, action QueueAction) error
}
type BackoffStrategy ¶
type Capability ¶
type Capability string
const ( CapEnqueue Capability = "enqueue" CapConsume Capability = "consume" CapRetry Capability = "retry" CapTimeout Capability = "timeout" CapDeadline Capability = "deadline" CapDelay Capability = "delay" CapUnique Capability = "unique" CapPriority Capability = "priority" CapRateLimit Capability = "rate_limit" CapCancel Capability = "cancel" CapPauseResume Capability = "pause_resume" CapDLQ Capability = "dlq" CapInspector Capability = "inspector" CapBatch Capability = "batch" CapChain Capability = "chain" CapProgress Capability = "progress" CapHeartbeat Capability = "heartbeat" CapLock Capability = "lock" CapIdempotency Capability = "idempotency" CapMetrics Capability = "metrics" CapLog Capability = "log" CapTrace Capability = "trace" CapActionLog Capability = "action_log" CapOutbox Capability = "outbox" )
type Client ¶
type ConcurrencyLimiter ¶
type Config ¶
type Config struct {
Driver string `mapstructure:"driver" yaml:"driver"`
Redis RedisConfig `mapstructure:"redis" yaml:"redis"`
NATS NATSConfig `mapstructure:"nats" yaml:"nats"`
Addr string `mapstructure:"addr" yaml:"addr"`
Password string `mapstructure:"password" yaml:"password"`
DB int `mapstructure:"db" yaml:"db"`
Concurrency int `mapstructure:"concurrency" yaml:"concurrency"`
Queues map[string]int `mapstructure:"queues" yaml:"queues"`
StrictPriority bool `mapstructure:"strict_priority" yaml:"strict_priority"`
Workers map[string]WorkerProfile `mapstructure:"workers" yaml:"workers"`
RateLimit RateLimitConfig `mapstructure:"rate_limit" yaml:"rate_limit"`
Lock LockConfig `mapstructure:"lock" yaml:"lock"`
Idempotency IdempotencyConfig `mapstructure:"idempotency" yaml:"idempotency"`
Outbox OutboxConfig `mapstructure:"outbox" yaml:"outbox"`
Schedule ScheduleConfig `mapstructure:"schedule" yaml:"schedule"`
}
func FromConfig ¶
func (Config) WorkerProfile ¶
func (c Config) WorkerProfile(name string) WorkerProfile
type ContextHandler ¶
type ContextHandler func(*HandlerContext) error
type ContextMetadata ¶
type ContextMetadata struct {
TaskID string
TaskType string
Queue string
TaskState TaskState
RetryCount int
MaxRetry int
WorkerID string
TrackID string
RequestID string
TraceID string
SpanID string
TenantID string
UserID string
Event map[string]string
}
func ContextMetadataFromMessage ¶
func ContextMetadataFromMessage(msg *Message) ContextMetadata
func MetadataFromContext ¶
func MetadataFromContext(ctx context.Context) (ContextMetadata, bool)
type DeadLetter ¶
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
func NewDispatcher ¶
func NewDispatcher() *Dispatcher
func (*Dispatcher) AddHook ¶
func (d *Dispatcher) AddHook(h Hook)
func (*Dispatcher) Entries ¶
func (d *Dispatcher) Entries() []HandlerMetadata
func (*Dispatcher) Handler ¶
func (d *Dispatcher) Handler(pattern string) HandlerFunc
func (*Dispatcher) HandlerFor ¶
func (d *Dispatcher) HandlerFor(pattern string) (HandlerFunc, bool)
func (*Dispatcher) Metadata ¶
func (d *Dispatcher) Metadata() RegistryMetadata
func (*Dispatcher) Orchestrator ¶
func (d *Dispatcher) Orchestrator() *Orchestrator
func (*Dispatcher) SetOrchestrator ¶
func (d *Dispatcher) SetOrchestrator(orchestrator *Orchestrator)
func (*Dispatcher) Use ¶
func (d *Dispatcher) Use(middlewares ...Middleware)
func (*Dispatcher) UseRuntime ¶
func (d *Dispatcher) UseRuntime(middlewares ...RuntimeMiddleware)
type Driver ¶
type Driver interface {
Name() string
Capabilities() map[Capability]bool
Supports(cap Capability) bool
NewClient(cfg Config) (Client, error)
NewWorker(cfg Config, profile WorkerProfile) (Worker, error)
NewManager(cfg Config) (Manager, error)
}
type EnqueueOptions ¶
type EnqueueOptions struct {
Queue string
TaskID string
MaxRetry *int
Timeout time.Duration
Deadline time.Time
ProcessAt time.Time
ProcessIn time.Duration
UniqueTTL time.Duration
Retention time.Duration
Group string
Priority int
RateLimitKey string
Trace bool
AutoRetryEnabled bool
AutoRetryMax int
AutoRetryDelay time.Duration
}
func ApplyOptions ¶
func ApplyOptions(opts []Option) EnqueueOptions
type EventPublisher ¶
type EventPublisher interface {
Publish(ctx context.Context, event RuntimeEvent)
}
type HandlerContext ¶
type HandlerContext struct {
Message *Message
// contains filtered or unexported fields
}
func (*HandlerContext) Context ¶
func (c *HandlerContext) Context() context.Context
func (*HandlerContext) Next ¶
func (c *HandlerContext) Next() error
func (*HandlerContext) SetContext ¶
func (c *HandlerContext) SetContext(ctx context.Context)
type HandlerFunc ¶
func BuildHandlerChain ¶
func BuildHandlerChain(handlers ...any) (HandlerFunc, error)
func Chain ¶
func Chain(handler HandlerFunc, middlewares ...Middleware) HandlerFunc
func ChainRuntime ¶
func ChainRuntime(handler HandlerFunc, middlewares ...RuntimeMiddleware) HandlerFunc
type HandlerMetadata ¶
type HookFunc ¶
type HookFunc struct {
Before func(context.Context, *Message) error
After func(context.Context, *Message, error)
Success func(context.Context, *Message)
Failure func(context.Context, *Message, error)
}
func (HookFunc) AfterProcess ¶
func (HookFunc) BeforeProcess ¶
type Idempotency ¶
type IdempotencyConfig ¶
type IsFailureFunc ¶
type LockConfig ¶
type Manager ¶
type Manager interface {
Supports(cap Capability) bool
Capabilities() map[Capability]bool
ListQueues(ctx context.Context) ([]*QueueInfo, error)
GetQueue(ctx context.Context, queue string) (*QueueInfo, error)
ListTasks(ctx context.Context, query TaskQuery) ([]*TaskInfo, error)
GetTask(ctx context.Context, queue, taskID string) (*TaskInfo, error)
DeleteTask(ctx context.Context, queue, taskID string) error
RetryTask(ctx context.Context, queue, taskID string) error
ArchiveTask(ctx context.Context, queue, taskID string) error
CancelTask(ctx context.Context, queue, taskID string) error
PauseQueue(ctx context.Context, queue string) error
ResumeQueue(ctx context.Context, queue string) error
}
func NewManager ¶
type Message ¶
type MetricsLabels ¶
type MetricsRecorder ¶
type Middleware ¶
type Middleware func(HandlerFunc) HandlerFunc
func ContextChain ¶
func ContextChain(handlers ...ContextHandler) Middleware
func TaskStoreMiddleware ¶
func TaskStoreMiddleware(store TaskStore, opts TaskStoreOptions) Middleware
type MiddlewareMetadata ¶
type MiddlewareStage ¶
type MiddlewareStage int
const ( // MiddlewareStage 已冻结为 runtime governance 的固定顺序,maintenance mode 下不新增 stage。 // // 新治理能力应优先归入已有 stage,只有改变执行语义时才考虑调整 runtime kernel。 // RecoverStage 用于捕获后续 middleware 或 handler 的 panic,并转换为 error。 RecoverStage MiddlewareStage = iota // TraceStage 用于在观测和业务逻辑执行前创建或恢复 tracing 上下文。 TraceStage // MetricsStage 用于统计任务次数、成功失败和执行耗时。 MetricsStage // LoggingStage 用于记录任务开始、完成和失败日志。 LoggingStage // TimeoutStage 用于任务级超时控制,通常由 queue.WithTimeout 在注册期自动生成。 TimeoutStage // RateLimitStage 用于在占用 worker 并发和业务锁前进行限流。 RateLimitStage // ConcurrencyStage 用于按任务类型、业务 key、租户等维度限制并发执行。 ConcurrencyStage // LockStage 用于在进入业务逻辑前获取业务锁。 LockStage // RetryStage 用于判断错误是否需要重试,并附加重试延迟。 RetryStage // DeadLetterStage 用于在重试策略完成分类后记录终态失败任务。 DeadLetterStage // BusinessStage 是普通 middleware 的默认阶段,也是业务 handler 的执行边界。 BusinessStage )
type NATSConfig ¶
type NATSConfig struct {
Stream string `mapstructure:"stream" yaml:"stream"`
SubjectPrefix string `mapstructure:"subject_prefix" yaml:"subject_prefix"`
DurablePrefix string `mapstructure:"durable_prefix" yaml:"durable_prefix"`
AckWait time.Duration `mapstructure:"ack_wait" yaml:"ack_wait"`
MaxDeliver int `mapstructure:"max_deliver" yaml:"max_deliver"`
MaxAge time.Duration `mapstructure:"max_age" yaml:"max_age"`
Duplicates time.Duration `mapstructure:"duplicates" yaml:"duplicates"`
Storage string `mapstructure:"storage" yaml:"storage"`
Replicas int `mapstructure:"replicas" yaml:"replicas"`
FetchBatch int `mapstructure:"fetch_batch" yaml:"fetch_batch"`
FetchWait time.Duration `mapstructure:"fetch_wait" yaml:"fetch_wait"`
RetryDelay time.Duration `mapstructure:"retry_delay" yaml:"retry_delay"`
}
type NoopTaskLogger ¶
type NoopTaskLogger struct{}
func (NoopTaskLogger) Error ¶
func (NoopTaskLogger) Error(string, ...any)
func (NoopTaskLogger) Info ¶
func (NoopTaskLogger) Info(string, ...any)
func (NoopTaskLogger) Warn ¶
func (NoopTaskLogger) Warn(string, ...any)
type ObserverFunc ¶
type ObserverFunc struct {
Start func(context.Context, *Message)
Finish func(context.Context, *Message, error)
Retry func(context.Context, *Message, error)
Failure func(context.Context, *Message, error)
}
func (ObserverFunc) OnTaskFailure ¶
func (o ObserverFunc) OnTaskFailure(ctx context.Context, msg *Message, err error)
func (ObserverFunc) OnTaskFinish ¶
func (o ObserverFunc) OnTaskFinish(ctx context.Context, msg *Message, err error)
func (ObserverFunc) OnTaskRetry ¶
func (o ObserverFunc) OnTaskRetry(ctx context.Context, msg *Message, err error)
func (ObserverFunc) OnTaskStart ¶
func (o ObserverFunc) OnTaskStart(ctx context.Context, msg *Message)
type OperationsRuntime ¶
type OperationsRuntime struct {
// contains filtered or unexported fields
}
func NewOperationsRuntime ¶
func NewOperationsRuntime(manager Manager) *OperationsRuntime
func (*OperationsRuntime) ArchiveTask ¶
func (*OperationsRuntime) CancelTask ¶
func (*OperationsRuntime) Capabilities ¶
func (o *OperationsRuntime) Capabilities() map[Capability]bool
func (*OperationsRuntime) CleanTasks ¶
func (*OperationsRuntime) DeleteTask ¶
func (*OperationsRuntime) DrainQueue ¶
func (o *OperationsRuntime) DrainQueue(ctx context.Context, queueName string) error
func (*OperationsRuntime) ListFailedTasks ¶
func (*OperationsRuntime) ListQueues ¶
func (o *OperationsRuntime) ListQueues(ctx context.Context) ([]*QueueInfo, error)
func (*OperationsRuntime) Manager ¶
func (o *OperationsRuntime) Manager() Manager
func (*OperationsRuntime) Metadata ¶
func (o *OperationsRuntime) Metadata() RuntimeMetadata
func (*OperationsRuntime) Metrics ¶
func (o *OperationsRuntime) Metrics(ctx context.Context) (*RuntimeMetrics, error)
func (*OperationsRuntime) PauseQueue ¶
func (o *OperationsRuntime) PauseQueue(ctx context.Context, queueName string) error
func (*OperationsRuntime) QueueStatus ¶
func (o *OperationsRuntime) QueueStatus(ctx context.Context) ([]QueueRuntimeStatus, error)
func (*OperationsRuntime) ResumeQueue ¶
func (o *OperationsRuntime) ResumeQueue(ctx context.Context, queueName string) error
func (*OperationsRuntime) RuntimeStatus ¶
func (o *OperationsRuntime) RuntimeStatus(ctx context.Context) (*RuntimeStatusInfo, error)
func (*OperationsRuntime) SetManager ¶
func (o *OperationsRuntime) SetManager(manager Manager)
func (*OperationsRuntime) SetMetadata ¶
func (o *OperationsRuntime) SetMetadata(metadata RuntimeMetadata)
func (*OperationsRuntime) SetTaskStore ¶ added in v0.1.1
func (o *OperationsRuntime) SetTaskStore(store TaskStore)
func (*OperationsRuntime) Status ¶
func (o *OperationsRuntime) Status(ctx context.Context) ([]*QueueInfo, error)
func (*OperationsRuntime) Supports ¶
func (o *OperationsRuntime) Supports(cap Capability) bool
func (*OperationsRuntime) TaskStore ¶ added in v0.1.1
func (o *OperationsRuntime) TaskStore() TaskStore
func (*OperationsRuntime) WorkerStatus ¶
func (o *OperationsRuntime) WorkerStatus(ctx context.Context) WorkerRuntimeStatus
type Option ¶
type Option func(*EnqueueOptions)
func WithDeadline ¶
func WithMaxRetry ¶
func WithPriority ¶
func WithProcessAt ¶
func WithProcessIn ¶
func WithRateLimitKey ¶
func WithRetention ¶
func WithTaskID ¶
func WithTimeout ¶
func WithUnique ¶
type Orchestrator ¶
type Orchestrator struct {
// contains filtered or unexported fields
}
func NewOrchestrator ¶
func NewOrchestrator(opts ...OrchestratorOption) *Orchestrator
func (*Orchestrator) AddEventPublisher ¶
func (o *Orchestrator) AddEventPublisher(publisher EventPublisher)
func (*Orchestrator) AddObserver ¶
func (o *Orchestrator) AddObserver(observer Observer)
func (*Orchestrator) Execute ¶
func (o *Orchestrator) Execute(ctx context.Context, exec RuntimeExecution) error
type OrchestratorOption ¶
type OrchestratorOption func(*Orchestrator)
func WithEventPublisher ¶
func WithEventPublisher(publisher EventPublisher) OrchestratorOption
func WithObserver ¶
func WithObserver(observer Observer) OrchestratorOption
type OutboxConfig ¶
type OutboxFactory ¶
type OutboxFactory func(QueueRunner) (Outbox, error)
type OutboxPoller ¶
type OutboxPoller struct {
// contains filtered or unexported fields
}
func NewOutboxPoller ¶
func NewOutboxPoller(outbox Outbox, cfg OutboxPollerConfig) *OutboxPoller
func (*OutboxPoller) Start ¶
func (p *OutboxPoller) Start(ctx context.Context) context.CancelFunc
type OutboxPollerConfig ¶
type OutboxTask ¶
func NewOutboxTask ¶
func NewOutboxTask(task Task, opts ...Option) OutboxTask
type ProgressReporter ¶
type QueueAction ¶
type QueueDrainer ¶
type QueueMetrics ¶
type QueueRunner ¶
func New ¶
func New(cfg Config, opts ...RuntimeOption) QueueRunner
func NewRunner ¶
func NewRunner(cfg Config, opts ...RuntimeOption) (QueueRunner, error)
type QueueRuntimeStatus ¶
type QueueRuntimeStatus struct {
Name string
State QueueState
Priority int
Metrics QueueMetrics
PausedAt *time.Time
UpdatedAt time.Time
}
type QueueState ¶
type QueueState string
const ( QueueRunning QueueState = "running" QueuePaused QueueState = "paused" QueueDraining QueueState = "draining" QueueStopped QueueState = "stopped" QueueFailed QueueState = "failed" QueueUnknown QueueState = "unknown" )
type RateLimitConfig ¶
type RateLimitError ¶
func (*RateLimitError) Error ¶
func (e *RateLimitError) Error() string
func (*RateLimitError) Unwrap ¶
func (e *RateLimitError) Unwrap() error
type RateLimiter ¶
type RedisConfig ¶
type Registration ¶
func Register ¶
func Register(pattern string, handlers ...any) Registration
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
func NewRegistry ¶
func NewRegistryWithDispatcher ¶
func NewRegistryWithDispatcher(worker Worker, dispatcher *Dispatcher) *Registry
func NewRegistryWithRuntime ¶
func NewRegistryWithRuntime(worker Worker, runtime *RegistryRuntime) *Registry
func (*Registry) Dispatcher ¶
func (r *Registry) Dispatcher() *Dispatcher
func (*Registry) Entries ¶
func (r *Registry) Entries() []HandlerMetadata
func (*Registry) Metadata ¶
func (r *Registry) Metadata() RegistryMetadata
func (*Registry) RegisterAll ¶
func (r *Registry) RegisterAll(registrations ...Registration) error
func (*Registry) Runtime ¶
func (r *Registry) Runtime() *RegistryRuntime
func (*Registry) Use ¶
func (r *Registry) Use(middlewares ...Middleware)
func (*Registry) UseRuntime ¶
func (r *Registry) UseRuntime(middlewares ...RuntimeMiddleware)
type RegistryMetadata ¶
type RegistryMetadata struct {
Handlers []HandlerMetadata
Middleware MiddlewareMetadata
}
type RegistryRuntime ¶
type RegistryRuntime struct {
// contains filtered or unexported fields
}
func NewRegistryRuntime ¶
func NewRegistryRuntime(dispatcher *Dispatcher) *RegistryRuntime
func (*RegistryRuntime) AddHook ¶
func (r *RegistryRuntime) AddHook(h Hook)
func (*RegistryRuntime) Dispatcher ¶
func (r *RegistryRuntime) Dispatcher() *Dispatcher
func (*RegistryRuntime) Entries ¶
func (r *RegistryRuntime) Entries() []HandlerMetadata
func (*RegistryRuntime) Handler ¶
func (r *RegistryRuntime) Handler(pattern string) HandlerFunc
func (*RegistryRuntime) Metadata ¶
func (r *RegistryRuntime) Metadata() RegistryMetadata
func (*RegistryRuntime) Register ¶
func (r *RegistryRuntime) Register(pattern string, handlers ...any) error
func (*RegistryRuntime) SetOrchestrator ¶
func (r *RegistryRuntime) SetOrchestrator(orchestrator *Orchestrator)
func (*RegistryRuntime) Use ¶
func (r *RegistryRuntime) Use(middlewares ...Middleware)
func (*RegistryRuntime) UseRuntime ¶
func (r *RegistryRuntime) UseRuntime(middlewares ...RuntimeMiddleware)
type RetryDelayFunc ¶
type RetryStrategy ¶
type RetryStrategy interface {
NextRetry(ctx context.Context, msg *Message, retryCount int, err error) (time.Duration, bool)
}
func RetryDelayStrategy ¶
func RetryDelayStrategy(fn RetryDelayFunc) RetryStrategy
type RetryStrategyFunc ¶
type RunnerDriver ¶
type RunnerDriver interface {
Driver
NewRunner(cfg Config, opts ...RuntimeOption) (QueueRunner, error)
}
type RuntimeContext ¶
RuntimeContext 只保存 runtime metadata、runtime resources 标识和 runtime state。
不要把业务 payload、业务对象或外部 SDK 客户端放入 RuntimeContext。
func RuntimeContextFromContext ¶
func RuntimeContextFromContext(ctx context.Context) (*RuntimeContext, bool)
func RuntimeContextFromMessage ¶
func RuntimeContextFromMessage(msg *Message) *RuntimeContext
type RuntimeError ¶
type RuntimeError struct {
Kind ErrorKind
Code string
Retryable bool
Fatal bool
Ignore bool
RetryIn time.Duration
Err error
}
func RuntimeErrorFrom ¶
func RuntimeErrorFrom(err error) (*RuntimeError, bool)
func (*RuntimeError) Error ¶
func (e *RuntimeError) Error() string
func (*RuntimeError) Unwrap ¶
func (e *RuntimeError) Unwrap() error
type RuntimeEvent ¶
type RuntimeEvent struct {
Type RuntimeEventType
Message *Message
Error error
Timestamp time.Time
}
type RuntimeEventType ¶
type RuntimeEventType string
const ( RuntimeEventTaskStarted RuntimeEventType = "task.started" RuntimeEventTaskSuccess RuntimeEventType = "task.success" RuntimeEventTaskFailed RuntimeEventType = "task.failed" RuntimeEventTaskRetry RuntimeEventType = "task.retry" RuntimeEventTaskDeadLetter RuntimeEventType = "task.deadletter" RuntimeEventTaskTimeout RuntimeEventType = "task.timeout" )
type RuntimeExecution ¶
type RuntimeExecution struct {
Pattern string
Handler HandlerFunc
Middlewares []RuntimeMiddleware
Hooks []Hook
Message *Message
Metadata HandlerMetadata
}
type RuntimeInstance ¶
type RuntimeInstance struct {
// contains filtered or unexported fields
}
func From ¶
func From(source any) *RuntimeInstance
From resolves a runtime instance from runtime wiring objects. Business code should prefer Runtime(ctx), package helpers, or explicit Client/Manager values.
func InitRuntimeInstance ¶
func InitRuntimeInstance(ctx context.Context, cfg Config, runtimeCfg RuntimeKernelConfig, opts ...RuntimeInstanceOption) (*RuntimeInstance, error)
func NewRuntimeInstance ¶
func NewRuntimeInstance(runner QueueRunner, opts ...RuntimeInstanceOption) *RuntimeInstance
func NewRuntimeInstanceFromParts ¶
func NewRuntimeInstanceFromParts(parts RuntimeParts, opts ...RuntimeInstanceOption) *RuntimeInstance
func Runtime ¶
func Runtime(ctx context.Context) *RuntimeInstance
func (*RuntimeInstance) AddHook ¶
func (rt *RuntimeInstance) AddHook(h Hook)
func (*RuntimeInstance) ArchiveTask ¶
func (*RuntimeInstance) BatchEnqueue ¶
func (*RuntimeInstance) CancelTask ¶
func (*RuntimeInstance) Capabilities ¶
func (rt *RuntimeInstance) Capabilities() map[Capability]bool
func (*RuntimeInstance) Client ¶
func (rt *RuntimeInstance) Client() Client
func (*RuntimeInstance) Close ¶
func (rt *RuntimeInstance) Close() error
func (*RuntimeInstance) DeleteTask ¶
func (*RuntimeInstance) DispatchAutoRetryTasks ¶
func (*RuntimeInstance) Handle ¶
func (rt *RuntimeInstance) Handle(pattern string, handler HandlerFunc)
func (*RuntimeInstance) Kernel ¶
func (rt *RuntimeInstance) Kernel() *RuntimeKernel
func (*RuntimeInstance) ListQueues ¶
func (rt *RuntimeInstance) ListQueues(ctx context.Context) ([]*QueueInfo, error)
func (*RuntimeInstance) Manager ¶
func (rt *RuntimeInstance) Manager() Manager
func (*RuntimeInstance) Metadata ¶
func (rt *RuntimeInstance) Metadata() RuntimeMetadata
func (*RuntimeInstance) NewRegistry ¶
func (rt *RuntimeInstance) NewRegistry() *Registry
func (*RuntimeInstance) Operations ¶
func (rt *RuntimeInstance) Operations() *OperationsRuntime
func (*RuntimeInstance) PauseQueue ¶
func (rt *RuntimeInstance) PauseQueue(ctx context.Context, queueName string) error
func (*RuntimeInstance) Registry ¶
func (rt *RuntimeInstance) Registry() *RegistryRuntime
func (*RuntimeInstance) RequeueTaskRecord ¶
func (rt *RuntimeInstance) RequeueTaskRecord(ctx context.Context, record TaskRecord) (*TaskInfo, error)
func (*RuntimeInstance) ResumeQueue ¶
func (rt *RuntimeInstance) ResumeQueue(ctx context.Context, queueName string) error
func (*RuntimeInstance) Runner ¶
func (rt *RuntimeInstance) Runner() QueueRunner
func (*RuntimeInstance) StartSchedulePoller ¶
func (rt *RuntimeInstance) StartSchedulePoller(ctx context.Context, cfg ScheduleConfig) context.CancelFunc
func (*RuntimeInstance) Supports ¶
func (rt *RuntimeInstance) Supports(cap Capability) bool
func (*RuntimeInstance) Use ¶
func (rt *RuntimeInstance) Use(middlewares ...Middleware)
func (*RuntimeInstance) UseRuntime ¶
func (rt *RuntimeInstance) UseRuntime(middlewares ...RuntimeMiddleware)
func (*RuntimeInstance) Worker ¶
func (rt *RuntimeInstance) Worker() Worker
type RuntimeInstanceOption ¶
type RuntimeInstanceOption func(*RuntimeInstance)
func WithRuntimeKernel ¶
func WithRuntimeKernel(kernel *RuntimeKernel) RuntimeInstanceOption
func WithRuntimeMetadata ¶
func WithRuntimeMetadata(metadata RuntimeMetadata) RuntimeInstanceOption
func WithRuntimeOperations ¶
func WithRuntimeOperations(operations *OperationsRuntime) RuntimeInstanceOption
func WithRuntimeRegistry ¶
func WithRuntimeRegistry(registry *RegistryRuntime) RuntimeInstanceOption
func WithRuntimeTaskStore ¶
func WithRuntimeTaskStore(store TaskStore) RuntimeInstanceOption
type RuntimeKernel ¶
type RuntimeKernel struct {
// contains filtered or unexported fields
}
func InitRuntimeKernel ¶
func InitRuntimeKernel(ctx context.Context, cfg Config, runtimeCfg RuntimeKernelConfig) (*RuntimeKernel, error)
func (*RuntimeKernel) Close ¶
func (k *RuntimeKernel) Close()
func (*RuntimeKernel) Orchestrator ¶
func (k *RuntimeKernel) Orchestrator() *Orchestrator
func (*RuntimeKernel) RateLimiter ¶
func (k *RuntimeKernel) RateLimiter() (RateLimiter, RateLimitConfig, bool)
func (*RuntimeKernel) Runner ¶
func (k *RuntimeKernel) Runner() QueueRunner
type RuntimeKernelConfig ¶
type RuntimeKernelConfig struct {
IsFailure IsFailureFunc
RateLimiter RateLimiter
RateLimitConfig RateLimitConfig
OutboxFactory OutboxFactory
OutboxMigrate func(context.Context) error
OutboxPoller OutboxPollerConfig
EventPublishers []EventPublisher
Observers []Observer
}
type RuntimeMetadata ¶
type RuntimeMetadata struct {
Name string
Service string
Driver string
Worker string
Queues map[string]int
DefaultQueue string
StrictPriority bool
Concurrency int
RetryCount int
MaxRetry *int
Timeout time.Duration
Delay time.Duration
Priority int
Trace TraceMetadata
WorkerMetadata WorkerMetadata
Middleware MiddlewareMetadata
RateLimit RateLimitConfig
}
func RuntimeMetadataFromConfig ¶
func RuntimeMetadataFromConfig(name string, service string, cfg Config) RuntimeMetadata
func RuntimeMetadataFromWorkerProfile ¶
func RuntimeMetadataFromWorkerProfile(name string, service string, cfg Config, profile WorkerProfile) RuntimeMetadata
type RuntimeMetrics ¶
type RuntimeMetrics struct {
Queues map[string]QueueMetrics
Total QueueMetrics
}
type RuntimeMiddleware ¶
type RuntimeMiddleware struct {
Stage MiddlewareStage
Middleware Middleware
}
func BusinessMiddleware ¶
func BusinessMiddleware(middleware Middleware) RuntimeMiddleware
func RuntimeMiddlewares ¶
func RuntimeMiddlewares(stage MiddlewareStage, middlewares ...Middleware) []RuntimeMiddleware
func StageMiddleware ¶
func StageMiddleware(stage MiddlewareStage, middleware Middleware) RuntimeMiddleware
type RuntimeOption ¶
type RuntimeOption func(*RuntimeOptions)
func WithIsFailure ¶
func WithIsFailure(fn IsFailureFunc) RuntimeOption
type RuntimeOptions ¶
type RuntimeOptions struct {
IsFailure IsFailureFunc
}
func ApplyRuntimeOptions ¶
func ApplyRuntimeOptions(opts []RuntimeOption) RuntimeOptions
func DefaultRuntimeOptions ¶
func DefaultRuntimeOptions() RuntimeOptions
type RuntimeParts ¶
type RuntimeParts struct {
Runner QueueRunner
Client Client
Worker Worker
Manager Manager
}
type RuntimeState ¶
type RuntimeState string
const ( RuntimeRunning RuntimeState = "running" RuntimePaused RuntimeState = "paused" RuntimeDraining RuntimeState = "draining" RuntimeStopped RuntimeState = "stopped" RuntimeFailed RuntimeState = "failed" )
type RuntimeStatusInfo ¶
type RuntimeStatusInfo struct {
State RuntimeState
Queues []QueueRuntimeStatus
Worker WorkerRuntimeStatus
UpdatedAt time.Time
Error string
}
type ScheduleConfig ¶
type TaskAutoRetryStore ¶
type TaskDeletionStore ¶ added in v0.1.1
type TaskInfo ¶
type TaskInfo struct {
ID string
Type string
Queue string
State TaskState
Payload []byte
Headers map[string]string
MaxRetry int
Retried int
LastError string
Timeout time.Duration
Deadline *time.Time
NextRunAt *time.Time
LastRunAt *time.Time
CreatedAt time.Time
UpdatedAt time.Time
TrackID string
RequestID string
TraceID string
SpanID string
TenantID string
UserID string
Result string
}
func BatchEnqueue ¶
func RequeueTaskRecord ¶ added in v0.1.1
func RequeueTaskRecord(ctx context.Context, record TaskRecord) (*TaskInfo, error)
type TaskLogger ¶
type TaskLogger interface {
Info(message string, fields ...any)
Warn(message string, fields ...any)
Error(message string, fields ...any)
}
func TaskLoggerFromContext ¶
func TaskLoggerFromContext(ctx context.Context) TaskLogger
type TaskPayload ¶
type TaskPayload struct {
Version int `json:"version"`
Data json.RawMessage `json:"data"`
}
type TaskRecord ¶
type TaskRecord struct {
RecordID int64
Driver string
TaskID string
Queue string
Type string
Payload []byte
State TaskState
MaxRetry int
Attempts int
TimeoutSeconds int
UniqueSeconds int
DelaySeconds int
AutoRetryEnabled bool
AutoRetryCount int
AutoRetryMax int
AutoRetryDelaySeconds int
NextRetryAt *time.Time
LastRetryError string
ScheduledAt *time.Time
StartedAt *time.Time
FinishedAt *time.Time
LastError string
WorkerID string
TrackID string
RequestID string
TraceID string
SpanID string
CreatedAt time.Time
UpdatedAt time.Time
Headers map[string]string
}
func RecordTaskScheduled ¶
func RecordTaskScheduled(ctx context.Context, store TaskStore, record TaskRecord) (TaskRecord, error)
func RecordTaskSubmitting ¶
func RecordTaskSubmitting(ctx context.Context, store TaskStore, record TaskRecord) (TaskRecord, error)
type TaskRunLogRecord ¶
type TaskRunRecord ¶
type TaskRunRecord struct {
RunDBID int64
RunID string
QueueTaskID int64
TaskID string
Queue string
Type string
Attempt int
Status TaskState
WorkerID string
StartedAt time.Time
FinishedAt time.Time
DurationMS int64
Error string
TrackID string
RequestID string
TraceID string
SpanID string
Headers map[string]string
}
type TaskScheduleStore ¶
type TaskScheduleStore interface {
RecordScheduled(ctx context.Context, record TaskRecord) (TaskRecord, error)
ClaimScheduled(ctx context.Context, now time.Time, limit int, workerID string, driver string) ([]TaskRecord, error)
}
type TaskState ¶
type TaskState string
const ( TaskPending TaskState = "pending" TaskRunning TaskState = "running" TaskRetrying TaskState = "retrying" TaskFailed TaskState = "failed" TaskDeadLetter TaskState = "deadletter" TaskSuccess TaskState = "success" StatePending TaskState = "pending" StateSubmitting TaskState = "submitting" StateSubmitFailed TaskState = "submit_failed" StateActive TaskState = "active" StateScheduled TaskState = "scheduled" StateRetry TaskState = "retry" StateSucceeded TaskState = "succeeded" StateFailed TaskState = "failed" StateArchived TaskState = "archived" StateCanceled TaskState = "canceled" StateUnknown TaskState = "unknown" )
type TaskStatusUpdate ¶
type TaskStore ¶
type TaskStore interface {
RecordEnqueued(ctx context.Context, record TaskRecord) error
EnsureRunning(ctx context.Context, record TaskRecord) (TaskRecord, error)
StartRun(ctx context.Context, run TaskRunRecord) (TaskRunRecord, error)
FinishRun(ctx context.Context, run TaskRunRecord) error
AppendRunLog(ctx context.Context, log TaskRunLogRecord) error
UpdateTaskStatus(ctx context.Context, update TaskStatusUpdate) error
}
type TaskStoreOptions ¶
type TaskSubmissionStore ¶
type TaskSubmissionStore interface {
RecordSubmitting(ctx context.Context, record TaskRecord) (TaskRecord, error)
RecordDispatched(ctx context.Context, record TaskRecord) error
RecordDispatchFailed(ctx context.Context, record TaskRecord) error
}
type TraceMetadata ¶
type Worker ¶
type Worker interface {
Handle(taskType string, handler HandlerFunc)
Use(middlewares ...Middleware)
Run(ctx context.Context) error
Shutdown(ctx context.Context) error
}
type WorkerHeartbeat ¶
type WorkerMetadata ¶
type WorkerProfile ¶
type WorkerProfile struct {
Name string `mapstructure:"name" yaml:"name"`
Concurrency int `mapstructure:"concurrency" yaml:"concurrency"`
Queues map[string]int `mapstructure:"queues" yaml:"queues"`
StrictPriority bool `mapstructure:"strict_priority" yaml:"strict_priority"`
ShutdownTimeout time.Duration `mapstructure:"shutdown_timeout" yaml:"shutdown_timeout"`
}
func (WorkerProfile) Normalize ¶
func (p WorkerProfile) Normalize(name string, cfg Config) WorkerProfile
Source Files
¶
- capability.go
- client.go
- config.go
- context.go
- context_chain.go
- dispatcher.go
- driver_registry.go
- error.go
- governance.go
- helper.go
- lifecycle.go
- message_metadata.go
- middleware_stage.go
- observability.go
- operations.go
- option.go
- orchestrator.go
- outbox.go
- outbox_poller.go
- queue.go
- registry.go
- registry_runtime.go
- retry.go
- runtime.go
- runtime_context.go
- runtime_event.go
- runtime_instance.go
- runtime_kernel.go
- runtime_metadata.go
- runtime_status.go
- state.go
- task.go
- task_store.go
- tracing.go
Click to show internal directories.
Click to hide internal directories.