bus

package
v0.1.6 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package bus provides the workflow orchestration engine used by queue.

Most applications should use the top-level queue package (`queue.New(...)`, `Queue.Dispatch`, `Queue.Chain`, `Queue.Batch`) rather than importing bus directly.

This package remains available for advanced/internal orchestration plumbing, custom workflow integration, and lower-level testing.

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exposed by the bus package. Each message names the
// condition the error stands for; match them with errors.Is.
var (
	// ErrSkipped reports that a job was skipped by middleware.
	// NOTE(review): presumably produced by SkipWhen — confirm.
	ErrSkipped     = errors.New("bus job skipped by middleware")
	// ErrRateLimited reports that a job was rate limited.
	// NOTE(review): presumably produced by RateLimit — confirm.
	ErrRateLimited = errors.New("bus job rate limited")
	// ErrOverlapping reports that a job was blocked to prevent overlap.
	// NOTE(review): presumably produced by WithoutOverlapping — confirm.
	ErrOverlapping = errors.New("bus job overlap prevented")
)
View Source
// ErrNotFound is the sentinel error for a bus record that could not be found.
// NOTE(review): presumably returned by chain/batch lookups such as
// Store.GetChain, Store.GetBatch, Bus.FindChain and Bus.FindBatch — confirm.
var ErrNotFound = errors.New("bus record not found")

Functions

This section is empty.

Types

type BatchBuilder

// BatchBuilder configures a batch workflow fluently. It is returned by
// Bus.Batch; every option method returns a BatchBuilder so calls can be
// chained, and Dispatch creates and starts the batch.
type BatchBuilder interface {
	// Name sets a display name for the batch.
	// @group Batching
	//
	// Example: set batch name
	//
	//	batchID, _ := b.Batch(bus.NewJob("a", nil)).Name("nightly").Dispatch(context.Background())
	//	_ = batchID
	Name(name string) BatchBuilder
	// OnQueue applies a default queue to batch jobs that do not set one.
	// @group Batching
	//
	// Example: set batch queue
	//
	//	batchID, _ := b.Batch(bus.NewJob("a", nil)).OnQueue("critical").Dispatch(context.Background())
	//	_ = batchID
	OnQueue(queue string) BatchBuilder
	// AllowFailures keeps the batch running when individual jobs fail.
	// @group Batching
	//
	// Example: allow failures
	//
	//	batchID, _ := b.Batch(bus.NewJob("a", nil)).AllowFailures().Dispatch(context.Background())
	//	_ = batchID
	AllowFailures() BatchBuilder
	// Progress registers a callback invoked as jobs complete.
	// The callback receives the current BatchState.
	// @group Batching
	//
	// Example: progress callback
	//
	//	batchID, _ := b.Batch(bus.NewJob("a", nil)).
	//		Progress(func(context.Context, bus.BatchState) error { return nil }).
	//		Dispatch(context.Background())
	//	_ = batchID
	Progress(fn func(ctx context.Context, st BatchState) error) BatchBuilder
	// Then registers a callback invoked once when the batch succeeds.
	// @group Batching
	//
	// Example: then callback
	//
	//	batchID, _ := b.Batch(bus.NewJob("a", nil)).
	//		Then(func(context.Context, bus.BatchState) error { return nil }).
	//		Dispatch(context.Background())
	//	_ = batchID
	Then(fn func(ctx context.Context, st BatchState) error) BatchBuilder
	// Catch registers a callback invoked when the batch encounters a failure.
	// The callback receives the BatchState and the error.
	// @group Batching
	//
	// Example: catch callback
	//
	//	batchID, _ := b.Batch(bus.NewJob("a", nil)).
	//		Catch(func(context.Context, bus.BatchState, error) error { return nil }).
	//		Dispatch(context.Background())
	//	_ = batchID
	Catch(fn func(ctx context.Context, st BatchState, err error) error) BatchBuilder
	// Finally registers a callback invoked once when the batch reaches a
	// terminal state.
	// @group Batching
	//
	// Example: finally callback
	//
	//	batchID, _ := b.Batch(bus.NewJob("a", nil)).
	//		Finally(func(context.Context, bus.BatchState) error { return nil }).
	//		Dispatch(context.Background())
	//	_ = batchID
	Finally(fn func(ctx context.Context, st BatchState) error) BatchBuilder
	// Dispatch creates and starts the batch workflow. The examples treat the
	// returned string as the batch ID.
	// @group Batching
	//
	// Example: dispatch batch
	//
	//	batchID, _ := b.Batch(bus.NewJob("a", nil), bus.NewJob("b", nil)).Dispatch(context.Background())
	//	_ = batchID
	Dispatch(ctx context.Context) (string, error)
}

type BatchJob

// BatchJob is one entry of BatchRecord.Jobs: a job identifier paired with the
// job itself.
type BatchJob struct {
	// JobID identifies the job; Store's MarkBatchJob* methods take a jobID.
	JobID string
	// Job is held as the package's unexported wireJob type.
	// NOTE(review): presumably the serialized form of a Job — confirm.
	Job   wireJob
}

