Documentation
¶
Index ¶
- Variables
- func CancelJob(ctx context.Context, q any, jobID string) error
- func ClearQueue(ctx context.Context, q any, queueName string) error
- func DeleteJob(ctx context.Context, q any, queueName, jobID string) error
- func Pause(ctx context.Context, q any, queueName string) error
- func Ready(ctx context.Context, q any) error
- func Resume(ctx context.Context, q any, queueName string) error
- func RetryJob(ctx context.Context, q any, queueName, jobID string) error
- func SafeObserve(observer Observer, event Event)
- func SupportsNativeStats(q any) bool
- func SupportsPause(q any) bool
- func SupportsQueueAdmin(q any) bool
- func ValidateDriverJob(job Job) error
- type BatchBuilder
- type BatchState
- type ChainBuilder
- type ChainState
- type ChannelObserver
- type Config
- type DatabaseConfig
- type DispatchRecord
- type DispatchResult
- type Driver
- type DriverJobOptions
- type Event
- type EventKind
- type FailOnError
- type FakeQueue
- func (f *FakeQueue) AssertCount(t testing.TB, expected int)
- func (f *FakeQueue) AssertDispatched(t testing.TB, jobType string)
- func (f *FakeQueue) AssertDispatchedOn(t testing.TB, queueName, jobType string)
- func (f *FakeQueue) AssertDispatchedTimes(t testing.TB, jobType string, expected int)
- func (f *FakeQueue) AssertNotDispatched(t testing.TB, jobType string)
- func (f *FakeQueue) AssertNothingDispatched(t testing.TB)
- func (f *FakeQueue) BusDispatch(ctx context.Context, jobType string, payload []byte, ...) error
- func (f *FakeQueue) BusRegister(string, busruntime.Handler)
- func (f *FakeQueue) Dispatch(job any) error
- func (f *FakeQueue) DispatchCtx(ctx context.Context, job any) error
- func (f *FakeQueue) Driver() Driver
- func (f *FakeQueue) Ready(ctx context.Context) error
- func (f *FakeQueue) Records() []DispatchRecord
- func (f *FakeQueue) Register(string, Handler)
- func (f *FakeQueue) Reset()
- func (f *FakeQueue) Shutdown(context.Context) error
- func (f *FakeQueue) StartWorkers(context.Context) error
- func (f *FakeQueue) Workers(int) queueRuntime
- type Handler
- type Job
- func (t Job) Backoff(backoff time.Duration) Job
- func (t Job) Bind(dst any) error
- func (t Job) Delay(delay time.Duration) Job
- func (t Job) OnQueue(name string) Job
- func (t Job) Payload(payload any) Job
- func (t Job) PayloadBytes() []byte
- func (t Job) PayloadJSON(v any) Job
- func (t Job) Retry(maxRetry int) Job
- func (t Job) Timeout(timeout time.Duration) Job
- func (t Job) UniqueFor(ttl time.Duration) Job
- type JobSnapshot
- type JobState
- type ListJobsOptions
- type ListJobsResult
- type Lock
- type Locker
- type Logger
- type Message
- type Middleware
- type MiddlewareFunc
- type Next
- type NoopLogger
- type Observer
- type ObserverFunc
- type Option
- type Queue
- func (r *Queue) Batch(jobs ...Job) BatchBuilder
- func (r *Queue) CancelJob(ctx context.Context, jobID string) error
- func (r *Queue) Chain(jobs ...Job) ChainBuilder
- func (r *Queue) ClearQueue(ctx context.Context, queueName string) error
- func (r *Queue) DeleteJob(ctx context.Context, queueName, jobID string) error
- func (r *Queue) Dispatch(job Job) (DispatchResult, error)
- func (r *Queue) DispatchCtx(ctx context.Context, job Job) (DispatchResult, error)
- func (r *Queue) Driver() Driver
- func (r *Queue) FindBatch(ctx context.Context, batchID string) (BatchState, error)
- func (r *Queue) FindChain(ctx context.Context, chainID string) (ChainState, error)
- func (r *Queue) History(ctx context.Context, queueName string, window QueueHistoryWindow) ([]QueueHistoryPoint, error)
- func (r *Queue) ListJobs(ctx context.Context, opts ListJobsOptions) (ListJobsResult, error)
- func (r *Queue) Pause(ctx context.Context, queueName string) error
- func (r *Queue) Prune(ctx context.Context, before time.Time) error
- func (r *Queue) Ready(ctx context.Context) error
- func (r *Queue) Register(jobType string, handler func(context.Context, Message) error)
- func (r *Queue) Resume(ctx context.Context, queueName string) error
- func (r *Queue) RetryJob(ctx context.Context, queueName, jobID string) error
- func (r *Queue) Run(ctx context.Context) error
- func (r *Queue) Shutdown(ctx context.Context) error
- func (r *Queue) StartWorkers(ctx context.Context) error
- func (r *Queue) Stats(ctx context.Context) (StatsSnapshot, error)
- func (r *Queue) WithWorkers(count int) *Queue
- type QueueAdmin
- type QueueController
- type QueueCounters
- type QueueHistoryPoint
- func QueueHistory(ctx context.Context, q any, queueName string, window QueueHistoryWindow) ([]QueueHistoryPoint, error)
- func SinglePointHistory(snapshot StatsSnapshot, queueName string) []QueueHistoryPoint
- func TimelineHistoryFromSnapshot(snapshot StatsSnapshot, queueName string, window QueueHistoryWindow) []QueueHistoryPoint
- type QueueHistoryProvider
- type QueueHistoryWindow
- type QueueThroughput
- type RateLimit
- type RateLimiter
- type RetryPolicy
- type SkipWhen
- type StatsCollector
- type StatsProvider
- type StatsSnapshot
- func (s StatsSnapshot) Active(name string) int64
- func (s StatsSnapshot) Archived(name string) int64
- func (s StatsSnapshot) Failed(name string) int64
- func (s StatsSnapshot) Paused(name string) int64
- func (s StatsSnapshot) Pending(name string) int64
- func (s StatsSnapshot) Processed(name string) int64
- func (s StatsSnapshot) Queue(name string) (QueueCounters, bool)
- func (s StatsSnapshot) Queues() []string
- func (s StatsSnapshot) RetryCount(name string) int64
- func (s StatsSnapshot) Scheduled(name string) int64
- func (s StatsSnapshot) Throughput(name string) (QueueThroughput, bool)
- type ThroughputWindow
- type WithoutOverlapping
- type WorkerpoolConfig
- type WorkflowEvent
- type WorkflowEventKind
- type WorkflowObserver
- type WorkflowObserverFunc
- type WorkflowStore
Constants ¶
This section is empty.
Variables ¶
var ErrBackoffUnsupported = errors.New("backoff option is not supported by this driver")
ErrBackoffUnsupported indicates requested backoff is unsupported by a driver.
var ErrDuplicate = errors.New("duplicate job")
ErrDuplicate indicates a duplicate unique job enqueue.
var ErrPauseUnsupported = errors.New("pause/resume is not supported by this driver")
ErrPauseUnsupported indicates queue pause/resume is unsupported by a driver.
var ErrQueueAdminUnsupported = errors.New("queue admin is not supported by this driver")
ErrQueueAdminUnsupported indicates queue admin operations are unsupported by a driver. @group Admin
var ErrQueuePaused = errors.New("queue is paused")
ErrQueuePaused indicates enqueue was rejected because queue is paused.
var ErrQueuerShuttingDown = errors.New("queue is shutting down")
ErrQueuerShuttingDown indicates enqueue was rejected during shutdown.
var ErrWorkerpoolQueueNotInitialized = errors.New("workerpool queue not initialized")
ErrWorkerpoolQueueNotInitialized indicates workerpool queue is unavailable.
var ErrWorkflowNotFound = bus.ErrNotFound
ErrWorkflowNotFound indicates a workflow state record is not present. @group Queue
Functions ¶
func CancelJob ¶
CancelJob cancels a job when supported. @group Admin
Example: cancel a job via helper
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
err = queue.CancelJob(context.Background(), q, "job-id")
_ = err
func ClearQueue ¶
ClearQueue clears queue jobs when supported. @group Admin
Example: clear queue via helper
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
err = queue.ClearQueue(context.Background(), q, "default")
_ = err
func DeleteJob ¶
DeleteJob deletes a job when supported. @group Admin
Example: delete a job via helper
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
err = queue.DeleteJob(context.Background(), q, "default", "job-id")
_ = err
func Pause ¶
Pause pauses queue consumption for drivers that support it. @group Observability
Example: pause queue
q, _ := queue.NewSync()
_ = queue.Pause(context.Background(), q, "default")
snapshot, _ := queue.Snapshot(context.Background(), q, nil)
fmt.Println(snapshot.Paused("default"))
// Output: 1
func Ready ¶
Ready validates backend readiness for the provided queue runtime. @group Observability
Example: ready via package helper
q, _ := queue.NewSync()
fmt.Println(queue.Ready(context.Background(), q) == nil)
// true
func Resume ¶
Resume resumes queue consumption for drivers that support it. @group Observability
Example: resume queue
q, _ := queue.NewSync()
_ = queue.Pause(context.Background(), q, "default")
_ = queue.Resume(context.Background(), q, "default")
snapshot, _ := queue.Snapshot(context.Background(), q, nil)
fmt.Println(snapshot.Paused("default"))
// Output: 0
func RetryJob ¶
RetryJob retries (runs now) a job when supported. @group Admin
Example: retry a job via helper
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
err = queue.RetryJob(context.Background(), q, "default", "job-id")
_ = err
func SafeObserve ¶
SafeObserve delivers an event to an observer and recovers observer panics.
This is an advanced helper intended for driver-module implementations. @group Observability
func SupportsNativeStats ¶
SupportsNativeStats reports whether a queue runtime exposes native stats snapshots. @group Observability
Example: check native stats support
q, _ := queue.NewSync()
fmt.Println(queue.SupportsNativeStats(q))
// Output: true
func SupportsPause ¶
SupportsPause reports whether a queue runtime supports Pause/Resume. @group Observability
Example: check pause support
q, _ := queue.NewSync()
fmt.Println(queue.SupportsPause(q))
// Output: true
func SupportsQueueAdmin ¶
SupportsQueueAdmin reports whether queue admin operations are available. @group Admin
Example: detect admin support
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
fmt.Println(queue.SupportsQueueAdmin(q))
// Output: true
func ValidateDriverJob ¶
ValidateDriverJob validates a job value for backend dispatch.
This is an advanced helper intended for driver-module implementations. @group Driver Integration
Types ¶
type BatchBuilder ¶
type BatchBuilder interface {
Name(name string) BatchBuilder
OnQueue(queue string) BatchBuilder
AllowFailures() BatchBuilder
Progress(fn func(ctx context.Context, st BatchState) error) BatchBuilder
Then(fn func(ctx context.Context, st BatchState) error) BatchBuilder
Catch(fn func(ctx context.Context, st BatchState, err error) error) BatchBuilder
Finally(fn func(ctx context.Context, st BatchState) error) BatchBuilder
Dispatch(ctx context.Context) (string, error)
}
BatchBuilder is the high-level batch workflow builder. @group Queue
type BatchState ¶
type BatchState = bus.BatchState
BatchState is the persisted view of a batch workflow. @group Queue
type ChainBuilder ¶
type ChainBuilder interface {
OnQueue(queue string) ChainBuilder
Catch(fn func(ctx context.Context, st ChainState, err error) error) ChainBuilder
Finally(fn func(ctx context.Context, st ChainState) error) ChainBuilder
Dispatch(ctx context.Context) (string, error)
}
ChainBuilder is the high-level chain workflow builder. @group Queue
type ChainState ¶
type ChainState = bus.ChainState
ChainState is the persisted view of a chain workflow. @group Queue
type ChannelObserver ¶
ChannelObserver forwards events to a channel. @group Observability
func (ChannelObserver) Observe ¶
func (c ChannelObserver) Observe(event Event)
Observe forwards an event to the configured channel. @group Observability
Example: channel observer
ch := make(chan queue.Event, 1)
observer := queue.ChannelObserver{Events: ch}
observer.Observe(queue.Event{Kind: queue.EventProcessStarted, Queue: "default"})
event := <-ch
_ = event
type Config ¶
Config configures queue creation for New (and advanced driver/runtime interop). @group Config
type DatabaseConfig ¶
type DatabaseConfig struct {
DB *sql.DB
DriverName string
DSN string
Workers int
PollInterval time.Duration
DefaultQueue string
AutoMigrate bool
ProcessingRecoveryGrace time.Duration
ProcessingLeaseNoTimeout time.Duration
Observer Observer
Logger Logger
}
DatabaseConfig configures the SQL-backed database queue. @group Config
type DispatchRecord ¶
DispatchRecord captures one dispatch observed by FakeQueue. @group Testing
type DispatchResult ¶
type DispatchResult = bus.DispatchResult
DispatchResult describes a high-level dispatch operation. @group Queue
type Driver ¶
type Driver string
Driver identifies the queue backend. @group Driver
Example: driver values
fmt.Println(queue.DriverNull, queue.DriverSync, queue.DriverWorkerpool, queue.DriverDatabase, queue.DriverRedis, queue.DriverNATS, queue.DriverSQS, queue.DriverRabbitMQ)
const (
// DriverNull drops dispatched jobs and performs no execution.
DriverNull Driver = "null"
// DriverSync runs handlers inline in the caller goroutine.
DriverSync Driver = "sync"
// DriverWorkerpool runs handlers on an in-memory workerpool.
DriverWorkerpool Driver = "workerpool"
// DriverDatabase selects the SQL-backed queue backend.
DriverDatabase Driver = "database"
// DriverRedis selects the Redis (asynq) backend.
DriverRedis Driver = "redis"
// DriverNATS selects the NATS backend.
DriverNATS Driver = "nats"
// DriverSQS selects the AWS SQS backend.
DriverSQS Driver = "sqs"
// DriverRabbitMQ selects the RabbitMQ backend.
DriverRabbitMQ Driver = "rabbitmq"
)
type DriverJobOptions ¶
type DriverJobOptions struct {
QueueName string
Timeout *time.Duration
MaxRetry *int
Attempt int
Backoff *time.Duration
Delay time.Duration
UniqueTTL time.Duration
}
DriverJobOptions exposes parsed job enqueue metadata for driver-module implementations.
This is an advanced type intended for optional driver integrations. @group Driver Integration
func DriverOptions ¶
func DriverOptions(job Job) DriverJobOptions
DriverOptions returns parsed enqueue metadata for backend dispatch.
This is an advanced helper intended for driver-module implementations. @group Driver Integration
type Event ¶
type Event struct {
Kind EventKind
Driver Driver
Queue string
JobType string
JobKey string
Attempt int
MaxRetry int
Scheduled bool
Duration time.Duration
Err error
Time time.Time
}
Event is emitted through Observer hooks for queue/worker activity. @group Driver Integration
type EventKind ¶
type EventKind string
EventKind identifies a queue runtime event. @group Driver Integration
const (
// EventEnqueueAccepted indicates a job was accepted for enqueue.
EventEnqueueAccepted EventKind = "enqueue_accepted"
// EventEnqueueRejected indicates enqueue failed.
EventEnqueueRejected EventKind = "enqueue_rejected"
// EventEnqueueDuplicate indicates enqueue was rejected as duplicate.
EventEnqueueDuplicate EventKind = "enqueue_duplicate"
// EventEnqueueCanceled indicates enqueue was canceled by context.
EventEnqueueCanceled EventKind = "enqueue_canceled"
// EventProcessStarted indicates a handler started processing.
EventProcessStarted EventKind = "process_started"
// EventProcessSucceeded indicates a handler completed successfully.
EventProcessSucceeded EventKind = "process_succeeded"
// EventProcessFailed indicates a handler returned an error.
EventProcessFailed EventKind = "process_failed"
// EventProcessRetried indicates a failed attempt was requeued for retry.
EventProcessRetried EventKind = "process_retried"
// EventProcessArchived indicates a failed attempt reached terminal state.
EventProcessArchived EventKind = "process_archived"
// EventQueuePaused indicates queue consumption was paused.
EventQueuePaused EventKind = "queue_paused"
// EventQueueResumed indicates queue consumption was resumed.
EventQueueResumed EventKind = "queue_resumed"
// EventProcessRecovered indicates a stale in-flight job was requeued for recovery.
EventProcessRecovered EventKind = "process_recovered"
// EventRepublishFailed indicates an internal delay/retry republish attempt failed.
EventRepublishFailed EventKind = "republish_failed"
)
type FailOnError ¶
type FailOnError = bus.FailOnError
FailOnError converts matched errors into fatal (non-retryable) failures. @group Queue
type FakeQueue ¶
type FakeQueue struct {
// contains filtered or unexported fields
}
FakeQueue is an in-memory queue fake for tests. @group Testing
func NewFake ¶
func NewFake() *FakeQueue
NewFake creates a queue fake that records dispatches and provides assertions. @group Testing
Example: fake queue assertions
fake := queue.NewFake()
_ = fake.Dispatch(
queue.NewJob("emails:send").
Payload(map[string]any{"id": 1}).
OnQueue("critical"),
)
records := fake.Records()
fmt.Println(len(records), records[0].Queue, records[0].Job.Type)
// Output: 1 critical emails:send
func (*FakeQueue) AssertCount ¶
AssertCount fails when dispatch count is not expected. @group Testing
Example: assert dispatch count
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertCount(t, 1)
func (*FakeQueue) AssertDispatched ¶
AssertDispatched fails when jobType was not dispatched. @group Testing
Example: assert job type dispatched
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertDispatched(t, "emails:send")
func (*FakeQueue) AssertDispatchedOn ¶
AssertDispatchedOn fails when jobType was not dispatched on queueName. @group Testing
Example: assert job type dispatched on queue
fake := queue.NewFake()
_ = fake.Dispatch(
queue.NewJob("emails:send").
OnQueue("critical"),
)
fake.AssertDispatchedOn(t, "critical", "emails:send")
func (*FakeQueue) AssertDispatchedTimes ¶
AssertDispatchedTimes fails when jobType dispatch count does not match expected. @group Testing
Example: assert job type dispatched times
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertDispatchedTimes(t, "emails:send", 2)
func (*FakeQueue) AssertNotDispatched ¶
AssertNotDispatched fails when jobType was dispatched. @group Testing
Example: assert job type not dispatched
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertNotDispatched(t, "emails:cancel")
func (*FakeQueue) AssertNothingDispatched ¶
AssertNothingDispatched fails when any dispatch was recorded. @group Testing
Example: assert nothing dispatched
fake := queue.NewFake()
fake.AssertNothingDispatched(t)
func (*FakeQueue) BusDispatch ¶
func (f *FakeQueue) BusDispatch(ctx context.Context, jobType string, payload []byte, opts busruntime.JobOptions) error
BusDispatch satisfies the internal orchestration runtime adapter. @group Testing
func (*FakeQueue) BusRegister ¶
func (f *FakeQueue) BusRegister(string, busruntime.Handler)
BusRegister satisfies the internal orchestration runtime adapter. @group Testing
func (*FakeQueue) Dispatch ¶
Dispatch records a typed job payload in-memory using the fake default queue. @group Testing
Example: dispatch to fake queue
fake := queue.NewFake()
err := fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
_ = err
func (*FakeQueue) DispatchCtx ¶
DispatchCtx submits a typed job payload using the provided context. @group Testing
Example: dispatch with context
fake := queue.NewFake()
ctx := context.Background()
err := fake.DispatchCtx(ctx, queue.NewJob("emails:send").OnQueue("default"))
fmt.Println(err == nil)
// Output: true
func (*FakeQueue) Driver ¶
Driver returns the active queue driver. @group Testing
Example: fake driver
fake := queue.NewFake()
driver := fake.Driver()
_ = driver
func (*FakeQueue) Ready ¶
Ready validates fake queue readiness. @group Testing
Example: fake ready
fake := queue.NewFake()
fmt.Println(fake.Ready(context.Background()) == nil)
// true
func (*FakeQueue) Records ¶
func (f *FakeQueue) Records() []DispatchRecord
Records returns a copy of all dispatch records. @group Testing
Example: read records
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
records := fake.Records()
fmt.Println(len(records), records[0].Job.Type)
// Output: 1 emails:send
func (*FakeQueue) Register ¶
Register associates a handler with a job type. @group Testing
Example: register no-op on fake
fake := queue.NewFake()
fake.Register("emails:send", func(context.Context, queue.Job) error { return nil })
func (*FakeQueue) Reset ¶
func (f *FakeQueue) Reset()
Reset clears all recorded dispatches. @group Testing
Example: reset records
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
fmt.Println(len(fake.Records()))
fake.Reset()
fmt.Println(len(fake.Records()))
// Output:
// 1
// 0
func (*FakeQueue) Shutdown ¶
Shutdown drains running work and releases resources. @group Testing
Example: shutdown fake queue
fake := queue.NewFake()
err := fake.Shutdown(context.Background())
_ = err
func (*FakeQueue) StartWorkers ¶
StartWorkers starts worker execution. @group Testing
Example: start fake workers
fake := queue.NewFake()
err := fake.StartWorkers(context.Background())
_ = err
type Handler ¶
Handler processes a job. @group Job
Example: handler
handler := func(ctx context.Context, job queue.Job) error { return nil }
_ = handler
type Job ¶
type Job struct {
Type string
// contains filtered or unexported fields
}
Job is a pure queue payload value plus enqueue metadata. @group Job
Example: job
job := queue.NewJob("emails:send").
PayloadJSON(map[string]string{"to": "user@example.com"}).
OnQueue("critical")
_ = job
func DriverWithAttempt ¶
DriverWithAttempt returns a copy of the job with the attempt number set.
This is an advanced helper intended for driver-module implementations. @group Driver Integration
func NewJob ¶
NewJob creates a job value with a required job type. @group Job
Example: new job
job := queue.NewJob("emails:send")
_ = job
func (Job) Backoff ¶
Backoff sets delay between retries. @group Job
Example: backoff
job := queue.NewJob("emails:send").Backoff(500 * time.Millisecond)
_ = job
func (Job) Bind ¶
Bind unmarshals job payload JSON into dst. @group Job
Example: bind payload
type EmailPayload struct {
ID int `json:"id"`
To string `json:"to"`
}
job := queue.NewJob("emails:send").Payload(EmailPayload{
ID: 1,
To: "user@example.com",
})
var payload EmailPayload
if err := job.Bind(&payload); err != nil {
return
}
_ = payload.To
func (Job) Delay ¶
Delay defers execution by duration. @group Job
Example: delay
job := queue.NewJob("emails:send").Delay(300 * time.Millisecond)
_ = job
func (Job) OnQueue ¶
OnQueue sets the target queue name. @group Job
Example: on queue
job := queue.NewJob("emails:send").OnQueue("critical")
_ = job
func (Job) Payload ¶
Payload sets job payload from common value types. @group Job
Example: payload bytes
jobBytes := queue.NewJob("emails:send").Payload([]byte(`{"id":1}`))
_ = jobBytes
Example: payload struct
type Meta struct {
Nested bool `json:"nested"`
}
type EmailPayload struct {
ID int `json:"id"`
To string `json:"to"`
Meta Meta `json:"meta"`
}
jobStruct := queue.NewJob("emails:send").Payload(EmailPayload{
ID: 1,
To: "user@example.com",
Meta: Meta{Nested: true},
})
_ = jobStruct
Example: payload map
jobMap := queue.NewJob("emails:send").Payload(map[string]any{
"id": 1,
"to": "user@example.com",
"meta": map[string]any{"nested": true},
})
_ = jobMap
func (Job) PayloadBytes ¶
PayloadBytes returns a copy of job payload bytes. @group Job
Example: payload bytes read
job := queue.NewJob("emails:send").Payload([]byte(`{"id":1}`))
payload := job.PayloadBytes()
_ = payload
func (Job) PayloadJSON ¶
PayloadJSON marshals payload as JSON. @group Job
Example: payload json
job := queue.NewJob("emails:send").PayloadJSON(map[string]int{"id": 1})
_ = job
func (Job) Retry ¶
Retry sets max retry attempts. @group Job
Example: retry
job := queue.NewJob("emails:send").Retry(4)
_ = job
type JobSnapshot ¶
type JobSnapshot struct {
ID string
Queue string
State JobState
Type string
Payload string
Attempt int
MaxRetry int
LastError string
NextProcessAt *time.Time
CompletedAt *time.Time
}
JobSnapshot describes an admin-facing queue job record. @group Admin
type JobState ¶
type JobState string
JobState identifies queue job state used by queue admin APIs. @group Admin
type ListJobsOptions ¶
ListJobsOptions configures queue admin list jobs calls. @group Admin
func (ListJobsOptions) Normalize ¶
func (o ListJobsOptions) Normalize() ListJobsOptions
Normalize returns a safe options payload with defaults applied. @group Admin
Example: normalize list options
opts := queue.ListJobsOptions{Queue: "", State: "", Page: 0, PageSize: 1000}
normalized := opts.Normalize()
fmt.Println(normalized.Queue, normalized.State, normalized.Page, normalized.PageSize)
// Output: default pending 1 500
type ListJobsResult ¶
type ListJobsResult struct {
Jobs []JobSnapshot
Total int64
}
ListJobsResult contains queue admin job listing output. @group Admin
func ListJobs ¶
func ListJobs(ctx context.Context, q any, opts ListJobsOptions) (ListJobsResult, error)
ListJobs lists jobs for a queue and state when supported. @group Admin
Example: list jobs via helper
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
_, err = queue.ListJobs(context.Background(), q, queue.ListJobsOptions{
Queue: "default",
State: queue.JobStatePending,
})
_ = err
type Logger ¶
type Logger interface {
Debug(args ...interface{})
Info(args ...interface{})
Warn(args ...interface{})
Error(args ...interface{})
Fatal(args ...interface{})
}
Logger is a generic runtime logger contract that drivers may use to surface their internal worker/server lifecycle output.
type Message ¶
Message is the handler message passed to the high-level queue runtime. It exposes workflow/job metadata and payload binding helpers. @group Queue
type Middleware ¶
type Middleware = bus.Middleware
Middleware applies behavior around high-level workflow/job execution. @group Queue
type MiddlewareFunc ¶
type MiddlewareFunc = bus.MiddlewareFunc
MiddlewareFunc adapts a function to queue middleware. @group Queue
type NoopLogger ¶
type NoopLogger struct{}
NoopLogger disables driver-managed internal logs when passed through config.
func (NoopLogger) Debug ¶
func (NoopLogger) Debug(...interface{})
func (NoopLogger) Error ¶
func (NoopLogger) Error(...interface{})
func (NoopLogger) Fatal ¶
func (NoopLogger) Fatal(...interface{})
func (NoopLogger) Info ¶
func (NoopLogger) Info(...interface{})
func (NoopLogger) Warn ¶
func (NoopLogger) Warn(...interface{})
type Observer ¶
type Observer interface {
// Observe handles a queue runtime event.
// @group Observability
//
// Example: observe runtime event
//
// var observer queue.Observer
// observer.Observe(queue.Event{
// Kind: queue.EventEnqueueAccepted,
// Driver: queue.DriverSync,
// Queue: "default",
// })
Observe(event Event)
}
Observer receives queue runtime events. @group Observability
func MultiObserver ¶
MultiObserver fans out events to multiple observers. @group Observability
Example: fan out to multiple observers
events := make(chan queue.Event, 2)
observer := queue.MultiObserver(
queue.ChannelObserver{Events: events},
queue.ObserverFunc(func(queue.Event) {}),
)
observer.Observe(queue.Event{Kind: queue.EventEnqueueAccepted})
fmt.Println(len(events))
// Output: 1
type ObserverFunc ¶
type ObserverFunc func(event Event)
ObserverFunc adapts a function to an Observer. @group Observability
func (ObserverFunc) Observe ¶
func (f ObserverFunc) Observe(event Event)
Observe calls the wrapped function. @group Observability
Example: observer func logging hook
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
observer := queue.ObserverFunc(func(event queue.Event) {
logger.Info("queue event",
"kind", event.Kind,
"driver", event.Driver,
"queue", event.Queue,
"job_type", event.JobType,
"attempt", event.Attempt,
"max_retry", event.MaxRetry,
"duration", event.Duration,
"err", event.Err,
)
})
observer.Observe(queue.Event{
Kind: queue.EventProcessSucceeded,
Driver: queue.DriverSync,
Queue: "default",
JobType: "emails:send",
})
type Option ¶
type Option func(*runtimeOptions)
Option configures the high-level workflow runtime. @group Queue
func WithClock ¶
WithClock overrides the workflow runtime clock. @group Queue
Example: workflow clock
q, err := queue.New(
queue.Config{Driver: queue.DriverSync},
queue.WithClock(func() time.Time { return time.Unix(0, 0) }),
)
if err != nil {
return
}
_ = q
func WithMiddleware ¶
func WithMiddleware(middlewares ...Middleware) Option
WithMiddleware appends queue workflow middleware. @group Queue
Example: middleware
mw := queue.MiddlewareFunc(func(ctx context.Context, m queue.Message, next queue.Next) error {
return next(ctx, m)
})
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithMiddleware(mw))
if err != nil {
return
}
_ = q
func WithObserver ¶
func WithObserver(observer WorkflowObserver) Option
WithObserver installs a workflow lifecycle observer. @group Queue
Example: workflow observer
observer := queue.WorkflowObserverFunc(func(event queue.WorkflowEvent) {
_ = event.Kind
})
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithObserver(observer))
if err != nil {
return
}
_ = q
func WithStore ¶
func WithStore(store WorkflowStore) Option
WithStore overrides the workflow orchestration store. @group Queue
Example: workflow store
var store queue.WorkflowStore
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithStore(store))
if err != nil {
return
}
_ = q
func WithWorkers ¶
WithWorkers sets desired worker concurrency before StartWorkers. It applies to high-level queue constructors (for example NewWorkerpool/New/NewSync). @group Queue
Example: constructor workers option
q, err := queue.NewWorkerpool(
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}
_ = q
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is the high-level user-facing queue API. It composes the queue runtime with the internal orchestration engine. @group Queue
func New ¶
New creates the high-level Queue API based on Config.Driver. @group Constructors
Example: create a queue and dispatch a workflow-capable job
q, err := queue.New(queue.Config{Driver: queue.DriverWorkerpool})
if err != nil {
return
}
type EmailPayload struct {
ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
var payload EmailPayload
if err := m.Bind(&payload); err != nil {
return err
}
_ = payload
return nil
})
_ = q.WithWorkers(1).StartWorkers(context.Background()) // optional; default: runtime.NumCPU() (min 1)
defer q.Shutdown(context.Background())
_, _ = q.Dispatch(
queue.NewJob("emails:send").
Payload(EmailPayload{ID: 1}).
OnQueue("default"),
)
func NewNull ¶
NewNull creates a Queue on the null backend. @group Constructors
Example: null backend
q, err := queue.NewNull()
if err != nil {
return
}
_ = q
func NewSync ¶
NewSync creates a Queue on the synchronous in-process backend. @group Constructors
Example: sync backend
q, err := queue.NewSync()
if err != nil {
return
}
_ = q
func NewWorkerpool ¶
NewWorkerpool creates a Queue on the in-process workerpool backend. @group Constructors
Example: workerpool backend
q, err := queue.NewWorkerpool()
if err != nil {
return
}
_ = q
func (*Queue) Batch ¶
func (r *Queue) Batch(jobs ...Job) BatchBuilder
Batch creates a batch builder for fan-out workflow execution. @group Queue
Example: batch
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
_, _ = q.Batch(
queue.NewJob("emails:send").Payload(map[string]any{"id": 1}),
queue.NewJob("emails:send").Payload(map[string]any{"id": 2}),
).Name("send-emails").OnQueue("default").Dispatch(context.Background())
func (*Queue) CancelJob ¶
CancelJob cancels a job via queue admin capability when supported. @group Admin
Example: queue method cancel job
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
if !queue.SupportsQueueAdmin(q) {
return
}
err = q.CancelJob(context.Background(), "job-id")
_ = err
func (*Queue) Chain ¶
func (r *Queue) Chain(jobs ...Job) ChainBuilder
Chain creates a chain builder for sequential workflow execution. @group Queue
Example: chain
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("first", func(ctx context.Context, m queue.Message) error { return nil })
q.Register("second", func(ctx context.Context, m queue.Message) error { return nil })
_, _ = q.Chain(
queue.NewJob("first"),
queue.NewJob("second"),
).OnQueue("default").Dispatch(context.Background())
func (*Queue) ClearQueue ¶
ClearQueue clears queue jobs via queue admin capability when supported. @group Admin
Example: queue method clear queue
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
if !queue.SupportsQueueAdmin(q) {
return
}
err = q.ClearQueue(context.Background(), "default")
_ = err
func (*Queue) DeleteJob ¶
DeleteJob deletes a job via queue admin capability when supported. @group Admin
Example: queue method delete job
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
if !queue.SupportsQueueAdmin(q) {
return
}
err = q.DeleteJob(context.Background(), "default", "job-id")
_ = err
func (*Queue) Dispatch ¶
func (r *Queue) Dispatch(job Job) (DispatchResult, error)
Dispatch enqueues a high-level job using context.Background. @group Queue
Example: dispatch
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
job := queue.NewJob("emails:send").Payload(map[string]any{"id": 1}).OnQueue("default")
_, _ = q.Dispatch(job)
func (*Queue) DispatchCtx ¶
DispatchCtx enqueues a high-level job using the provided context. @group Queue
func (*Queue) Driver ¶
Driver reports the configured backend driver for the underlying queue runtime. @group Queue
Example: driver
q, err := queue.NewSync()
if err != nil {
return
}
fmt.Println(q.Driver())
// Output: sync
func (*Queue) FindBatch ¶
FindBatch returns current batch state by ID. @group Queue
Example: find batch
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
batchID, err := q.Batch(queue.NewJob("emails:send")).Dispatch(context.Background())
if err != nil {
return
}
_, _ = q.FindBatch(context.Background(), batchID)
func (*Queue) FindChain ¶
FindChain returns current chain state by ID. @group Queue
Example: find chain
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("first", func(ctx context.Context, m queue.Message) error { return nil })
chainID, err := q.Chain(queue.NewJob("first")).Dispatch(context.Background())
if err != nil {
return
}
_, _ = q.FindChain(context.Background(), chainID)
func (*Queue) History ¶
func (r *Queue) History(ctx context.Context, queueName string, window QueueHistoryWindow) ([]QueueHistoryPoint, error)
History returns queue history points via queue admin capability when supported. @group Admin
Example: queue method history
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
points, err := q.History(context.Background(), "default", queue.QueueHistoryHour)
_ = points
_ = err
func (*Queue) ListJobs ¶
func (r *Queue) ListJobs(ctx context.Context, opts ListJobsOptions) (ListJobsResult, error)
ListJobs lists jobs via queue admin capability when supported. @group Admin
Example: queue method list jobs
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
_, err = q.ListJobs(context.Background(), queue.ListJobsOptions{
Queue: "default",
State: queue.JobStatePending,
})
_ = err
func (*Queue) Pause ¶
Pause pauses consumption for a queue when supported by the underlying driver. See the README "Queue Backends" table for Pause/Resume support and docs/backend-guarantees.md (Capability Matrix) for broader backend differences. @group Queue
Example: pause queue
q, err := queue.NewSync()
if err != nil {
return
}
if queue.SupportsPause(q) {
_ = q.Pause(context.Background(), "default")
}
func (*Queue) Prune ¶
Prune deletes old workflow state records. @group Queue
Example: prune workflow state
q, err := queue.NewSync()
if err != nil {
return
}
_ = q.Prune(context.Background(), time.Now().Add(-24*time.Hour))
func (*Queue) Ready ¶
Ready validates queue backend readiness for dispatch/worker operation. @group Queue
Example: queue ready
q, err := queue.NewSync()
if err != nil {
return
}
fmt.Println(q.Ready(context.Background()) == nil)
// Output: true
func (*Queue) Register ¶
Register binds a handler for a high-level job type. @group Queue
Example: register
q, err := queue.NewSync()
if err != nil {
return
}
type EmailPayload struct {
ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
var payload EmailPayload
if err := m.Bind(&payload); err != nil {
return err
}
_ = payload
return nil
})
func (*Queue) Resume ¶
Resume resumes consumption for a queue when supported by the underlying driver. @group Queue
Example: resume queue
q, err := queue.NewSync()
if err != nil {
return
}
if queue.SupportsPause(q) {
_ = q.Resume(context.Background(), "default")
}
func (*Queue) RetryJob ¶
RetryJob retries (runs now) a job via queue admin capability when supported. @group Admin
Example: queue method retry job
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
if !queue.SupportsQueueAdmin(q) {
return
}
err = q.RetryJob(context.Background(), "default", "job-id")
_ = err
func (*Queue) Run ¶
Run starts worker processing, blocks until ctx is canceled, then gracefully shuts down. @group Queue
Example: run until canceled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q, err := queue.NewWorkerpool()
if err != nil {
return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
go func() {
time.Sleep(100 * time.Millisecond)
cancel()
}()
_ = q.Run(ctx)
func (*Queue) Shutdown ¶
Shutdown drains workers and closes underlying resources. @group Queue
Example: shutdown
q, err := queue.NewWorkerpool()
if err != nil {
return
}
_ = q.StartWorkers(context.Background())
_ = q.Shutdown(context.Background())
func (*Queue) StartWorkers ¶
StartWorkers starts worker processing. @group Queue
Example: start workers
q, err := queue.NewWorkerpool()
if err != nil {
return
}
_ = q.StartWorkers(context.Background())
func (*Queue) Stats ¶
func (r *Queue) Stats(ctx context.Context) (StatsSnapshot, error)
Stats returns a normalized snapshot when supported by the underlying driver. @group Queue
Example: stats
q, err := queue.NewSync()
if err != nil {
return
}
if queue.SupportsNativeStats(q) {
_, _ = q.Stats(context.Background())
}
func (*Queue) WithWorkers ¶
WithWorkers sets desired worker concurrency before StartWorkers. @group Queue
Example: workers
q, err := queue.NewWorkerpool()
if err != nil {
return
}
q.WithWorkers(4) // optional; default: runtime.NumCPU() (min 1)
type QueueAdmin ¶
type QueueAdmin interface {
ListJobs(ctx context.Context, opts ListJobsOptions) (ListJobsResult, error)
RetryJob(ctx context.Context, queueName, jobID string) error
CancelJob(ctx context.Context, jobID string) error
DeleteJob(ctx context.Context, queueName, jobID string) error
ClearQueue(ctx context.Context, queueName string) error
History(ctx context.Context, queueName string, window QueueHistoryWindow) ([]QueueHistoryPoint, error)
}
QueueAdmin exposes optional queue admin capabilities. @group Admin
type QueueController ¶
type QueueController interface {
Pause(ctx context.Context, queueName string) error
Resume(ctx context.Context, queueName string) error
}
QueueController exposes queue pause/resume controls. @group Observability
type QueueCounters ¶
type QueueCounters struct {
Pending int64
Active int64
Scheduled int64
Retry int64
Archived int64
Processed int64
Failed int64
Paused int64
AvgWait time.Duration
AvgRun time.Duration
}
QueueCounters exposes normalized queue counters collected from events. @group Observability
type QueueHistoryPoint ¶
QueueHistoryPoint represents processed/failed totals at a point in time. @group Admin
func QueueHistory ¶
func QueueHistory(ctx context.Context, q any, queueName string, window QueueHistoryWindow) ([]QueueHistoryPoint, error)
QueueHistory returns queue history points when supported. @group Admin
Example: history via helper
q, err := redisqueue.New("127.0.0.1:6379")
if err != nil {
return
}
_, err = queue.QueueHistory(context.Background(), q, "default", queue.QueueHistoryHour)
_ = err
func SinglePointHistory ¶
func SinglePointHistory(snapshot StatsSnapshot, queueName string) []QueueHistoryPoint
SinglePointHistory converts a snapshot into a single current-history point. This helper is intended for driver modules that do not expose historical buckets. @group Admin
Example: single-point history
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Processed: 12, Failed: 1},
},
}
points := queue.SinglePointHistory(snapshot, "default")
fmt.Println(len(points), points[0].Processed, points[0].Failed)
// Output: 1 12 1
func TimelineHistoryFromSnapshot ¶
func TimelineHistoryFromSnapshot(snapshot StatsSnapshot, queueName string, window QueueHistoryWindow) []QueueHistoryPoint
TimelineHistoryFromSnapshot records queue counters and returns windowed points. This is intended for drivers that don't expose native multi-point history. @group Admin
Example: timeline history from snapshots
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Processed: 5, Failed: 1},
},
}
points := queue.TimelineHistoryFromSnapshot(snapshot, "default", queue.QueueHistoryHour)
fmt.Println(len(points) >= 1)
// Output: true
type QueueHistoryProvider ¶
type QueueHistoryProvider interface {
History(ctx context.Context, queueName string, window QueueHistoryWindow) ([]QueueHistoryPoint, error)
}
QueueHistoryProvider exposes queue history capability independently from full queue admin support. Drivers like sync/workerpool can provide history without supporting per-job admin operations. @group Admin
type QueueHistoryWindow ¶
type QueueHistoryWindow string
QueueHistoryWindow identifies queue history horizon. @group Admin
const (
QueueHistoryHour QueueHistoryWindow = "hour"
QueueHistoryDay QueueHistoryWindow = "day"
QueueHistoryWeek QueueHistoryWindow = "week"
)
type QueueThroughput ¶
type QueueThroughput struct {
Hour ThroughputWindow
Day ThroughputWindow
Week ThroughputWindow
}
QueueThroughput contains rolling throughput windows for a queue. @group Observability
type RateLimiter ¶
type RateLimiter = bus.RateLimiter
RateLimiter is used by RateLimit middleware. @group Queue
type RetryPolicy ¶
type RetryPolicy = bus.RetryPolicy
RetryPolicy is a pass-through middleware policy helper. @group Queue
type StatsCollector ¶
type StatsCollector struct {
// contains filtered or unexported fields
}
StatsCollector aggregates normalized counters from Observer events. @group Observability
func NewStatsCollector ¶
func NewStatsCollector() *StatsCollector
NewStatsCollector creates an event collector for queue counters. @group Constructors
Example: new stats collector
collector := queue.NewStatsCollector()
_ = collector
func (*StatsCollector) Observe ¶
func (c *StatsCollector) Observe(event Event)
Observe records an event and updates normalized counters. @group Observability
Example: observe event
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
func (*StatsCollector) Snapshot ¶
func (c *StatsCollector) Snapshot() StatsSnapshot
Snapshot returns a copy of collected counters. @group Observability
Example: snapshot print
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
collector.Observe(queue.Event{
Kind: queue.EventProcessStarted,
Driver: queue.DriverSync,
Queue: "default",
JobKey: "job-1",
Time: time.Now(),
})
collector.Observe(queue.Event{
Kind: queue.EventProcessSucceeded,
Driver: queue.DriverSync,
Queue: "default",
JobKey: "job-1",
Duration: 12 * time.Millisecond,
Time: time.Now(),
})
snapshot := collector.Snapshot()
counters, _ := snapshot.Queue("default")
throughput, _ := snapshot.Throughput("default")
fmt.Printf("queues=%v\n", snapshot.Queues())
fmt.Printf("counters=%+v\n", counters)
fmt.Printf("hour=%+v\n", throughput.Hour)
// Output:
// queues=[default]
// counters={Pending:0 Active:0 Scheduled:0 Retry:0 Archived:0 Processed:1 Failed:0 Paused:0 AvgWait:0s AvgRun:12ms}
// hour={Processed:1 Failed:0}
type StatsProvider ¶
type StatsProvider interface {
Stats(ctx context.Context) (StatsSnapshot, error)
}
StatsProvider exposes driver-native queue snapshots. @group Observability
type StatsSnapshot ¶
type StatsSnapshot struct {
ByQueue map[string]QueueCounters
ThroughputByQueue map[string]QueueThroughput
}
StatsSnapshot is a point-in-time view of counters by queue. @group Observability
func Snapshot ¶
func Snapshot(ctx context.Context, q any, collector *StatsCollector) (StatsSnapshot, error)
Snapshot returns driver-native stats, falling back to collector data. @group Observability
Example: snapshot from queue runtime
q, _ := queue.NewSync()
snapshot, _ := q.Stats(context.Background())
_, ok := snapshot.Queue("default")
fmt.Println(ok)
// Output: true
func (StatsSnapshot) Active ¶
func (s StatsSnapshot) Active(name string) int64
Active returns active count for a queue. @group Observability
Example: active count getter
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Active: 2},
},
}
fmt.Println(snapshot.Active("default"))
// Output: 2
func (StatsSnapshot) Archived ¶
func (s StatsSnapshot) Archived(name string) int64
Archived returns archived count for a queue. @group Observability
Example: archived count getter
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Archived: 7},
},
}
fmt.Println(snapshot.Archived("default"))
// Output: 7
func (StatsSnapshot) Failed ¶
func (s StatsSnapshot) Failed(name string) int64
Failed returns failed count for a queue. @group Observability
Example: failed count getter
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Failed: 2},
},
}
fmt.Println(snapshot.Failed("default"))
// Output: 2
func (StatsSnapshot) Paused ¶
func (s StatsSnapshot) Paused(name string) int64
Paused returns paused count for a queue. @group Observability
Example: paused count getter
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventQueuePaused,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
snapshot := collector.Snapshot()
fmt.Println(snapshot.Paused("default"))
// Output: 1
func (StatsSnapshot) Pending ¶
func (s StatsSnapshot) Pending(name string) int64
Pending returns pending count for a queue. @group Observability
Example: pending count getter
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Pending: 3},
},
}
fmt.Println(snapshot.Pending("default"))
// Output: 3
func (StatsSnapshot) Processed ¶
func (s StatsSnapshot) Processed(name string) int64
Processed returns processed count for a queue. @group Observability
Example: processed count getter
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Processed: 11},
},
}
fmt.Println(snapshot.Processed("default"))
// Output: 11
func (StatsSnapshot) Queue ¶
func (s StatsSnapshot) Queue(name string) (QueueCounters, bool)
Queue returns queue counters for a queue name. @group Observability
Example: queue counters getter
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
snapshot := collector.Snapshot()
counters, ok := snapshot.Queue("default")
fmt.Println(ok, counters.Pending)
// Output: true 1
func (StatsSnapshot) Queues ¶
func (s StatsSnapshot) Queues() []string
Queues returns sorted queue names present in the snapshot. @group Observability
Example: list queues
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "critical",
Time: time.Now(),
})
snapshot := collector.Snapshot()
names := snapshot.Queues()
fmt.Println(len(names), names[0])
// Output: 1 critical
func (StatsSnapshot) RetryCount ¶
func (s StatsSnapshot) RetryCount(name string) int64
RetryCount returns retry count for a queue. @group Observability
Example: retry count getter
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Retry: 1},
},
}
fmt.Println(snapshot.RetryCount("default"))
// Output: 1
func (StatsSnapshot) Scheduled ¶
func (s StatsSnapshot) Scheduled(name string) int64
Scheduled returns scheduled count for a queue. @group Observability
Example: scheduled count getter
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Scheduled: 4},
},
}
fmt.Println(snapshot.Scheduled("default"))
// Output: 4
func (StatsSnapshot) Throughput ¶
func (s StatsSnapshot) Throughput(name string) (QueueThroughput, bool)
Throughput returns rolling throughput windows for a queue name. @group Observability
Example: throughput getter
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventProcessSucceeded,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
snapshot := collector.Snapshot()
throughput, ok := snapshot.Throughput("default")
fmt.Printf("ok=%v hour=%+v day=%+v week=%+v\n", ok, throughput.Hour, throughput.Day, throughput.Week)
// Output: ok=true hour={Processed:1 Failed:0} day={Processed:1 Failed:0} week={Processed:1 Failed:0}
type ThroughputWindow ¶
ThroughputWindow captures processed vs failed counts in a fixed window. @group Observability
type WithoutOverlapping ¶
type WithoutOverlapping = bus.WithoutOverlapping
WithoutOverlapping prevents concurrent execution for the same key. @group Queue
type WorkerpoolConfig ¶
WorkerpoolConfig configures the in-memory workerpool queue. @group Config
type WorkflowEvent ¶
WorkflowEvent is emitted by the high-level workflow runtime observer hooks. @group Queue
type WorkflowEventKind ¶
WorkflowEventKind identifies high-level workflow runtime lifecycle events. @group Queue
type WorkflowObserver ¶
WorkflowObserver receives high-level workflow runtime events. @group Queue
type WorkflowObserverFunc ¶
type WorkflowObserverFunc = bus.ObserverFunc
WorkflowObserverFunc adapts a function to a workflow observer. @group Queue
type WorkflowStore ¶
WorkflowStore is the orchestration state store used for chains/batches/callbacks. @group Queue
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| bus | Package bus provides the workflow orchestration engine used by queue. |
| driver | |
| mysqlqueue (module) | |
| natsqueue (module) | |
| postgresqueue (module) | |
| rabbitmqqueue (module) | |
| redisqueue (module) | |
| sqlitequeue (module) | |
| sqlqueuecore (module) | |
| sqsqueue (module) | |
| internal | |
| readmecheck | Package readmecheck contains CI-only compile checks for curated manual README snippets. |
| runtimehook | Package runtimehook provides a tiny internal hook registry used to avoid import cycles between the public queue package and internal bridge/test helpers introduced during API flattening. |
| queueconfig | Package queueconfig contains shared configuration structs used by optional driver modules. |
| queuecore | Package queuecore contains shared helper logic used by optional driver modules. |
| queuefake | Package queuefake provides a queue-first test harness for queue and workflow assertions. |