Documentation
¶
Overview ¶
Package bus provides the workflow orchestration engine used by queue.
Most applications should use the top-level queue package (`queue.New(...)`, `Queue.Dispatch`, `Queue.Chain`, `Queue.Batch`) rather than importing bus directly.
This package remains available for advanced/internal orchestration plumbing, custom workflow integration, and lower-level testing.
Index ¶
- Variables
- type BatchBuilder
- type BatchJob
- type BatchRecord
- type BatchSpec
- type BatchState
- type Bus
- type ChainBuilder
- type ChainNode
- type ChainRecord
- type ChainState
- type Context
- type DispatchResult
- type Event
- type EventKind
- type FailOnError
- type Fake
- func (f *Fake) AssertBatchCount(t testing.TB, n int)
- func (f *Fake) AssertBatched(t testing.TB, predicate func(spec BatchSpec) bool)
- func (f *Fake) AssertChained(t testing.TB, expected []string)
- func (f *Fake) AssertCount(t testing.TB, n int)
- func (f *Fake) AssertDispatched(t testing.TB, jobType string)
- func (f *Fake) AssertDispatchedOn(t testing.TB, queueName, jobType string)
- func (f *Fake) AssertDispatchedTimes(t testing.TB, jobType string, n int)
- func (f *Fake) AssertNotDispatched(t testing.TB, jobType string)
- func (f *Fake) AssertNothingBatched(t testing.TB)
- func (f *Fake) AssertNothingDispatched(t testing.TB)
- func (f *Fake) Batch(jobs ...Job) BatchBuilder
- func (f *Fake) Chain(jobs ...Job) ChainBuilder
- func (f *Fake) Dispatch(_ context.Context, job Job) (DispatchResult, error)
- func (f *Fake) FindBatch(context.Context, string) (BatchState, error)
- func (f *Fake) FindChain(context.Context, string) (ChainState, error)
- func (f *Fake) Prune(context.Context, time.Time) error
- func (f *Fake) Register(string, Handler)
- func (f *Fake) Shutdown(context.Context) error
- func (f *Fake) StartWorkers(context.Context) error
- type Handler
- type Job
- type JobOptions
- type Lock
- type Locker
- type Middleware
- type MiddlewareFunc
- type Next
- type Observer
- type ObserverFunc
- type Option
- type RateLimit
- type RateLimiter
- type RetryPolicy
- type SQLStoreConfig
- type SkipWhen
- type Store
- type WithoutOverlapping
Constants ¶
This section is empty.
Variables ¶
var (
ErrSkipped = errors.New("bus job skipped by middleware")
ErrRateLimited = errors.New("bus job rate limited")
ErrOverlapping = errors.New("bus job overlap prevented")
)
var ErrNotFound = errors.New("bus record not found")
Functions ¶
This section is empty.
Types ¶
type BatchBuilder ¶
type BatchBuilder interface {
// Name sets a display name for the batch.
// @group Batching
//
// Example: set batch name
//
// batchID, _ := b.Batch(bus.NewJob("a", nil)).Name("nightly").Dispatch(context.Background())
// _ = batchID
Name(name string) BatchBuilder
// OnQueue applies a default queue to batch jobs that do not set one.
// @group Batching
//
// Example: set batch queue
//
// batchID, _ := b.Batch(bus.NewJob("a", nil)).OnQueue("critical").Dispatch(context.Background())
// _ = batchID
OnQueue(queue string) BatchBuilder
// AllowFailures keeps the batch running when individual jobs fail.
// @group Batching
//
// Example: allow failures
//
// batchID, _ := b.Batch(bus.NewJob("a", nil)).AllowFailures().Dispatch(context.Background())
// _ = batchID
AllowFailures() BatchBuilder
// Progress registers a callback invoked as jobs complete.
// @group Batching
//
// Example: progress callback
//
// batchID, _ := b.Batch(bus.NewJob("a", nil)).
// Progress(func(context.Context, bus.BatchState) error { return nil }).
// Dispatch(context.Background())
// _ = batchID
Progress(fn func(ctx context.Context, st BatchState) error) BatchBuilder
// Then registers a callback invoked once when the batch succeeds.
// @group Batching
//
// Example: then callback
//
// batchID, _ := b.Batch(bus.NewJob("a", nil)).
// Then(func(context.Context, bus.BatchState) error { return nil }).
// Dispatch(context.Background())
// _ = batchID
Then(fn func(ctx context.Context, st BatchState) error) BatchBuilder
// Catch registers a callback invoked when the batch encounters a failure.
// @group Batching
//
// Example: catch callback
//
// batchID, _ := b.Batch(bus.NewJob("a", nil)).
// Catch(func(context.Context, bus.BatchState, error) error { return nil }).
// Dispatch(context.Background())
// _ = batchID
Catch(fn func(ctx context.Context, st BatchState, err error) error) BatchBuilder
// Finally registers a callback invoked once when the batch reaches a terminal state.
// @group Batching
//
// Example: finally callback
//
// batchID, _ := b.Batch(bus.NewJob("a", nil)).
// Finally(func(context.Context, bus.BatchState) error { return nil }).
// Dispatch(context.Background())
// _ = batchID
Finally(fn func(ctx context.Context, st BatchState) error) BatchBuilder
// Dispatch creates and starts the batch workflow.
// @group Batching
//
// Example: dispatch batch
//
// batchID, _ := b.Batch(bus.NewJob("a", nil), bus.NewJob("b", nil)).Dispatch(context.Background())
// _ = batchID
Dispatch(ctx context.Context) (string, error)
}
type BatchRecord ¶
type BatchState ¶
type Bus ¶
type Bus interface {
Register(jobType string, handler Handler)
Dispatch(ctx context.Context, job Job) (DispatchResult, error)
Chain(jobs ...Job) ChainBuilder
Batch(jobs ...Job) BatchBuilder
StartWorkers(ctx context.Context) error
Shutdown(ctx context.Context) error
FindBatch(ctx context.Context, batchID string) (BatchState, error)
FindChain(ctx context.Context, chainID string) (ChainState, error)
Prune(ctx context.Context, before time.Time) error
}
func New ¶
New creates a bus runtime using an in-memory orchestration store. @group Constructors
Example: new bus runtime
q, _ := queue.NewSync()
b, _ := bus.New(q)
b.Register("monitor:poll", func(context.Context, bus.Context) error { return nil })
_ = b.StartWorkers(context.Background())
defer b.Shutdown(context.Background())
type PollPayload struct {
URL string `json:"url"`
}
_, _ = b.Dispatch(context.Background(), bus.NewJob("monitor:poll", PollPayload{
URL: "https://goforj.dev/health",
}))
type ChainBuilder ¶
type ChainBuilder interface {
// OnQueue applies a default queue to chain jobs that do not set one.
// @group Chaining
//
// Example: set chain queue
//
// chainID, _ := b.Chain(
// bus.NewJob("a", nil),
// bus.NewJob("b", nil),
// ).OnQueue("critical").Dispatch(context.Background())
// _ = chainID
OnQueue(queue string) ChainBuilder
// Catch registers a callback invoked when chain execution fails.
// @group Chaining
//
// Example: chain catch callback
//
// chainID, _ := b.Chain(bus.NewJob("a", nil)).
// Catch(func(context.Context, bus.ChainState, error) error { return nil }).
// Dispatch(context.Background())
// _ = chainID
Catch(fn func(ctx context.Context, st ChainState, err error) error) ChainBuilder
// Finally registers a callback invoked once when chain execution finishes.
// @group Chaining
//
// Example: chain finally callback
//
// chainID, _ := b.Chain(bus.NewJob("a", nil)).
// Finally(func(context.Context, bus.ChainState) error { return nil }).
// Dispatch(context.Background())
// _ = chainID
Finally(fn func(ctx context.Context, st ChainState) error) ChainBuilder
// Dispatch creates and starts the chain workflow.
// @group Chaining
//
// Example: dispatch chain
//
// chainID, _ := b.Chain(bus.NewJob("a", nil), bus.NewJob("b", nil)).Dispatch(context.Background())
// _ = chainID
Dispatch(ctx context.Context) (string, error)
}
type ChainRecord ¶
type ChainState ¶
type Context ¶
type Context struct {
SchemaVersion int
DispatchID string
JobID string
ChainID string
BatchID string
Attempt int
JobType string
// contains filtered or unexported fields
}
func (Context) Bind ¶
Bind unmarshals the job payload into dst. @group Job
Example: bind payload
type PollPayload struct {
URL string `json:"url"`
}
var payload PollPayload
_ = jc.Bind(&payload)
func (Context) PayloadBytes ¶
PayloadBytes returns a copy of raw job payload bytes. @group Job
Example: read raw payload bytes
raw := jc.PayloadBytes()
_ = raw
type DispatchResult ¶
type DispatchResult struct {
DispatchID string
}
type EventKind ¶
type EventKind string
const (
EventDispatchStarted EventKind = "dispatch_started"
EventDispatchSucceeded EventKind = "dispatch_succeeded"
EventDispatchFailed EventKind = "dispatch_failed"
EventJobStarted EventKind = "job_started"
EventJobSucceeded EventKind = "job_succeeded"
EventJobFailed EventKind = "job_failed"
EventChainStarted EventKind = "chain_started"
EventChainAdvanced EventKind = "chain_advanced"
EventChainCompleted EventKind = "chain_completed"
EventChainFailed EventKind = "chain_failed"
EventBatchStarted EventKind = "batch_started"
EventBatchProgressed EventKind = "batch_progressed"
EventBatchCompleted EventKind = "batch_completed"
EventBatchFailed EventKind = "batch_failed"
EventBatchCancelled EventKind = "batch_cancelled"
EventCallbackStarted EventKind = "callback_started"
EventCallbackSucceeded EventKind = "callback_succeeded"
EventCallbackFailed EventKind = "callback_failed"
)
type FailOnError ¶
type Fake ¶
type Fake struct {
// contains filtered or unexported fields
}
func NewFake ¶
func NewFake() *Fake
NewFake creates a bus fake that records dispatch, chain, and batch calls. @group Constructors
Example: new bus fake
fake := bus.NewFake()
_, _ = fake.Dispatch(context.Background(), bus.NewJob("monitor:poll", nil))
func (*Fake) AssertBatchCount ¶
AssertBatchCount fails if total recorded batch count does not match n. @group Testing
Example: assert batch count
fake := bus.NewFake()
_, _ = fake.Batch(bus.NewJob("a", nil)).Dispatch(context.Background())
fake.AssertBatchCount(nil, 1)
func (*Fake) AssertBatched ¶
AssertBatched fails unless at least one recorded batch matches predicate. @group Testing
Example: assert batch predicate
fake := bus.NewFake()
_, _ = fake.Batch(bus.NewJob("a", nil), bus.NewJob("b", nil)).Dispatch(context.Background())
fake.AssertBatched(nil, func(spec bus.BatchSpec) bool { return len(spec.JobTypes) == 2 })
func (*Fake) AssertChained ¶
AssertChained fails if no recorded chain matches expected job type order. @group Testing
Example: assert chain
fake := bus.NewFake()
_, _ = fake.Chain(bus.NewJob("a", nil), bus.NewJob("b", nil)).Dispatch(context.Background())
fake.AssertChained(nil, []string{"a", "b"})
func (*Fake) AssertCount ¶
AssertCount fails if total dispatched count does not match n. @group Testing
Example: assert dispatch count
fake := bus.NewFake()
_, _ = fake.Dispatch(context.Background(), bus.NewJob("emails:send", nil))
fake.AssertCount(nil, 1)
func (*Fake) AssertDispatched ¶
AssertDispatched fails if the given job type was never dispatched. @group Testing
Example: assert dispatched
fake := bus.NewFake()
_, _ = fake.Dispatch(context.Background(), bus.NewJob("emails:send", nil))
fake.AssertDispatched(nil, "emails:send")
func (*Fake) AssertDispatchedOn ¶
AssertDispatchedOn fails if a job type was not dispatched on queueName. @group Testing
Example: assert dispatched on queue
fake := bus.NewFake()
_, _ = fake.Dispatch(context.Background(), bus.NewJob("emails:send", nil).OnQueue("critical"))
fake.AssertDispatchedOn(nil, "critical", "emails:send")
func (*Fake) AssertDispatchedTimes ¶
AssertDispatchedTimes fails if dispatched count for job type does not match n. @group Testing
Example: assert dispatch count by type
fake := bus.NewFake()
_, _ = fake.Dispatch(context.Background(), bus.NewJob("emails:send", nil))
_, _ = fake.Dispatch(context.Background(), bus.NewJob("emails:send", nil))
fake.AssertDispatchedTimes(nil, "emails:send", 2)
func (*Fake) AssertNotDispatched ¶
AssertNotDispatched fails if the given job type was dispatched. @group Testing
Example: assert not dispatched
fake := bus.NewFake()
fake.AssertNotDispatched(nil, "emails:send")
func (*Fake) AssertNothingBatched ¶
AssertNothingBatched fails if any batch was recorded. @group Testing
Example: assert no batches
fake := bus.NewFake()
fake.AssertNothingBatched(nil)
func (*Fake) AssertNothingDispatched ¶
AssertNothingDispatched fails if any job was dispatched. @group Testing
Example: assert no dispatch
fake := bus.NewFake()
fake.AssertNothingDispatched(nil)
func (*Fake) Batch ¶
func (f *Fake) Batch(jobs ...Job) BatchBuilder
Batch records a batch specification. @group Testing
Example: record batch
fake := bus.NewFake()
_, _ = fake.Batch(
bus.NewJob("a", nil),
bus.NewJob("b", nil),
).Dispatch(context.Background())
func (*Fake) Chain ¶
func (f *Fake) Chain(jobs ...Job) ChainBuilder
Chain records a chain specification. @group Testing
Example: record chain
fake := bus.NewFake()
_, _ = fake.Chain(
bus.NewJob("a", nil),
bus.NewJob("b", nil),
).Dispatch(context.Background())
type Job ¶
type Job struct {
Type string
Payload any
Options JobOptions
}
func NewJob ¶
NewJob creates a typed bus job payload with optional fluent options. @group Constructors
Example: new bus job
type PollPayload struct {
URL string `json:"url"`
}
job := bus.NewJob("monitor:poll", PollPayload{
URL: "https://goforj.dev/health",
}).
OnQueue("monitor-critical").
Delay(2 * time.Second).
Timeout(15 * time.Second).
Retry(3).
Backoff(500 * time.Millisecond).
UniqueFor(30 * time.Second)
_ = job
func (Job) Backoff ¶
Backoff sets retry backoff for this job. @group Job
Example: set retry backoff
job := bus.NewJob("emails:send", nil).Backoff(500 * time.Millisecond)
_ = job
func (Job) Delay ¶
Delay defers job execution. @group Job
Example: set delay
job := bus.NewJob("emails:send", nil).Delay(2 * time.Second)
_ = job
func (Job) OnQueue ¶
OnQueue sets the target queue for this job. @group Job
Example: set queue
job := bus.NewJob("emails:send", nil).OnQueue("critical")
_ = job
func (Job) Retry ¶
Retry sets max retry attempts for this job. @group Job
Example: set retry count
job := bus.NewJob("emails:send", nil).Retry(5)
_ = job
type JobOptions ¶
type Middleware ¶
Middleware can intercept bus job execution. @group Middleware
type MiddlewareFunc ¶
MiddlewareFunc adapts a function to Middleware. @group Middleware
type Observer ¶
type Observer interface {
Observe(event Event)
}
func MultiObserver ¶
MultiObserver fans out one event to multiple observers. @group Events
Example: fan out observers
observer := bus.MultiObserver(
bus.ObserverFunc(func(event bus.Event) {}),
bus.ObserverFunc(func(event bus.Event) {}),
)
observer.Observe(bus.Event{Kind: bus.EventDispatchStarted})
type ObserverFunc ¶
type ObserverFunc func(event Event)
func (ObserverFunc) Observe ¶
func (f ObserverFunc) Observe(event Event)
Observe calls the wrapped observer function. @group Events
Example: observer func
observer := bus.ObserverFunc(func(event bus.Event) {
_ = event.Kind
})
observer.Observe(bus.Event{Kind: bus.EventDispatchStarted})
type Option ¶
type Option func(*runtime)
func WithClock ¶
WithClock overrides the runtime clock used for event/state timestamps. @group Options
Example: fixed clock
fixed := time.Date(2026, time.January, 1, 0, 0, 0, 0, time.UTC)
b, _ := bus.New(q, bus.WithClock(func() time.Time { return fixed }))
_ = b
func WithMiddleware ¶
func WithMiddleware(middlewares ...Middleware) Option
WithMiddleware appends middleware to the runtime execution chain. @group Options
Example: add middleware
audit := bus.MiddlewareFunc(func(ctx context.Context, jc bus.Context, next bus.Next) error {
return next(ctx, jc)
})
skipHealth := bus.SkipWhen{
Predicate: func(_ context.Context, jc bus.Context) bool { return jc.JobType == "health:ping" },
}
fatalize := bus.FailOnError{
When: func(err error) bool { return err != nil },
}
b, _ := bus.New(q, bus.WithMiddleware(audit, skipHealth, fatalize))
_ = b
func WithObserver ¶
WithObserver installs an event observer for dispatch/job/chain/batch lifecycle hooks. @group Options
Example: attach observer
observer := bus.ObserverFunc(func(event bus.Event) {
_ = event.Kind
})
b, _ := bus.New(q, bus.WithObserver(observer))
_ = b
type RateLimit ¶
type RateLimit struct {
Key func(ctx context.Context, jc Context) string
Limiter RateLimiter
}
type RateLimiter ¶
type RetryPolicy ¶
type RetryPolicy struct{}
type SQLStoreConfig ¶
type Store ¶
type Store interface {
CreateChain(ctx context.Context, rec ChainRecord) error
AdvanceChain(ctx context.Context, chainID string, completedNode string) (next *ChainNode, done bool, err error)
FailChain(ctx context.Context, chainID string, cause error) error
GetChain(ctx context.Context, chainID string) (ChainState, error)
CreateBatch(ctx context.Context, rec BatchRecord) error
MarkBatchJobStarted(ctx context.Context, batchID, jobID string) error
MarkBatchJobSucceeded(ctx context.Context, batchID, jobID string) (BatchState, bool, error)
MarkBatchJobFailed(ctx context.Context, batchID, jobID string, cause error) (BatchState, bool, error)
CancelBatch(ctx context.Context, batchID string) error
GetBatch(ctx context.Context, batchID string) (BatchState, error)
MarkCallbackInvoked(ctx context.Context, key string) (bool, error)
Prune(ctx context.Context, before time.Time) error
}
func NewMemoryStore ¶
func NewMemoryStore() Store
NewMemoryStore creates an in-memory orchestration store implementation. @group Constructors
Example: new memory store
store := bus.NewMemoryStore()
_ = store
func NewSQLStore ¶
func NewSQLStore(cfg SQLStoreConfig) (Store, error)
NewSQLStore creates a SQL-backed orchestration store. @group Constructors
Example: new sql store
store, _ := bus.NewSQLStore(bus.SQLStoreConfig{
DriverName: "sqlite",
DSN: "file:bus.db?_busy_timeout=5000",
})
_ = store
type WithoutOverlapping ¶
type WithoutOverlapping struct {
Key func(ctx context.Context, jc Context) string
TTL time.Duration
Locker Locker
}