type BatchRecord

// BatchRecord is the description of a batch handed to Store.CreateBatch.
type BatchRecord struct {
	// BatchID identifies the batch.
	BatchID     string
	// DispatchID identifies the dispatch.
	// NOTE(review): presumably the dispatch that created this batch — confirm.
	DispatchID  string
	// Name is the batch name.
	// NOTE(review): presumably the value set via BatchBuilder.Name — confirm.
	Name        string
	// Queue is the batch queue.
	// NOTE(review): presumably the default set via BatchBuilder.OnQueue — confirm.
	Queue       string
	// AllowFailed is the batch's failure-tolerance flag.
	// NOTE(review): presumably set by BatchBuilder.AllowFailures — confirm.
	AllowFailed bool
	// Jobs lists the jobs that make up the batch.
	Jobs        []BatchJob
	// CreatedAt is the creation timestamp.
	CreatedAt   time.Time
}

type BatchSpec

// BatchSpec describes a batch recorded by Fake; it is the value passed to the
// predicate given to Fake.AssertBatched.
type BatchSpec struct {
	// JobTypes lists job types for the batch; the AssertBatched example
	// expects two entries after batching two jobs.
	JobTypes []string
}

type BatchState

// BatchState is the state of a batch as returned by Bus.FindBatch,
// Store.GetBatch and the Store.MarkBatchJob* methods, and as passed to the
// BatchBuilder callbacks (Progress, Then, Catch, Finally).
//
// NOTE(review): the per-field notes below are inferred from field names only;
// confirm against the store implementation.
type BatchState struct {
	// BatchID identifies the batch.
	BatchID     string
	// DispatchID identifies the associated dispatch.
	DispatchID  string
	// Name is the batch name.
	Name        string
	// Queue is the batch queue.
	Queue       string
	// AllowFailed is the batch's failure-tolerance flag.
	AllowFailed bool
	// Total, Pending, Processed and Failed are job counters — presumably the
	// number of jobs overall, not yet finished, finished, and failed.
	Total       int
	Pending     int
	Processed   int
	Failed      int
	// Cancelled presumably reports that the batch was cancelled (see
	// Store.CancelBatch).
	Cancelled   bool
	// Completed presumably reports that the batch reached a terminal state.
	Completed   bool
	// CreatedAt and UpdatedAt are the record timestamps.
	CreatedAt   time.Time
	UpdatedAt   time.Time
}

type Bus

// Bus is the orchestration runtime interface returned by New and
// NewWithStore. Fake declares the same method set for use in tests.
type Bus interface {
	// Register associates handler with jobType.
	Register(jobType string, handler Handler)

	// Dispatch submits a single job and returns its DispatchResult.
	Dispatch(ctx context.Context, job Job) (DispatchResult, error)
	// Chain returns a ChainBuilder for jobs; call the builder's Dispatch to
	// create and start the chain workflow.
	Chain(jobs ...Job) ChainBuilder
	// Batch returns a BatchBuilder for jobs; call the builder's Dispatch to
	// create and start the batch workflow.
	Batch(jobs ...Job) BatchBuilder

	// StartWorkers starts the workers; the New example calls it before
	// dispatching. NOTE(review): exact start semantics are not shown here.
	StartWorkers(ctx context.Context) error
	// Shutdown stops the runtime; the New example defers it after
	// StartWorkers. NOTE(review): draining behaviour is not shown here.
	Shutdown(ctx context.Context) error

	// FindBatch returns the BatchState for batchID.
	FindBatch(ctx context.Context, batchID string) (BatchState, error)
	// FindChain returns the ChainState for chainID.
	FindChain(ctx context.Context, chainID string) (ChainState, error)
	// Prune takes a cutoff time.
	// NOTE(review): presumably removes orchestration records older than
	// before — confirm.
	Prune(ctx context.Context, before time.Time) error
}

func New

func New(q any, opts ...Option) (Bus, error)

New creates a bus runtime using an in-memory orchestration store. @group Constructors

Example: new bus runtime

q, _ := queue.NewSync()
b, _ := bus.New(q)
b.Register("monitor:poll", func(context.Context, bus.Context) error { return nil })
_ = b.StartWorkers(context.Background())
defer b.Shutdown(context.Background())
type PollPayload struct {
	URL string `json:"url"`
}
_, _ = b.Dispatch(context.Background(), bus.NewJob("monitor:poll", PollPayload{
	URL: "https://goforj.dev/health",
}))

func NewWithStore

func NewWithStore(q any, store Store, opts ...Option) (Bus, error)

NewWithStore creates a bus runtime with a custom orchestration store. @group Constructors

Example: new bus with store

q, _ := queue.NewSync()
store := bus.NewMemoryStore()
b, _ := bus.NewWithStore(q, store)
_ = b

