Documentation
¶
Overview ¶
Package bus implements a publish-subscribe pattern on top of go-que.
Package bus implements a publish-subscribe pattern on top of go-que. It allows publishing messages to subjects which can be subscribed to by multiple queues. The subject matching follows NATS-style wildcards pattern.
Package bus implements a publish-subscribe pattern on top of go-que. It allows publishing messages to subjects which can be subscribed to by multiple queues. The subject matching follows NATS-style wildcards pattern: - '*' matches any single token in a subject (e.g., "foo.*.baz" matches "foo.bar.baz") - '>' matches one or more tokens at the end of a subject (e.g., "foo.>" matches "foo.bar" and "foo.bar.baz")
The package uses a pluggable architecture with the Dialect interface to support different database backends. A PostgreSQL implementation is provided in the pgbus subpackage.
Index ¶
- Constants
- Variables
- func NewRistrettoCache(config *ristretto.Config[string, []Subscription]) (*ristretto.Cache[string, []Subscription], error)
- func ToRegexPattern(pattern string) (string, error)
- func ValidatePattern(pattern string) error
- func ValidateSubject(subject string) error
- type Bus
- type BusImpl
- type BusOption
- type BusOptions
- type Cache
- type ConsumeOption
- type ConsumeOptions
- type Consumer
- type Dialect
- type DialectDecorator
- type Dispatch
- type ExecutionStatus
- type Handler
- type Header
- type Inbound
- type Message
- type Metadata
- type Outbound
- type PlanConfig
- type Queue
- type QueueImpl
- func (q *QueueImpl) StartConsumer(ctx context.Context, handler Handler, options ...ConsumeOption) (Consumer, error)
- func (q *QueueImpl) Subscribe(ctx context.Context, pattern string, opts ...SubscribeOption) (Subscription, error)
- func (q *QueueImpl) Subscriptions(ctx context.Context) ([]Subscription, error)
- type SubscribeOption
- type SubscribeOptions
- type Subscription
- type SubscriptionExecution
- type WorkerConfig
Constants ¶
const ( HeaderSubscriptionPattern = "Subscription-Pattern" // The subscription pattern that matched this message HeaderSubscriptionIdentifier = "Subscription-Identifier" // The subscription identifier that received this message )
Header constants for message metadata
const MaxPatternTokens = 16
MaxPatternTokens defines the maximum number of tokens allowed in a pattern
Variables ¶
var ( // ErrInvalidQueue indicates that the queue name is invalid. ErrInvalidQueue = errors.New("queue is invalid") // ErrInvalidMessage indicates that the message is invalid. ErrInvalidMessage = errors.New("message is invalid") // ErrInvalidSubject indicates that the subject is invalid. ErrInvalidSubject = errors.New("subject is invalid") // ErrInvalidPattern indicates that the pattern is invalid. ErrInvalidPattern = errors.New("pattern is invalid") // ErrSubscriptionNotFound is returned when no subscription is found. ErrSubscriptionNotFound = errors.New("subscription not found") // ErrOverlappingPatterns is returned when a queue has multiple patterns matching the same subject. ErrOverlappingPatterns = errors.New("queue has overlapping patterns which may cause unexpected behavior; only the first subscription will be triggered") )
Common errors returned by the bus package.
var DefaultMaxEnqueuePerBatch = 100
DefaultMaxEnqueuePerBatch defines the default maximum number of plans that can be enqueued in a single transaction.
var DefaultPlanConfigFactory = func() *PlanConfig { return &PlanConfig{ RetryPolicy: DefaultRetryPolicyFactory(), RunAtDelta: 0, UniqueLifecycle: que.Ignore, } }
DefaultPlanConfigFactory provides default settings for subscription jobs.
var DefaultRetryPolicyFactory = func() *que.RetryPolicy { return &que.RetryPolicy{ InitialInterval: 30 * time.Second, MaxInterval: 600 * time.Second, NextIntervalMultiplier: 2, IntervalRandomPercent: 33, MaxRetryCount: 3, } }
DefaultRetryPolicyFactory provides a default retry policy for published messages.
var DefaultRistrettoConfigFactory = func() *ristretto.Config[string, []Subscription] { return &ristretto.Config[string, []Subscription]{ NumCounters: 1e6, MaxCost: 1e5, BufferItems: 64, Cost: func(value []Subscription) int64 { return int64(len(value)) }, } }
DefaultRistrettoConfigFactory is a factory function that returns a default ristretto.Config[string, []Subscription].
var DefaultWorkerConfigFactory = func() *WorkerConfig { return &WorkerConfig{ MaxLockPerSecond: 5, MaxBufferJobsCount: 0, MaxPerformPerSecond: 1000, MaxConcurrentPerformCount: 200, ReconnectBackOff: quex.DefaultReconnectBackOffFactory(), } }
DefaultWorkerConfigFactory provides default settings for workers.
var FallbackUniqueID = func(msg *Outbound) string { return "_gobus_:" + xid.New().String() }
Functions ¶
func NewRistrettoCache ¶
func NewRistrettoCache(config *ristretto.Config[string, []Subscription]) (*ristretto.Cache[string, []Subscription], error)
NewRistrettoCache creates a new ristretto cache.
func ToRegexPattern ¶
ToRegexPattern converts a NATS-style wildcard pattern to a regex pattern that follows NATS specification for subject matching. It returns an error if the pattern contains invalid characters or structure.
func ValidatePattern ¶
ValidatePattern validates that a pattern follows NATS wildcard rules. It checks for:
- Empty patterns are not allowed
- Empty tokens (parts between dots) are not allowed
- Only lowercase alphanumeric characters, '_', and '-' are allowed in non-wildcard tokens
- Wildcards can be '*' (single token) or '>' (multiple tokens)
- The '>' wildcard can only appear at the end of a pattern
- Pattern cannot exceed MaxPatternTokens tokens
func ValidateSubject ¶
ValidateSubject validates that a subject follows NATS subject rules. It checks for empty subjects, invalid characters, subjects starting or ending with a dot, and empty tokens.
Types ¶
type Bus ¶
type Bus interface {
// Queue returns a queue with the specified name.
Queue(name string) Queue
// Publish sends outbound messages to all queues with subscriptions matching the subject.
// All messages are processed in a single transaction.
Publish(ctx context.Context, msgs ...*Outbound) (*Dispatch, error)
// BySubject returns all subscriptions with patterns matching the given subject.
BySubject(ctx context.Context, subject string) ([]Subscription, error)
// Close releases resources held by the Bus, including the default cache if one was created.
// It is safe to call Close multiple times.
Close() error
}
Bus provides publish-subscribe capabilities on top of go-que. It manages subject-queue mappings and handles subject pattern matching.
func New ¶
New creates a new Bus instance with the given dialect and options.
dialect is the database dialect used for storing subscriptions. Different database backends can be supported by implementing this interface. A PostgreSQL implementation is provided in the pgbus package.
By default, a Ristretto cache is created to improve subscription lookup performance. Use WithCache to provide a custom cache or disable caching (by passing nil). The caller should call Close() when the Bus is no longer needed to release resources.
type BusImpl ¶
type BusImpl struct {
// contains filtered or unexported fields
}
BusImpl is a generic implementation of the Bus interface.
func (*BusImpl) BySubject ¶
BySubject returns all subscriptions with patterns matching the given subject.
func (*BusImpl) Close ¶ added in v0.1.0
Close releases resources held by the Bus, including the default cache if one was created. It is safe to call Close multiple times.
func (*BusImpl) Publish ¶
Publish sends outbound messages to all queues with subscriptions matching the subject. All messages are processed in a single transaction.
If a transaction is provided via bussql.NewContext in the context, Publish will use that transaction (with a savepoint if enabled) instead of creating its own. This is useful when you want to publish messages as part of a larger transaction that includes other database operations.
type BusOption ¶
type BusOption func(*BusOptions)
func WithCache ¶ added in v0.1.0
WithCache sets a custom cache for subscription lookups. If cache is nil, caching will be disabled. The caller is responsible for closing the provided cache.
func WithDialectDecorator ¶
func WithDialectDecorator(decorators ...DialectDecorator) BusOption
WithDialectDecorator adds a decorator to the dialect. Multiple decorators can be composed together and will be applied in the order provided. If any decorator returns an error during application, the error will be returned from New.
func WithLogger ¶
WithLogger sets the logger for the Bus implementation.
func WithMaxEnqueuePerBatch ¶
WithMaxEnqueuePerBatch sets the maximum number of plans that can be enqueued in a single transaction. If less than or equal to 0, DefaultMaxEnqueuePerBatch will be used.
func WithMigrate ¶
WithMigrate sets whether database migrations should be run during initialization.
func WithoutCache ¶ added in v0.1.0
func WithoutCache() BusOption
WithoutCache disables the default cache. This is equivalent to WithCache(nil).
type BusOptions ¶
type BusOptions struct {
// Migrate controls whether database migrations are run during initialization.
// Default is true.
Migrate bool
// Logger is used for logging warnings and errors. If nil, a default logger will be used.
Logger *slog.Logger
// MaxEnqueuePerBatch limits the maximum number of plans that can be enqueued in a single transaction.
// If the number of plans exceeds this limit, they will be split into multiple transactions.
// If less than or equal to 0, DefaultMaxEnqueuePerBatch will be used.
MaxEnqueuePerBatch int
// DialectDecorator provides a way to decorate the base dialect implementation with additional
// functionality such as caching, metrics, or logging. If nil, the dialect is used as-is.
// This is applied after the dialect is created but before any operations are performed.
DialectDecorator DialectDecorator
// CacheEnabled controls whether caching is enabled. Default is true.
// When enabled and Cache is nil, a default Ristretto cache will be created.
// Set to false to disable caching entirely.
CacheEnabled bool
// Cache is a custom cache instance to use for subscription lookups.
// If nil and CacheEnabled is true, a default cache will be created.
// The caller is responsible for closing this cache if provided.
Cache Cache
}
BusOptions configures the Bus implementation.
type Cache ¶
type Cache interface {
Get(key string) (value []Subscription, ok bool)
Set(key string, value []Subscription)
Del(key string)
Clear()
}
Cache is a simple cache interface for caching subscription lookups.
func WrapRistrettoCache ¶ added in v0.1.0
func WrapRistrettoCache(cache *ristretto.Cache[string, []Subscription]) Cache
WrapRistrettoCache wraps a ristretto cache into a Cache interface. This is useful when you want to use WithCache with a ristretto cache.
type ConsumeOption ¶
type ConsumeOption func(*ConsumeOptions)
ConsumeOption represents an option for customizing a worker.
func WithWorkerConfig ¶
func WithWorkerConfig(config *WorkerConfig) ConsumeOption
WithWorkerConfig sets the worker configuration for a worker.
type ConsumeOptions ¶
type ConsumeOptions struct {
// WorkerConfig contains the performance-related settings for a worker.
WorkerConfig *WorkerConfig
}
ConsumeOptions holds all the options for creating a worker.
type Consumer ¶
type Consumer interface {
quex.WorkerController
}
Consumer represents a message consumer that can be stopped.
type Dialect ¶
type Dialect interface {
// Migrate performs database migrations to initialize required tables.
Migrate(ctx context.Context) error
// GoQue returns the underlying GoQue instance.
GoQue() que.Queue
// GetMetadata retrieves the current bus metadata.
GetMetadata(ctx context.Context) (*Metadata, error)
// BySubject finds all subscriptions with patterns matching the given subject.
BySubject(ctx context.Context, subject string) ([]Subscription, error)
// ByQueue returns all subscriptions for a specific queue.
ByQueue(ctx context.Context, queue string) ([]Subscription, error)
// Upsert creates or updates a subscription with the provided options.
Upsert(ctx context.Context, queue, pattern string, opts *SubscribeOptions) (Subscription, error)
// ExecTx executes fn within a database transaction.
// If the context already contains a transaction (via bussql.NewContext), that transaction
// is reused (with a savepoint if enabled). Otherwise, a new transaction is started.
// The transaction is automatically committed if fn returns nil, or rolled back on error.
ExecTx(ctx context.Context, fn func(ctx context.Context, tx *sql.Tx) error, opts ...bussql.TransactionOption) error
}
Dialect defines the interface for database-specific implementations of the message bus. It abstracts storage operations and allows for different backend databases to be used.
type DialectDecorator ¶
DialectDecorator is a function that decorates a Dialect with additional functionality. It can return an error if the decoration process fails.
func CacheDecorator ¶
func CacheDecorator(cache Cache) DialectDecorator
CacheDecorator creates a decorator that adds caching functionality to the dialect. This significantly improves performance for subscription lookups. The decorator automatically handles TTL-based subscription expiration by checking cached subscriptions' ExpiresAt() values and refreshing from database when needed. The caller is responsible for creating and closing the cache instance.
func RistrettoDecorator ¶
func RistrettoDecorator(cache *ristretto.Cache[string, []Subscription]) DialectDecorator
RistrettoDecorator creates a decorator that adds caching functionality to the dialect.
type Dispatch ¶
type Dispatch struct {
// Executions contains detailed execution information for all matched subscriptions.
Executions []*SubscriptionExecution
}
Dispatch represents the result of a publish or dispatch operation.
func (*Dispatch) ExecutedCount ¶
ExecutedCount returns the number of subscriptions that were successfully executed.
func (*Dispatch) JobIDs ¶
JobIDs returns all successfully executed job IDs for backward compatibility.
func (*Dispatch) MatchedCount ¶
MatchedCount returns the total number of matched subscriptions.
func (*Dispatch) SkippedByConflict ¶
func (d *Dispatch) SkippedByConflict() []*SubscriptionExecution
SkippedByConflict returns subscriptions that were skipped due to unique constraint conflicts.
func (*Dispatch) SkippedByOverlap ¶
func (d *Dispatch) SkippedByOverlap() []*SubscriptionExecution
SkippedByOverlap returns subscriptions that were skipped due to overlapping patterns.
type ExecutionStatus ¶
type ExecutionStatus string
ExecutionStatus represents the execution status of a subscription.
const ( // ExecutionStatusExecuted indicates the subscription was successfully executed and a job was created. ExecutionStatusExecuted ExecutionStatus = "EXECUTED" // ExecutionStatusSkippedOverlap indicates the subscription was skipped due to overlapping patterns in the same queue. ExecutionStatusSkippedOverlap ExecutionStatus = "SKIPPED_OVERLAP" // ExecutionStatusSkippedConflict indicates the subscription was skipped due to unique constraint conflict (que.SkippedID). ExecutionStatusSkippedConflict ExecutionStatus = "SKIPPED_CONFLICT" )
type Inbound ¶
type Inbound struct {
// Message is the message content.
Message
// Payload is the raw JSON payload of the message.
//
// When publishing (Outbound), the following payload types are supported:
// - Any Go value that is JSON-marshalable (e.g., structs, maps, slices, scalars)
// - json.RawMessage (used as-is, without additional marshaling)
// - []byte (encoded by encoding/json as a base64 JSON string; use json.RawMessage for raw JSON)
//
// When receiving (Inbound), InboundFromArgs currently assigns a json.RawMessage
// containing the raw JSON payload to this field. Handlers can therefore rely
// on msg.Payload being a json.RawMessage for inbound messages, and may type-assert
// accordingly or unmarshal it into a concrete type.
Payload json.RawMessage `json:"payload"`
// Job is the job that received the message.
que.Job `json:"-"`
}
func InboundFromArgs ¶
InboundFromArgs creates an Inbound message from raw arguments. This is primarily used for testing and debugging purposes.
type Message ¶
type Message struct {
// Subject is the topic this message is published to.
Subject string `json:"subject"`
// Header is the header of the message.
Header Header `json:"header"`
// Payload is the actual content of the message.
//
// When publishing (Outbound), the following payload types are supported:
// - Any Go value that is JSON-marshalable (e.g., structs, maps, slices, scalars)
// - json.RawMessage (used as-is, without additional marshaling)
// - []byte (encoded by encoding/json as a base64 JSON string; use json.RawMessage for raw JSON)
//
// When receiving (Inbound), InboundFromArgs assigns the raw JSON payload as a
// json.RawMessage to Inbound.Payload (i.e., msg.Payload on *Inbound). For
// convenience and backwards compatibility, this same raw payload is also
// populated into Message.Payload. Inbound handlers should primarily use
// Inbound.Payload/msg.Payload (on the Inbound value) as the source of raw JSON
// and unmarshal it into concrete types as needed.
Payload any `json:"payload"`
}
Message represents a message in the publish-subscribe system.
func (*Message) ToRaw ¶
func (m *Message) ToRaw(sub Subscription) (json.RawMessage, error)
type PlanConfig ¶
type PlanConfig struct {
// RetryPolicy defines how to retry failed job executions.
RetryPolicy *que.RetryPolicy `json:"retryPolicy"`
// RunAtDelta specifies the duration to delay job execution from the time of message receipt.
// Zero means execute immediately, positive values mean delayed execution.
RunAtDelta time.Duration `json:"runAtDelta"`
// UniqueLifecycle controls the uniqueness behavior of the job.
UniqueLifecycle que.UniqueLifecycle `json:"uniqueLifecycle"`
}
PlanConfig defines how a queue processes messages for a specific subject pattern.
func (*PlanConfig) Equal ¶
func (p *PlanConfig) Equal(other *PlanConfig) bool
Equal compares this PlanConfig with another and returns true if they are equivalent.
type Queue ¶
type Queue interface {
// Subscribe registers the queue to receive messages published to subjects matching the pattern.
// Pattern supports NATS-style wildcards: '*' for a single token, '>' for multiple trailing tokens.
Subscribe(ctx context.Context, pattern string, opts ...SubscribeOption) (Subscription, error)
// Subscriptions returns all subscriptions for the queue.
Subscriptions(ctx context.Context) ([]Subscription, error)
// StartConsumer starts a new message consumer for this queue.
// The ctx parameter is only used to manage the startup process, not the Consumer's lifecycle.
// The returned Consumer must be stopped by the caller when no longer needed.
StartConsumer(ctx context.Context, handler Handler, opts ...ConsumeOption) (Consumer, error)
}
Queue represents a message queue that can subscribe to subjects.
type QueueImpl ¶
type QueueImpl struct {
// contains filtered or unexported fields
}
QueueImpl implements the Queue interface.
func (*QueueImpl) StartConsumer ¶
func (q *QueueImpl) StartConsumer(ctx context.Context, handler Handler, options ...ConsumeOption) (Consumer, error)
StartConsumer starts a new message consumer for this queue. The returned Consumer must be stopped by the caller when no longer needed. The ctx parameter is only used to manage the startup process, not the Consumer's lifecycle.
func (*QueueImpl) Subscribe ¶
func (q *QueueImpl) Subscribe(ctx context.Context, pattern string, opts ...SubscribeOption) (Subscription, error)
Subscribe registers the queue to receive messages published to subjects matching the pattern.
func (*QueueImpl) Subscriptions ¶
func (q *QueueImpl) Subscriptions(ctx context.Context) ([]Subscription, error)
Subscriptions returns all subscriptions for the queue.
type SubscribeOption ¶
type SubscribeOption func(*SubscribeOptions)
SubscribeOption represents an option for customizing a subscription.
func WithAutoDrain ¶
func WithAutoDrain(autoDrain bool) SubscribeOption
WithAutoDrain sets whether to automatically execute Drain() when Unsubscribe() is called. When enabled, all pending jobs for the subscription will be cleaned up upon unsubscription.
func WithPlanConfig ¶
func WithPlanConfig(config *PlanConfig) SubscribeOption
WithPlanConfig sets the job configuration for a subscription.
func WithTTL ¶
func WithTTL(ttl time.Duration) SubscribeOption
WithTTL sets the TTL (Time To Live) for a subscription. The subscription will be automatically cleaned up if no heartbeat is received within this duration.
type SubscribeOptions ¶
type SubscribeOptions struct {
// PlanConfig contains the settings for job execution.
PlanConfig *PlanConfig `json:"planConfig"`
// TTL specifies how long the subscription should remain active without heartbeat.
// If set to zero or negative value, the subscription will never expire.
TTL time.Duration `json:"ttl"`
// AutoDrain indicates whether to automatically execute Drain() when Unsubscribe() is called.
AutoDrain bool `json:"autoDrain"`
}
SubscribeOptions contains the configuration for a subscription.
type Subscription ¶
type Subscription interface {
// ID returns the unique identifier of the subscription.
ID() string
// Queue returns the name of the queue that receives messages.
Queue() string
// Pattern returns the subject pattern this subscription matches against.
Pattern() string
// PlanConfig returns the plan configuration for this subscription.
PlanConfig() *PlanConfig
// Unsubscribe removes this subscription.
// If autoDrain was enabled in the subscription options, pending jobs will be automatically cleaned up.
// This method is usually executed when the subscription is not needed, and is not supposed to be executed with the exit of the program.
// This is because go-bus is designed to support offline messages.
Unsubscribe(ctx context.Context) error
// Heartbeat updates the heartbeat timestamp for this subscription.
// This method should be called periodically to prevent TTL-based cleanup.
Heartbeat(ctx context.Context) error
// ExpiresAt returns the expiration time for this subscription.
// Returns zero time if the subscription never expires (no TTL).
ExpiresAt() time.Time
// Drain removes all pending jobs that are not currently being processed.
// This method is useful for cleaning up the queue without affecting jobs that are already running.
// Returns the number of jobs that were deleted.
Drain(ctx context.Context) (int, error)
}
Subscription represents an active subscription to a subject pattern.
type SubscriptionExecution ¶
type SubscriptionExecution struct {
// Subscription contains the matched subscription information.
Subscription Subscription
// Status indicates the execution status.
Status ExecutionStatus
// Plan contains the plan used for this subscription.
Plan *que.Plan
// JobID contains the created job ID when Status is ExecutionStatusExecuted.
// For all skipped statuses, this will be nil.
JobID *int64
}
SubscriptionExecution represents the execution details of a single subscription.
type WorkerConfig ¶
type WorkerConfig struct {
// MaxLockPerSecond is maximum frequency of calls to Lock() method of Queue.
// Lower number uses lower database CPU resources.
MaxLockPerSecond float64
// MaxBufferJobsCount is maximum number of jobs in channel that are waiting for
// a goroutine to execute them.
MaxBufferJobsCount int
// MaxPerformPerSecond is maximum frequency of Perform executions.
MaxPerformPerSecond float64
// MaxConcurrentPerformCount is maximum number of goroutines executing Perform simultaneously.
MaxConcurrentPerformCount int
// ReconnectBackOff is the backoff strategy for reconnecting to the database.
ReconnectBackOff backoff.BackOff
}
WorkerConfig defines performance-related configuration for workers processing messages.