type ChainBuilder

// ChainBuilder configures a chain workflow fluently. It is returned by
// Bus.Chain; every option method returns a ChainBuilder so calls can be
// chained, and Dispatch creates and starts the chain.
type ChainBuilder interface {
	// OnQueue applies a default queue to chain jobs that do not set one.
	// @group Chaining
	//
	// Example: set chain queue
	//
	//	chainID, _ := b.Chain(
	//		bus.NewJob("a", nil),
	//		bus.NewJob("b", nil),
	//	).OnQueue("critical").Dispatch(context.Background())
	//	_ = chainID
	OnQueue(queue string) ChainBuilder
	// Catch registers a callback invoked when chain execution fails.
	// The callback receives the ChainState and the error.
	// @group Chaining
	//
	// Example: chain catch callback
	//
	//	chainID, _ := b.Chain(bus.NewJob("a", nil)).
	//		Catch(func(context.Context, bus.ChainState, error) error { return nil }).
	//		Dispatch(context.Background())
	//	_ = chainID
	Catch(fn func(ctx context.Context, st ChainState, err error) error) ChainBuilder
	// Finally registers a callback invoked once when chain execution finishes.
	// @group Chaining
	//
	// Example: chain finally callback
	//
	//	chainID, _ := b.Chain(bus.NewJob("a", nil)).
	//		Finally(func(context.Context, bus.ChainState) error { return nil }).
	//		Dispatch(context.Background())
	//	_ = chainID
	Finally(fn func(ctx context.Context, st ChainState) error) ChainBuilder
	// Dispatch creates and starts the chain workflow. The examples treat the
	// returned string as the chain ID.
	// @group Chaining
	//
	// Example: dispatch chain
	//
	//	chainID, _ := b.Chain(bus.NewJob("a", nil), bus.NewJob("b", nil)).Dispatch(context.Background())
	//	_ = chainID
	Dispatch(ctx context.Context) (string, error)
}

type ChainNode

// ChainNode is one step of a chain: a node identifier paired with its job.
// It appears in ChainRecord.Nodes and ChainState.Nodes, and Store.AdvanceChain
// returns a pointer to one as the next node.
type ChainNode struct {
	// NodeID identifies the node within the chain.
	NodeID string
	// Job is held as the package's unexported wireJob type.
	// NOTE(review): presumably the serialized form of a Job — confirm.
	Job    wireJob
}

type ChainRecord

// ChainRecord is the description of a chain handed to Store.CreateChain.
type ChainRecord struct {
	// ChainID identifies the chain.
	ChainID    string
	// DispatchID identifies the dispatch.
	// NOTE(review): presumably the dispatch that created this chain — confirm.
	DispatchID string
	// Queue is the chain queue.
	// NOTE(review): presumably the default set via ChainBuilder.OnQueue — confirm.
	Queue      string
	// Nodes lists the chain's steps.
	// NOTE(review): presumably in execution order — confirm.
	Nodes      []ChainNode
	// CreatedAt is the creation timestamp.
	CreatedAt  time.Time
}

type ChainState

// ChainState is the state of a chain as returned by Bus.FindChain and
// Store.GetChain, and as passed to the ChainBuilder callbacks (Catch, Finally).
//
// NOTE(review): the per-field notes below are inferred from field names only;
// confirm against the store implementation.
type ChainState struct {
	// ChainID identifies the chain.
	ChainID    string
	// DispatchID identifies the associated dispatch.
	DispatchID string
	// Queue is the chain queue.
	Queue      string
	// Nodes lists the chain's steps.
	Nodes      []ChainNode
	// NextIndex is presumably the index into Nodes of the next step to run.
	NextIndex  int
	// Completed presumably reports that every step finished.
	Completed  bool
	// Failed presumably reports that the chain stopped on a failure.
	Failed     bool
	// Failure presumably holds the failure message when Failed is set.
	Failure    string
	// CreatedAt and UpdatedAt are the record timestamps.
	CreatedAt  time.Time
	UpdatedAt  time.Time
}

type Context

// Context is the per-job value passed to Handler, Middleware and Next. It
// exposes job metadata as fields and the payload through Bind and
// PayloadBytes. It is distinct from context.Context, which is passed
// alongside it.
type Context struct {
	// SchemaVersion is a version number.
	// NOTE(review): presumably of the job envelope format — confirm.
	SchemaVersion int
	// DispatchID identifies the dispatch this job belongs to.
	DispatchID    string
	// JobID identifies the job.
	JobID         string
	// ChainID and BatchID identify the owning chain or batch.
	// NOTE(review): presumably empty when the job is not part of one — confirm.
	ChainID       string
	BatchID       string
	// Attempt is the attempt number.
	// NOTE(review): whether it starts at 0 or 1 is not shown here.
	Attempt       int
	// JobType is the job's type name; the WithMiddleware example compares it
	// against "health:ping".
	JobType       string
	// contains filtered or unexported fields
}

func (Context) Bind

func (c Context) Bind(dst any) error

Bind unmarshals the job payload into dst. @group Job

Example: bind payload

type PollPayload struct {
	URL string `json:"url"`
}
var payload PollPayload
_ = jc.Bind(&payload)

func (Context) PayloadBytes

func (c Context) PayloadBytes() []byte

PayloadBytes returns a copy of raw job payload bytes. @group Job

Example: read raw payload bytes

raw := jc.PayloadBytes()
_ = raw

type DispatchResult

// DispatchResult is the value returned by Bus.Dispatch and Fake.Dispatch.
type DispatchResult struct {
	// DispatchID identifies the dispatch.
	DispatchID string
}

type Event

// Event is the lifecycle notification delivered to Observer.Observe.
type Event struct {
	// SchemaVersion is a version number.
	// NOTE(review): presumably of the event format — confirm.
	SchemaVersion int
	// EventID identifies the event.
	EventID       string
	// Kind names what happened; see the EventKind constants.
	Kind          EventKind
	// DispatchID, JobID, ChainID and BatchID identify what the event relates to.
	// NOTE(review): presumably only the applicable IDs are populated — confirm.
	DispatchID    string
	JobID         string
	ChainID       string
	BatchID       string
	// Attempt is the job attempt number.
	Attempt       int
	// JobType is the job's type name.
	JobType       string
	// Queue is the queue name.
	Queue         string
	// Duration is an elapsed time.
	// NOTE(review): presumably of the completed operation — confirm.
	Duration      time.Duration
	// Time is the event timestamp; WithClock overrides the clock used for
	// event/state timestamps.
	Time          time.Time
	// Err is the error attached to the event.
	// NOTE(review): presumably nil for non-failure kinds — confirm.
	Err           error
}

type EventKind

// EventKind names the kind of lifecycle event carried in Event.Kind.
type EventKind string
// Event kinds, grouped by lifecycle: dispatch, job, chain, batch and callback.
const (
	// Dispatch lifecycle.
	EventDispatchStarted   EventKind = "dispatch_started"
	EventDispatchSucceeded EventKind = "dispatch_succeeded"
	EventDispatchFailed    EventKind = "dispatch_failed"
	// Job lifecycle.
	EventJobStarted        EventKind = "job_started"
	EventJobSucceeded      EventKind = "job_succeeded"
	EventJobFailed         EventKind = "job_failed"
	// Chain lifecycle.
	EventChainStarted      EventKind = "chain_started"
	EventChainAdvanced     EventKind = "chain_advanced"
	EventChainCompleted    EventKind = "chain_completed"
	EventChainFailed       EventKind = "chain_failed"
	// Batch lifecycle.
	EventBatchStarted      EventKind = "batch_started"
	EventBatchProgressed   EventKind = "batch_progressed"
	EventBatchCompleted    EventKind = "batch_completed"
	EventBatchFailed       EventKind = "batch_failed"
	EventBatchCancelled    EventKind = "batch_cancelled"
	// Callback lifecycle.
	EventCallbackStarted   EventKind = "callback_started"
	EventCallbackSucceeded EventKind = "callback_succeeded"
	EventCallbackFailed    EventKind = "callback_failed"
)

type FailOnError

// FailOnError is middleware whose Handle wraps matched errors as fatal errors
// to stop retries.
type FailOnError struct {
	// When reports whether err matches; the example matches any non-nil error.
	// NOTE(review): behaviour with a nil When is not shown here.
	When func(err error) bool
}

func (FailOnError) Handle

func (f FailOnError) Handle(ctx context.Context, jc Context, next Next) error

Handle wraps matched errors as fatal errors to stop retries. @group Middleware

Example: fail on any error

mw := bus.FailOnError{
	When: func(err error) bool { return err != nil },
}
_ = mw

type Fake

// Fake is a bus test double created with NewFake. It records dispatch, chain
// and batch calls, and its Assert* methods check what was recorded.
type Fake struct {
	// contains filtered or unexported fields
}

func NewFake

func NewFake() *Fake

NewFake creates a bus fake that records dispatch, chain, and batch calls. @group Constructors

Example: new bus fake

fake := bus.NewFake()
_, _ = fake.Dispatch(context.Background(), bus.NewJob("monitor:poll", nil))

func (*Fake) AssertBatchCount

func (f *Fake) AssertBatchCount(t testing.TB, n int)

AssertBatchCount fails if the total recorded batch count does not match n. @group Testing

Example: assert batch count

fake := bus.NewFake()
_, _ = fake.Batch(bus.NewJob("a", nil)).Dispatch(context.Background())
fake.AssertBatchCount(nil, 1)

func (*Fake) AssertBatched

func (f *Fake) AssertBatched(t testing.TB, predicate func(spec BatchSpec) bool)

AssertBatched fails unless at least one recorded batch matches predicate. @group Testing

Example: assert batch predicate

fake := bus.NewFake()
_, _ = fake.Batch(bus.NewJob("a", nil), bus.NewJob("b", nil)).Dispatch(context.Background())
fake.AssertBatched(nil, func(spec bus.BatchSpec) bool { return len(spec.JobTypes) == 2 })

func (*Fake) AssertChained

func (f *Fake) AssertChained(t testing.TB, expected []string)

AssertChained fails if no recorded chain matches the expected job type order. @group Testing

Example: assert chain

fake := bus.NewFake()
_, _ = fake.Chain(bus.NewJob("a", nil), bus.NewJob("b", nil)).Dispatch(context.Background())
fake.AssertChained(nil, []string{"a", "b"})

func (*Fake) AssertCount

func (f *Fake) AssertCount(t testing.TB, n int)

AssertCount fails if total dispatched count does not match n. @group Testing

Example: assert dispatch count

fake := bus.NewFake()
_, _ = fake.Dispatch(context.Background(), bus.NewJob("emails:send", nil))
fake.AssertCount(nil, 1)

func (*Fake) AssertDispatched

func (f *Fake) AssertDispatched(t testing.TB, jobType string)

AssertDispatched fails if the given job type was never dispatched. @group Testing

Example: assert dispatched

fake := bus.NewFake()
_, _ = fake.Dispatch(context.Background(), bus.NewJob("emails:send", nil))
fake.AssertDispatched(nil, "emails:send")

func (*Fake) AssertDispatchedOn

func (f *Fake) AssertDispatchedOn(t testing.TB, queueName, jobType string)

AssertDispatchedOn fails if a job type was not dispatched on queueName. @group Testing

Example: assert dispatched on queue

fake := bus.NewFake()
_, _ = fake.Dispatch(context.Background(), bus.NewJob("emails:send", nil).OnQueue("critical"))
fake.AssertDispatchedOn(nil, "critical", "emails:send")

func (*Fake) AssertDispatchedTimes

func (f *Fake) AssertDispatchedTimes(t testing.TB, jobType string, n int)

AssertDispatchedTimes fails if the dispatched count for the given job type does not match n. @group Testing

Example: assert dispatch count by type

fake := bus.NewFake()
_, _ = fake.Dispatch(context.Background(), bus.NewJob("emails:send", nil))
_, _ = fake.Dispatch(context.Background(), bus.NewJob("emails:send", nil))
fake.AssertDispatchedTimes(nil, "emails:send", 2)

func (*Fake) AssertNotDispatched

func (f *Fake) AssertNotDispatched(t testing.TB, jobType string)

AssertNotDispatched fails if the given job type was dispatched. @group Testing

Example: assert not dispatched

fake := bus.NewFake()
fake.AssertNotDispatched(nil, "emails:send")

func (*Fake) AssertNothingBatched

func (f *Fake) AssertNothingBatched(t testing.TB)

AssertNothingBatched fails if any batch was recorded. @group Testing

Example: assert no batches

fake := bus.NewFake()
fake.AssertNothingBatched(nil)

func (*Fake) AssertNothingDispatched

func (f *Fake) AssertNothingDispatched(t testing.TB)

AssertNothingDispatched fails if any job was dispatched. @group Testing

Example: assert no dispatch

fake := bus.NewFake()
fake.AssertNothingDispatched(nil)

func (*Fake) Batch

func (f *Fake) Batch(jobs ...Job) BatchBuilder

Batch records a batch specification. @group Testing

Example: record batch

fake := bus.NewFake()
_, _ = fake.Batch(
	bus.NewJob("a", nil),
	bus.NewJob("b", nil),
).Dispatch(context.Background())

func (*Fake) Chain

func (f *Fake) Chain(jobs ...Job) ChainBuilder

Chain records a chain specification. @group Testing

Example: record chain

fake := bus.NewFake()
_, _ = fake.Chain(
	bus.NewJob("a", nil),
	bus.NewJob("b", nil),
).Dispatch(context.Background())

func (*Fake) Dispatch

func (f *Fake) Dispatch(_ context.Context, job Job) (DispatchResult, error)

Dispatch records a dispatched job. @group Testing

Example: record dispatch

fake := bus.NewFake()
_, _ = fake.Dispatch(context.Background(), bus.NewJob("emails:send", nil))

func (*Fake) FindBatch

func (f *Fake) FindBatch(context.Context, string) (BatchState, error)

func (*Fake) FindChain

func (f *Fake) FindChain(context.Context, string) (ChainState, error)

func (*Fake) Prune

func (f *Fake) Prune(context.Context, time.Time) error

func (*Fake) Register

func (f *Fake) Register(string, Handler)

func (*Fake) Shutdown

func (f *Fake) Shutdown(context.Context) error

func (*Fake) StartWorkers

func (f *Fake) StartWorkers(context.Context) error

type Handler

// Handler processes one job. Handlers are attached to a job type with
// Bus.Register; j provides job metadata and payload access (Bind,
// PayloadBytes).
type Handler func(ctx context.Context, j Context) error

type Job

// Job describes a unit of work to dispatch. Build one with NewJob and refine
// it with the fluent methods (OnQueue, Delay, Timeout, Retry, Backoff,
// UniqueFor), each of which returns a Job.
type Job struct {
	// Type is the job type name; handlers are registered per job type.
	Type    string
	// Payload is the job's data; handlers read it via Context.Bind or
	// Context.PayloadBytes.
	Payload any
	// Options holds per-job settings.
	// NOTE(review): presumably populated by the fluent methods — confirm.
	Options JobOptions
}

func NewJob

func NewJob(jobType string, payload any) Job

NewJob creates a bus job of the given type carrying payload; job options can then be chained fluently. @group Constructors

Example: new bus job

type PollPayload struct {
	URL string `json:"url"`
}
job := bus.NewJob("monitor:poll", PollPayload{
	URL: "https://goforj.dev/health",
}).
	OnQueue("monitor-critical").
	Delay(2 * time.Second).
	Timeout(15 * time.Second).
	Retry(3).
	Backoff(500 * time.Millisecond).
	UniqueFor(30 * time.Second)
_ = job

func (Job) Backoff

func (j Job) Backoff(backoff time.Duration) Job

Backoff sets retry backoff for this job. @group Job

Example: set retry backoff

job := bus.NewJob("emails:send", nil).Backoff(500 * time.Millisecond)
_ = job

func (Job) Delay

func (j Job) Delay(delay time.Duration) Job

Delay defers job execution. @group Job

Example: set delay

job := bus.NewJob("emails:send", nil).Delay(2 * time.Second)
_ = job

func (Job) OnQueue

func (j Job) OnQueue(name string) Job

OnQueue sets the target queue for this job. @group Job

Example: set queue

job := bus.NewJob("emails:send", nil).OnQueue("critical")
_ = job

func (Job) Retry

func (j Job) Retry(max int) Job

Retry sets max retry attempts for this job. @group Job

Example: set retry count

job := bus.NewJob("emails:send", nil).Retry(5)
_ = job

func (Job) Timeout

func (j Job) Timeout(timeout time.Duration) Job

Timeout sets execution timeout for this job. @group Job

Example: set timeout

job := bus.NewJob("emails:send", nil).Timeout(15 * time.Second)
_ = job

func (Job) UniqueFor

func (j Job) UniqueFor(ttl time.Duration) Job

UniqueFor sets dedupe TTL for this job. @group Job

Example: set unique TTL

job := bus.NewJob("emails:send", nil).UniqueFor(30 * time.Second)
_ = job

type JobOptions

// JobOptions holds the per-job settings stored in Job.Options. The field
// names match the Job fluent setters; the descriptions below are taken from
// those setters' docs.
type JobOptions struct {
	// Queue is the target queue for the job.
	Queue     string
	// Delay defers job execution.
	Delay     time.Duration
	// Timeout is the execution timeout.
	Timeout   time.Duration
	// Retry is the max retry attempts.
	Retry     int
	// Backoff is the retry backoff.
	Backoff   time.Duration
	// UniqueFor is the dedupe TTL.
	UniqueFor time.Duration
}

type Lock

// Lock is the handle returned by Locker.Acquire.
type Lock interface {
	// Release gives the lock back.
	Release(ctx context.Context) error
}

type Locker

// Locker hands out keyed locks; WithoutOverlapping takes one in its Locker
// field.
type Locker interface {
	// Acquire requests the lock for key with the given ttl.
	// NOTE(review): the bool result presumably reports whether the lock was
	// obtained, and ttl presumably bounds how long it is held — confirm.
	Acquire(ctx context.Context, key string, ttl time.Duration) (Lock, bool, error)
}

type Middleware

// Middleware can intercept bus job execution. Install implementations with
// WithMiddleware.
type Middleware interface {
	// Handle receives the job context and the next step; calling
	// next(ctx, jc) continues execution, as the MiddlewareFunc example does.
	Handle(ctx context.Context, jc Context, next Next) error
}

Middleware can intercept bus job execution. @group Middleware

type MiddlewareFunc

// MiddlewareFunc adapts a plain function to the Middleware interface; its
// Handle method calls the wrapped function.
type MiddlewareFunc func(ctx context.Context, jc Context, next Next) error

MiddlewareFunc adapts a function to Middleware. @group Middleware

func (MiddlewareFunc) Handle

func (f MiddlewareFunc) Handle(ctx context.Context, jc Context, next Next) error

Handle calls the wrapped middleware function. @group Middleware

Example: middleware func

mw := bus.MiddlewareFunc(func(ctx context.Context, jc bus.Context, next bus.Next) error {
	return next(ctx, jc)
})
_ = mw

type Next

// Next is the continuation passed to Middleware.Handle; middleware calls it
// to hand the job on.
// NOTE(review): presumably the next middleware or the final handler — confirm.
type Next func(ctx context.Context, jc Context) error

type Observer

// Observer receives bus lifecycle events. Install one with WithObserver, or
// combine several with MultiObserver.
type Observer interface {
	// Observe is called with each event.
	Observe(event Event)
}

func MultiObserver

func MultiObserver(observers ...Observer) Observer

MultiObserver fans out one event to multiple observers. @group Events

Example: fan out observers

observer := bus.MultiObserver(
	bus.ObserverFunc(func(event bus.Event) {}),
	bus.ObserverFunc(func(event bus.Event) {}),
)
observer.Observe(bus.Event{Kind: bus.EventDispatchStarted})

type ObserverFunc

// ObserverFunc adapts a plain function to the Observer interface; its Observe
// method calls the wrapped function.
type ObserverFunc func(event Event)

func (ObserverFunc) Observe

func (f ObserverFunc) Observe(event Event)

Observe calls the wrapped observer function. @group Events

Example: observer func

observer := bus.ObserverFunc(func(event bus.Event) {
	_ = event.Kind
})
observer.Observe(bus.Event{Kind: bus.EventDispatchStarted})

type Option

// Option configures the bus runtime and is passed to New or NewWithStore.
// The options on this page are WithClock, WithMiddleware, WithObserver and
// WithStore.
type Option func(*runtime)

func WithClock

func WithClock(clock func() time.Time) Option

WithClock overrides the runtime clock used for event/state timestamps. @group Options

Example: fixed clock

fixed := time.Date(2026, time.January, 1, 0, 0, 0, 0, time.UTC)
b, _ := bus.New(q, bus.WithClock(func() time.Time { return fixed }))
_ = b

func WithMiddleware

func WithMiddleware(middlewares ...Middleware) Option

WithMiddleware appends middleware to the runtime execution chain. @group Options

Example: add middleware

audit := bus.MiddlewareFunc(func(ctx context.Context, jc bus.Context, next bus.Next) error {
	return next(ctx, jc)
})
skipHealth := bus.SkipWhen{
	Predicate: func(_ context.Context, jc bus.Context) bool { return jc.JobType == "health:ping" },
}
fatalize := bus.FailOnError{
	When: func(err error) bool { return err != nil },
}
b, _ := bus.New(q, bus.WithMiddleware(audit, skipHealth, fatalize))
_ = b

func WithObserver

func WithObserver(observer Observer) Option

WithObserver installs an event observer for dispatch/job/chain/batch lifecycle hooks. @group Options

Example: attach observer

observer := bus.ObserverFunc(func(event bus.Event) {
	_ = event.Kind
})
b, _ := bus.New(q, bus.WithObserver(observer))
_ = b

func WithStore

func WithStore(store Store) Option

WithStore overrides the orchestration store used for chain/batch/callback state. @group Options

Example: custom store

store := bus.NewMemoryStore()
b, _ := bus.New(q, bus.WithStore(store))
_ = b

type RateLimit

// RateLimit is middleware whose Handle applies limiter checks before
// executing the next handler.
type RateLimit struct {
	// Key derives a string key from the job.
	// NOTE(review): presumably the key passed to Limiter.Allow — confirm.
	Key     func(ctx context.Context, jc Context) string
	// Limiter is the RateLimiter to use.
	// NOTE(review): the example omits it; behaviour with a nil Limiter is not
	// shown here.
	Limiter RateLimiter
}

func (RateLimit) Handle

func (r RateLimit) Handle(ctx context.Context, jc Context, next Next) error

Handle applies limiter checks before executing the next handler. @group Middleware

Example: rate limit middleware

mw := bus.RateLimit{
	Key: func(context.Context, bus.Context) string { return "emails" },
}
_ = mw

type RateLimiter

// RateLimiter decides whether work for a key may proceed; RateLimit takes one
// in its Limiter field.
type RateLimiter interface {
	// Allow reports whether key is allowed, a retryAfter duration, and any
	// error.
	// NOTE(review): retryAfter is presumably meaningful only when allowed is
	// false — confirm.
	Allow(ctx context.Context, key string) (allowed bool, retryAfter time.Duration, err error)
}

type RetryPolicy

// RetryPolicy is middleware with no fields; its Handle passes execution
// through without modification.
type RetryPolicy struct{}

func (RetryPolicy) Handle

func (RetryPolicy) Handle(ctx context.Context, jc Context, next Next) error

Handle passes execution through without modification. @group Middleware

Example: retry policy passthrough

policy := bus.RetryPolicy{}
_ = policy

type SQLStoreConfig

// SQLStoreConfig configures NewSQLStore. The example sets only DriverName and
// DSN.
type SQLStoreConfig struct {
	// DB is an existing database handle.
	// NOTE(review): presumably used instead of opening DriverName/DSN when
	// set — confirm.
	DB          *sql.DB
	// DriverName is the SQL driver name; the example uses "sqlite".
	DriverName  string
	// DSN is the data source name; the example uses
	// "file:bus.db?_busy_timeout=5000".
	DSN         string
	// AutoMigrate is a schema-management flag.
	// NOTE(review): presumably creates the store's tables on startup — confirm.
	AutoMigrate bool
}

type SkipWhen

// SkipWhen is middleware whose Handle skips job execution when Predicate
// returns true.
type SkipWhen struct {
	// Predicate decides, per job, whether to skip.
	// NOTE(review): behaviour with a nil Predicate is not shown here.
	Predicate func(ctx context.Context, jc Context) bool
}

func (SkipWhen) Handle

func (s SkipWhen) Handle(ctx context.Context, jc Context, next Next) error

Handle skips job execution when Predicate returns true. @group Middleware

Example: skip by predicate

mw := bus.SkipWhen{
	Predicate: func(context.Context, bus.Context) bool { return true },
}
_ = mw

type Store

// Store is the orchestration store used for chain/batch/callback state.
// NewMemoryStore and NewSQLStore return implementations; supply one through
// NewWithStore or WithStore.
//
// NOTE(review): the method notes below follow the signatures; semantics marked
// "presumably" are inferred from names and should be confirmed against an
// implementation.
type Store interface {
	// CreateChain stores a new chain record.
	CreateChain(ctx context.Context, rec ChainRecord) error
	// AdvanceChain takes the chain ID and the completed node and returns the
	// next node, a done flag, and any error. Presumably next is nil when done
	// is true.
	AdvanceChain(ctx context.Context, chainID string, completedNode string) (next *ChainNode, done bool, err error)
	// FailChain records cause against the chain.
	FailChain(ctx context.Context, chainID string, cause error) error
	// GetChain returns the ChainState for chainID.
	GetChain(ctx context.Context, chainID string) (ChainState, error)

	// CreateBatch stores a new batch record.
	CreateBatch(ctx context.Context, rec BatchRecord) error
	// MarkBatchJobStarted records that jobID in batchID has started.
	MarkBatchJobStarted(ctx context.Context, batchID, jobID string) error
	// MarkBatchJobSucceeded records a job success and returns the BatchState
	// plus a bool. Presumably the bool reports that the batch just reached a
	// terminal state.
	MarkBatchJobSucceeded(ctx context.Context, batchID, jobID string) (BatchState, bool, error)
	// MarkBatchJobFailed records a job failure with its cause and returns the
	// BatchState plus a bool with presumably the same meaning as above.
	MarkBatchJobFailed(ctx context.Context, batchID, jobID string, cause error) (BatchState, bool, error)
	// CancelBatch cancels the batch.
	CancelBatch(ctx context.Context, batchID string) error
	// GetBatch returns the BatchState for batchID.
	GetBatch(ctx context.Context, batchID string) (BatchState, error)

	// MarkCallbackInvoked records key and returns a bool. Presumably the bool
	// is true only for the first call per key, which would let callbacks run
	// once.
	MarkCallbackInvoked(ctx context.Context, key string) (bool, error)
	// Prune takes a cutoff time. Presumably removes records older than before.
	Prune(ctx context.Context, before time.Time) error
}

func NewMemoryStore

func NewMemoryStore() Store

NewMemoryStore creates an in-memory orchestration store implementation. @group Constructors

Example: new memory store

store := bus.NewMemoryStore()
_ = store

func NewSQLStore

func NewSQLStore(cfg SQLStoreConfig) (Store, error)

NewSQLStore creates a SQL-backed orchestration store. @group Constructors

Example: new sql store

store, _ := bus.NewSQLStore(bus.SQLStoreConfig{
	DriverName: "sqlite",
	DSN:        "file:bus.db?_busy_timeout=5000",
})
_ = store

type WithoutOverlapping

// WithoutOverlapping is middleware whose Handle acquires a lock and prevents
// concurrent overlap for the same key.
type WithoutOverlapping struct {
	// Key derives the lock key from the job.
	Key    func(ctx context.Context, jc Context) string
	// TTL is a duration; the example uses 30 seconds.
	// NOTE(review): presumably the ttl passed to Locker.Acquire — confirm.
	TTL    time.Duration
	// Locker supplies the lock.
	// NOTE(review): the example omits it; behaviour with a nil Locker is not
	// shown here.
	Locker Locker
}

func (WithoutOverlapping) Handle

func (w WithoutOverlapping) Handle(ctx context.Context, jc Context, next Next) error

Handle acquires a lock and prevents concurrent overlap for the same key. @group Middleware

Example: without overlapping

mw := bus.WithoutOverlapping{
	Key: func(context.Context, bus.Context) string { return "job-key" },
	TTL: 30 * time.Second,
}
_ = mw

Directories

Path Synopsis
driver

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL