bus

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 19, 2026 License: MIT Imports: 21 Imported by: 1

README

go-bus

中文版本

A simple and reliable PostgreSQL-based publish/subscribe message bus system for Go applications. Built on top of tnclong/go-que.

Features

Flexible topic subscription patterns: Support for NATS-style topic matching with exact matching, single-level wildcards (*), and multi-level wildcards (>)
Persistent message queue: PostgreSQL-based storage ensures reliable message delivery
Multiple queue support: Multiple queues can subscribe to the same topic patterns
Custom retry strategies: Each subscription can configure its own message processing retry strategy
Message header support: Support for custom message metadata
Context propagation: Full integration with Go's context package

Installation

go get github.com/qor5/go-bus

Quick Start

Creating a Bus Instance
import (
    "database/sql"
    "github.com/qor5/go-bus/pgbus"
    _ "github.com/jackc/pgx/v5/stdlib"
)

// Connect to PostgreSQL
db, err := sql.Open("pgx", "postgres://user:password@localhost/dbname?sslmode=disable")
if err != nil {
    log.Fatalf("Failed to connect to database: %v", err)
}

// Create a new bus instance
bus, err := pgbus.New(db)
if err != nil {
    log.Fatalf("Failed to create bus: %v", err)
}
defer bus.Close() // Always close the bus to release resources
Consuming Messages
// Get a queue
queue := bus.Queue("my_service_queue")

// Basic consumption
consumer, err := queue.StartConsumer(ctx, func(ctx context.Context, msg *bus.Inbound) error {
    fmt.Printf("Received message: subject=%s, payload=%s\n", msg.Subject, string(msg.Payload))

    // Reading headers
    if contentType := msg.Header.Get("Content-Type"); contentType != "" {
        fmt.Printf("Content-Type: %s\n", contentType)
    }

    // Mark message as done after processing
    return msg.Done(ctx)
})
if err != nil {
    log.Fatalf("Failed to start consumer: %v", err)
}
// Ensure consumer is stopped when done
defer consumer.Stop(context.Background())

// Consumption with custom worker configuration
workerConfig := bus.WorkerConfig{
    MaxLockPerSecond:          5,
    MaxBufferJobsCount:        10,
    MaxPerformPerSecond:       5,
    MaxConcurrentPerformCount: 2,
}

consumer, err := queue.StartConsumer(ctx, func(ctx context.Context, msg *bus.Inbound) error {
    // Process message...
    fmt.Printf("Processing message: %s\n", string(msg.Payload))

    // If you want to discard the message, use Destroy instead of Done
    return msg.Destroy(ctx)
}, bus.WithWorkerConfig(workerConfig))
if err != nil {
    log.Fatalf("Failed to start consumer with options: %v", err)
}
defer consumer.Stop(context.Background())
Creating Subscriptions
// Create subscriptions - supporting various patterns
exactSub, err := queue.Subscribe(ctx, "orders.created")                // Exact match
wildcardSub, err := queue.Subscribe(ctx, "products.*.category.*.info") // Single-level wildcard at multiple positions
multiLevelSub, err := queue.Subscribe(ctx, "notifications.>")          // Multi-level wildcard

// Subscription with custom configuration
customPlan := bus.PlanConfig{
    RunAtDelta: 200 * time.Millisecond,
    RetryPolicy: &que.RetryPolicy{
        InitialInterval:        2 * time.Second,
        MaxInterval:            20 * time.Second,
        NextIntervalMultiplier: 2.0,
        IntervalRandomPercent:  20,
        MaxRetryCount:          5,
    },
}

customSub, err := queue.Subscribe(ctx, "payments.processed", bus.WithPlanConfig(&customPlan))

// Unsubscribe from a specific subscription
// Call Unsubscribe only when the subscription is no longer needed; do not call it merely because the program is exiting.
// go-bus is designed to support offline messages, which requires the subscription to remain in place.
err = customSub.Unsubscribe(ctx)
Publishing Messages
// Basic publish with a struct payload
type Order struct {
    ID    string  `json:"id"`
    Total float64 `json:"total"`
}

_, err = bus.Publish(ctx, &bus.Outbound{
    Message: bus.Message{
        Subject: "orders.created",
        Payload: Order{ID: "12345", Total: 99.99},
    },
})

// Publish with unique ID (for deduplication)
_, err = bus.Publish(ctx, &bus.Outbound{
    Message: bus.Message{
        Subject: "orders.created",
        Payload: Order{ID: "12345", Total: 99.99},
    },
    UniqueID: bus.UniqueID("order-12345"),
})

// Publish with headers
_, err = bus.Publish(ctx, &bus.Outbound{
    Message: bus.Message{
        Subject: "orders.created",
        Header: bus.Header{
            "Content-Type": []string{"application/json"},
            "X-Request-ID": []string{"req-123456"},
        },
        Payload: Order{ID: "12345", Total: 99.99},
    },
})

// Publish using a pre-built Outbound value
outbound := &bus.Outbound{
    Message: bus.Message{
        Subject: "orders.created",
        Header:  bus.Header{"Content-Type": []string{"application/json"}},
        Payload: Order{ID: "12345", Total: 99.99},
    },
    UniqueID: bus.UniqueID("order-12345"), // Optional unique ID for message deduplication
}
_, err = bus.Publish(ctx, outbound)

// Publish multiple messages at once
outbound1 := &bus.Outbound{
    Message: bus.Message{
        Subject: "orders.created",
        Payload: Order{ID: "12345", Total: 99.99},
    },
    UniqueID: bus.UniqueID("order-12345"),
}
outbound2 := &bus.Outbound{
    Message: bus.Message{
        Subject: "notifications.sent",
        Payload: json.RawMessage(`{"user_id":"user123","message":"Your order has been created"}`),
    },
    UniqueID: bus.UniqueID("notification-user123-order-created"),
}
// Publish supports publishing multiple outbound messages in a single call
_, err = bus.Publish(ctx, outbound1, outbound2)
Finding Matching Subscriptions
// Find subscriptions matching a specific subject
subs, err := bus.BySubject(ctx, "orders.created")
for _, sub := range subs {
    fmt.Printf("Queue %s matches with pattern %s\n", sub.Queue(), sub.Pattern())
}

// Get all subscriptions for a specific queue
queueSubs, err := queue.Subscriptions(ctx)
for _, sub := range queueSubs {
    fmt.Printf("Pattern: %s, ID: %s\n", sub.Pattern(), sub.ID())
}

Advanced Usage

Using Consumer-Specific Nonce in Queue Names for Distributed Broadcast Reception

In a distributed environment (such as Kubernetes), when every instance in the cluster must receive the same broadcast message, give each instance its own queue whose name contains a consumer-specific nonce. Because each instance then consumes from a distinct queue, each one independently receives the same message, which achieves the broadcast effect.

import (
    "github.com/google/uuid"
    "github.com/qor5/go-bus/pgbus"
    "context"
)

// Create a unique queue name for each service instance (like a K8s Pod)
podQueueName := fmt.Sprintf("broadcast-receiver-%s", uuid.New().String())
podQueue := bus.Queue(podQueueName)

// Start consuming messages
consumer, err := podQueue.StartConsumer(ctx, func(ctx context.Context, msg *bus.Inbound) error {
    log.Printf("Instance %s received broadcast message: %s - %s",
        podQueueName, msg.Subject, string(msg.Payload))
    return msg.Destroy(ctx)
})
if err != nil {
    log.Fatalf("Failed to start consumer: %v", err)
}
defer consumer.Stop(context.Background())

// Subscribe to broadcast topics
// WithAutoDrain(true) ensures that all pending jobs will be automatically cleaned up
// when this subscription is unsubscribed, which is essential for temporary queues
sub, err := podQueue.Subscribe(ctx, "broadcast.events.>", bus.WithAutoDrain(true))
if err != nil {
    log.Fatalf("Failed to create broadcast subscription: %v", err)
}
defer func() {
    // podQueue is single-use, so unsubscribe as soon as the program exits
    if err := sub.Unsubscribe(context.Background()); err != nil {
        log.Printf("Failed to unsubscribe: %v", err)
    }
}()

// Other service blocking logic

This pattern is particularly useful for:

  • Broadcasting configuration changes or system notifications to all service instances
  • Ensuring each instance in the cluster independently processes the same message, implementing a reliable broadcast mechanism
  • Implementing event-driven system-wide notifications in microservice architectures

Each instance creates a queue with a unique name, so each message is processed independently by each subscribed instance, achieving a true broadcast effect.

Topic Pattern Explanation

go-bus supports three types of topic matching patterns, following the NATS messaging system style:

  1. Exact Match: Matches the exact topic string

    • Example: orders.created only matches orders.created
  2. Single-Level Wildcard (*): Matches exactly one token at a single level

    • Example: products.*.category.*.info matches products.xyz.category.abc.info and products.123.category.456.info, but not products.category.info or products.xyz.category.abc.def.info
  3. Multi-Level Wildcard (>): Matches one or more levels

    • Example: orders.> matches orders.created, orders.updated, and orders.items.created

Important Notes

Avoid Overlapping Subscription Patterns

Do not subscribe to potentially overlapping patterns in the same queue. When a message matches multiple subscriptions in a queue, the system will only use the configuration (such as retry strategy) from the earliest created subscription and ignore others.

Problem Example

Suppose you create these two subscriptions in the same queue:

// First created subscription - using default configuration
sub1, err := queue.Subscribe(ctx, "orders.>")

// Later created subscription - with custom retry strategy
customPlan := bus.PlanConfig{
    RetryPolicy: &que.RetryPolicy{
        MaxRetryCount: 10,
        // Other configurations...
    },
}
sub2, err := queue.Subscribe(ctx, "orders.created", bus.WithPlanConfig(&customPlan))

When publishing a message with the subject orders.created:

  • The message matches both patterns: orders.> and orders.created
  • The system will use the configuration from sub1 (the earlier created orders.> subscription)
  • The custom retry strategy in sub2 (MaxRetryCount: 10) will be ignored
Correct Approach

To avoid this issue, use different queues for potentially overlapping patterns:

// First queue handles general orders events
queue1 := bus.Queue("orders_general_queue")
sub1, err := queue1.Subscribe(ctx, "orders.>")

// Second queue specifically handles orders.created events with custom configuration
queue2 := bus.Queue("orders_created_queue")
customPlan := bus.PlanConfig{
    RetryPolicy: &que.RetryPolicy{
        MaxRetryCount: 10,
        // Other configurations...
    },
}
sub2, err := queue2.Subscribe(ctx, "orders.created", bus.WithPlanConfig(&customPlan))

This way, the two subscriptions will process messages independently in their respective queues, and each configuration will be effective.

License

This project is licensed under the MIT License.

Acknowledgments

This project is based on tnclong/go-que - a high-performance PostgreSQL backend job queue.

Documentation

Overview

Package bus implements a publish-subscribe pattern on top of go-que.

Package bus implements a publish-subscribe pattern on top of go-que. It allows publishing messages to subjects which can be subscribed to by multiple queues. The subject matching follows NATS-style wildcard patterns.

Package bus implements a publish-subscribe pattern on top of go-que. It allows publishing messages to subjects which can be subscribed to by multiple queues. The subject matching follows NATS-style wildcard patterns:

  • '*' matches any single token in a subject (e.g., "foo.*.baz" matches "foo.bar.baz")
  • '>' matches one or more tokens at the end of a subject (e.g., "foo.>" matches "foo.bar" and "foo.bar.baz")

The package uses a pluggable architecture with the Dialect interface to support different database backends. A PostgreSQL implementation is provided in the pgbus subpackage.

Index

Constants

View Source
const (
	HeaderSubscriptionPattern    = "Subscription-Pattern"    // The subscription pattern that matched this message
	HeaderSubscriptionIdentifier = "Subscription-Identifier" // The subscription identifier that received this message
)

Header constants for message metadata

View Source
const MaxPatternTokens = 16

MaxPatternTokens defines the maximum number of tokens allowed in a pattern

Variables

View Source
var (
	// ErrInvalidQueue indicates that the queue name is invalid.
	ErrInvalidQueue = errors.New("queue is invalid")

	// ErrInvalidMessage indicates that the message is invalid.
	ErrInvalidMessage = errors.New("message is invalid")

	// ErrInvalidSubject indicates that the subject is invalid.
	ErrInvalidSubject = errors.New("subject is invalid")

	// ErrInvalidPattern indicates that the pattern is invalid.
	ErrInvalidPattern = errors.New("pattern is invalid")

	// ErrSubscriptionNotFound is returned when no subscription is found.
	ErrSubscriptionNotFound = errors.New("subscription not found")

	// ErrOverlappingPatterns is returned when a queue has multiple patterns matching the same subject.
	ErrOverlappingPatterns = errors.New("queue has overlapping patterns which may cause unexpected behavior; only the first subscription will be triggered")
)

Common errors returned by the bus package.

View Source
var DefaultMaxEnqueuePerBatch = 100

DefaultMaxEnqueuePerBatch defines the default maximum number of plans that can be enqueued in a single transaction.

View Source
var DefaultPlanConfigFactory = func() *PlanConfig {
	return &PlanConfig{
		RetryPolicy:     DefaultRetryPolicyFactory(),
		RunAtDelta:      0,
		UniqueLifecycle: que.Ignore,
	}
}

DefaultPlanConfigFactory provides default settings for subscription jobs.

View Source
var DefaultRetryPolicyFactory = func() *que.RetryPolicy {
	return &que.RetryPolicy{
		InitialInterval:        30 * time.Second,
		MaxInterval:            600 * time.Second,
		NextIntervalMultiplier: 2,
		IntervalRandomPercent:  33,
		MaxRetryCount:          3,
	}
}

DefaultRetryPolicyFactory provides a default retry policy for published messages.

View Source
var DefaultRistrettoConfigFactory = func() *ristretto.Config[string, []Subscription] {
	return &ristretto.Config[string, []Subscription]{
		NumCounters: 1e6,
		MaxCost:     1e5,
		BufferItems: 64,
		Cost:        func(value []Subscription) int64 { return int64(len(value)) },
	}
}

DefaultRistrettoConfigFactory is a factory function that returns a default ristretto.Config[string, []Subscription].

View Source
var DefaultWorkerConfigFactory = func() *WorkerConfig {
	return &WorkerConfig{
		MaxLockPerSecond:          5,
		MaxBufferJobsCount:        0,
		MaxPerformPerSecond:       1000,
		MaxConcurrentPerformCount: 200,
		ReconnectBackOff:          quex.DefaultReconnectBackOffFactory(),
	}
}

DefaultWorkerConfigFactory provides default settings for workers.

View Source
var FallbackUniqueID = func(msg *Outbound) string {
	return "_gobus_:" + xid.New().String()
}
View Source
var UniqueID = func(v string) func(msg *Outbound) string {
	return func(_ *Outbound) string {
		return v
	}
}

Functions

func NewRistrettoCache

func NewRistrettoCache(config *ristretto.Config[string, []Subscription]) (*ristretto.Cache[string, []Subscription], error)

NewRistrettoCache creates a new ristretto cache.

func ToRegexPattern

func ToRegexPattern(pattern string) (string, error)

ToRegexPattern converts a NATS-style wildcard pattern to a regex pattern that follows NATS specification for subject matching. It returns an error if the pattern contains invalid characters or structure.

func ValidatePattern

func ValidatePattern(pattern string) error

ValidatePattern validates that a pattern follows NATS wildcard rules. It checks for:

  • Empty patterns are not allowed
  • Empty tokens (parts between dots) are not allowed
  • Only lowercase alphanumeric characters, '_', and '-' are allowed in non-wildcard tokens
  • Wildcards can be '*' (single token) or '>' (multiple tokens)
  • The '>' wildcard can only appear at the end of a pattern
  • Pattern cannot exceed MaxPatternTokens tokens

func ValidateSubject

func ValidateSubject(subject string) error

ValidateSubject validates that a subject follows NATS subject rules. It checks for empty subjects, invalid characters, subjects starting or ending with a dot, and empty tokens.

Types

type Bus

type Bus interface {
	// Queue returns a queue with the specified name.
	Queue(name string) Queue

	// Publish sends outbound messages to all queues with subscriptions matching the subject.
	// All messages are processed in a single transaction.
	Publish(ctx context.Context, msgs ...*Outbound) (*Dispatch, error)

	// BySubject returns all subscriptions with patterns matching the given subject.
	BySubject(ctx context.Context, subject string) ([]Subscription, error)

	// Close releases resources held by the Bus, including the default cache if one was created.
	// It is safe to call Close multiple times.
	Close() error
}

Bus provides publish-subscribe capabilities on top of go-que. It manages subject-queue mappings and handles subject pattern matching.

func New

func New(dialect Dialect, opts ...BusOption) (_ Bus, xerr error)

New creates a new Bus instance with the given dialect and options.

dialect is the database dialect used for storing subscriptions. Different database backends can be supported by implementing this interface. A PostgreSQL implementation is provided in the pgbus package.

By default, a Ristretto cache is created to improve subscription lookup performance. Use WithCache to provide a custom cache or disable caching (by passing nil). The caller should call Close() when the Bus is no longer needed to release resources.

type BusImpl

type BusImpl struct {
	// contains filtered or unexported fields
}

BusImpl is a generic implementation of the Bus interface.

func (*BusImpl) BySubject

func (b *BusImpl) BySubject(ctx context.Context, subject string) ([]Subscription, error)

BySubject returns all subscriptions with patterns matching the given subject.

func (*BusImpl) Close added in v0.1.0

func (b *BusImpl) Close() error

Close releases resources held by the Bus, including the default cache if one was created. It is safe to call Close multiple times.

func (*BusImpl) Publish

func (b *BusImpl) Publish(ctx context.Context, msgs ...*Outbound) (*Dispatch, error)

Publish sends outbound messages to all queues with subscriptions matching the subject. All messages are processed in a single transaction.

If a transaction is provided via bussql.NewContext in the context, Publish will use that transaction (with a savepoint if enabled) instead of creating its own. This is useful when you want to publish messages as part of a larger transaction that includes other database operations.

func (*BusImpl) Queue

func (b *BusImpl) Queue(name string) Queue

Queue returns a queue with the specified name.

type BusOption

type BusOption func(*BusOptions)

func WithCache added in v0.1.0

func WithCache(cache Cache) BusOption

WithCache sets a custom cache for subscription lookups. If cache is nil, caching will be disabled. The caller is responsible for closing the provided cache.

func WithDialectDecorator

func WithDialectDecorator(decorators ...DialectDecorator) BusOption

WithDialectDecorator adds a decorator to the dialect. Multiple decorators can be composed together and will be applied in the order provided. If any decorator returns an error during application, the error will be returned from New.

func WithLogger

func WithLogger(logger *slog.Logger) BusOption

WithLogger sets the logger for the Bus implementation.

func WithMaxEnqueuePerBatch

func WithMaxEnqueuePerBatch(max int) BusOption

WithMaxEnqueuePerBatch sets the maximum number of plans that can be enqueued in a single transaction. If less than or equal to 0, DefaultMaxEnqueuePerBatch will be used.

func WithMigrate

func WithMigrate(migrate bool) BusOption

WithMigrate sets whether database migrations should be run during initialization.

func WithoutCache added in v0.1.0

func WithoutCache() BusOption

WithoutCache disables the default cache. This is equivalent to WithCache(nil).

type BusOptions

type BusOptions struct {
	// Migrate controls whether database migrations are run during initialization.
	// Default is true.
	Migrate bool

	// Logger is used for logging warnings and errors. If nil, a default logger will be used.
	Logger *slog.Logger

	// MaxEnqueuePerBatch limits the maximum number of plans that can be enqueued in a single transaction.
	// If the number of plans exceeds this limit, they will be split into multiple transactions.
	// If less than or equal to 0, DefaultMaxEnqueuePerBatch will be used.
	MaxEnqueuePerBatch int

	// DialectDecorator provides a way to decorate the base dialect implementation with additional
	// functionality such as caching, metrics, or logging. If nil, the dialect is used as-is.
	// This is applied after the dialect is created but before any operations are performed.
	DialectDecorator DialectDecorator

	// CacheEnabled controls whether caching is enabled. Default is true.
	// When enabled and Cache is nil, a default Ristretto cache will be created.
	// Set to false to disable caching entirely.
	CacheEnabled bool

	// Cache is a custom cache instance to use for subscription lookups.
	// If nil and CacheEnabled is true, a default cache will be created.
	// The caller is responsible for closing this cache if provided.
	Cache Cache
}

BusOptions configures the Bus implementation.

type Cache

type Cache interface {
	Get(key string) (value []Subscription, ok bool)
	Set(key string, value []Subscription)
	Del(key string)
	Clear()
}

Cache is a simple cache interface for caching subscription lookups.

func WrapRistrettoCache added in v0.1.0

func WrapRistrettoCache(cache *ristretto.Cache[string, []Subscription]) Cache

WrapRistrettoCache wraps a ristretto cache into a Cache interface. This is useful when you want to use WithCache with a ristretto cache.

type ConsumeOption

type ConsumeOption func(*ConsumeOptions)

ConsumeOption represents an option for customizing a worker.

func WithWorkerConfig

func WithWorkerConfig(config *WorkerConfig) ConsumeOption

WithWorkerConfig sets the worker configuration for a worker.

type ConsumeOptions

type ConsumeOptions struct {
	// WorkerConfig contains the performance-related settings for a worker.
	WorkerConfig *WorkerConfig
}

ConsumeOptions holds all the options for creating a worker.

type Consumer

type Consumer interface {
	quex.WorkerController
}

Consumer represents a message consumer that can be stopped.

type Dialect

type Dialect interface {
	// Migrate performs database migrations to initialize required tables.
	Migrate(ctx context.Context) error

	// GoQue returns the underlying GoQue instance.
	GoQue() que.Queue

	// GetMetadata retrieves the current bus metadata.
	GetMetadata(ctx context.Context) (*Metadata, error)

	// BySubject finds all subscriptions with patterns matching the given subject.
	BySubject(ctx context.Context, subject string) ([]Subscription, error)

	// ByQueue returns all subscriptions for a specific queue.
	ByQueue(ctx context.Context, queue string) ([]Subscription, error)

	// Upsert creates or updates a subscription with the provided options.
	Upsert(ctx context.Context, queue, pattern string, opts *SubscribeOptions) (Subscription, error)

	// ExecTx executes fn within a database transaction.
	// If the context already contains a transaction (via bussql.NewContext), that transaction
	// is reused (with a savepoint if enabled). Otherwise, a new transaction is started.
	// The transaction is automatically committed if fn returns nil, or rolled back on error.
	ExecTx(ctx context.Context, fn func(ctx context.Context, tx *sql.Tx) error, opts ...bussql.TransactionOption) error
}

Dialect defines the interface for database-specific implementations of the message bus. It abstracts storage operations and allows for different backend databases to be used.

type DialectDecorator

type DialectDecorator func(ctx context.Context, next Dialect) (Dialect, error)

DialectDecorator is a function that decorates a Dialect with additional functionality. It can return an error if the decoration process fails.

func CacheDecorator

func CacheDecorator(cache Cache) DialectDecorator

CacheDecorator creates a decorator that adds caching functionality to the dialect. This significantly improves performance for subscription lookups. The decorator automatically handles TTL-based subscription expiration by checking cached subscriptions' ExpiresAt() values and refreshing from database when needed. The caller is responsible for creating and closing the cache instance.

func RistrettoDecorator

func RistrettoDecorator(cache *ristretto.Cache[string, []Subscription]) DialectDecorator

RistrettoDecorator creates a decorator that adds caching functionality to the dialect.

type Dispatch

type Dispatch struct {
	// Executions contains detailed execution information for all matched subscriptions.
	Executions []*SubscriptionExecution
}

Dispatch represents the result of a publish or dispatch operation.

func (*Dispatch) ExecutedCount

func (d *Dispatch) ExecutedCount() int

ExecutedCount returns the number of subscriptions that were successfully executed.

func (*Dispatch) JobIDs

func (d *Dispatch) JobIDs() []int64

JobIDs returns all successfully executed job IDs for backward compatibility.

func (*Dispatch) MatchedCount

func (d *Dispatch) MatchedCount() int

MatchedCount returns the total number of matched subscriptions.

func (*Dispatch) SkippedByConflict

func (d *Dispatch) SkippedByConflict() []*SubscriptionExecution

SkippedByConflict returns subscriptions that were skipped due to unique constraint conflicts.

func (*Dispatch) SkippedByOverlap

func (d *Dispatch) SkippedByOverlap() []*SubscriptionExecution

SkippedByOverlap returns subscriptions that were skipped due to overlapping patterns.

type ExecutionStatus

type ExecutionStatus string

ExecutionStatus represents the execution status of a subscription.

const (
	// ExecutionStatusExecuted indicates the subscription was successfully executed and a job was created.
	ExecutionStatusExecuted ExecutionStatus = "EXECUTED"

	// ExecutionStatusSkippedOverlap indicates the subscription was skipped due to overlapping patterns in the same queue.
	ExecutionStatusSkippedOverlap ExecutionStatus = "SKIPPED_OVERLAP"

	// ExecutionStatusSkippedConflict indicates the subscription was skipped due to unique constraint conflict (que.SkippedID).
	ExecutionStatusSkippedConflict ExecutionStatus = "SKIPPED_CONFLICT"
)

type Handler

type Handler func(ctx context.Context, msg *Inbound) error

Handler represents a function that processes messages.

type Header = http.Header

type Inbound

type Inbound struct {
	// Message is the message content.
	Message

	// Payload is the raw JSON payload of the message.
	//
	// When publishing (Outbound), the following payload types are supported:
	//   - Any Go value that is JSON-marshalable (e.g., structs, maps, slices, scalars)
	//   - json.RawMessage (used as-is, without additional marshaling)
	//   - []byte (encoded by encoding/json as a base64 JSON string; use json.RawMessage for raw JSON)
	//
	// When receiving (Inbound), InboundFromArgs currently assigns a json.RawMessage
	// containing the raw JSON payload to this field. Handlers can therefore rely
	// on msg.Payload being a json.RawMessage for inbound messages, and may type-assert
	// accordingly or unmarshal it into a concrete type.
	Payload json.RawMessage `json:"payload"`

	// Job is the job that received the message.
	que.Job `json:"-"`
}

func InboundFromArgs

func InboundFromArgs(args []byte) (*Inbound, error)

InboundFromArgs creates an Inbound message from raw arguments. This is primarily used for testing and debugging purposes.

func InboundFromJob

func InboundFromJob(job que.Job) (*Inbound, error)

InboundFromJob creates an Inbound message from a que.Job. This is the primary method used in production message processing.

type Message

type Message struct {
	// Subject is the topic this message is published to.
	Subject string `json:"subject"`

	// Header is the header of the message.
	Header Header `json:"header"`

	// Payload is the actual content of the message.
	//
	// When publishing (Outbound), the following payload types are supported:
	//   - Any Go value that is JSON-marshalable (e.g., structs, maps, slices, scalars)
	//   - json.RawMessage (used as-is, without additional marshaling)
	//   - []byte (encoded by encoding/json as a base64 JSON string; use json.RawMessage for raw JSON)
	//
	// When receiving (Inbound), InboundFromArgs assigns the raw JSON payload as a
	// json.RawMessage to Inbound.Payload (i.e., msg.Payload on *Inbound). For
	// convenience and backwards compatibility, this same raw payload is also
	// populated into Message.Payload. Inbound handlers should primarily use
	// Inbound.Payload/msg.Payload (on the Inbound value) as the source of raw JSON
	// and unmarshal it into concrete types as needed.
	Payload any `json:"payload"`
}

Message represents a message in the publish-subscribe system.

func (*Message) ToRaw

func (m *Message) ToRaw(sub Subscription) (json.RawMessage, error)

type Metadata

type Metadata struct {
	Version            int64
	UpdatedAt          time.Time
	TotalSubscriptions int64
}

type Outbound

type Outbound struct {
	// Message is the message content.
	Message

	// UniqueID is the unique identifier for this message.
	// If set, it will be used for message deduplication.
	UniqueID func(msg *Outbound) string `json:"-"`
}

type PlanConfig

type PlanConfig struct {
	// RetryPolicy defines how to retry failed job executions.
	RetryPolicy *que.RetryPolicy `json:"retryPolicy"`

	// RunAtDelta specifies the duration to delay job execution from the time of message receipt.
	// Zero means execute immediately, positive values mean delayed execution.
	RunAtDelta time.Duration `json:"runAtDelta"`

	// UniqueLifecycle controls the uniqueness behavior of the job.
	UniqueLifecycle que.UniqueLifecycle `json:"uniqueLifecycle"`
}

PlanConfig defines how a queue processes messages for a specific subject pattern.

func (*PlanConfig) Equal

func (p *PlanConfig) Equal(other *PlanConfig) bool

Equal compares this PlanConfig with another and returns true if they are equivalent.

type Queue

type Queue interface {
	// Subscribe registers the queue to receive messages published to subjects matching the pattern.
	// Pattern supports NATS-style wildcards: '*' for a single token, '>' for multiple trailing tokens.
	Subscribe(ctx context.Context, pattern string, opts ...SubscribeOption) (Subscription, error)

	// Subscriptions returns all subscriptions for the queue.
	Subscriptions(ctx context.Context) ([]Subscription, error)

	// StartConsumer starts a new message consumer for this queue.
	// The ctx parameter is only used to manage the startup process, not the Consumer's lifecycle.
	// The returned Consumer must be stopped by the caller when no longer needed.
	StartConsumer(ctx context.Context, handler Handler, opts ...ConsumeOption) (Consumer, error)
}

Queue represents a message queue that can subscribe to subjects.

type QueueImpl

type QueueImpl struct {
	// contains filtered or unexported fields
}

QueueImpl implements the Queue interface.

func (*QueueImpl) StartConsumer

func (q *QueueImpl) StartConsumer(ctx context.Context, handler Handler, options ...ConsumeOption) (Consumer, error)

StartConsumer starts a new message consumer for this queue. The returned Consumer must be stopped by the caller when no longer needed. The ctx parameter is only used to manage the startup process, not the Consumer's lifecycle.

func (*QueueImpl) Subscribe

func (q *QueueImpl) Subscribe(ctx context.Context, pattern string, opts ...SubscribeOption) (Subscription, error)

Subscribe registers the queue to receive messages published to subjects matching the pattern.

func (*QueueImpl) Subscriptions

func (q *QueueImpl) Subscriptions(ctx context.Context) ([]Subscription, error)

Subscriptions returns all subscriptions for the queue.

type SubscribeOption

// SubscribeOption is a functional option for customizing a subscription.
// It receives a pointer to the SubscribeOptions being configured.
type SubscribeOption func(*SubscribeOptions)

SubscribeOption represents an option for customizing a subscription.

func WithAutoDrain

func WithAutoDrain(autoDrain bool) SubscribeOption

WithAutoDrain sets whether to automatically execute Drain() when Unsubscribe() is called. When enabled, all pending jobs for the subscription will be cleaned up upon unsubscription.

func WithPlanConfig

func WithPlanConfig(config *PlanConfig) SubscribeOption

WithPlanConfig sets the plan configuration (PlanConfig), which holds the job execution settings, for a subscription.

func WithTTL

func WithTTL(ttl time.Duration) SubscribeOption

WithTTL sets the TTL (Time To Live) for a subscription. The subscription will be automatically cleaned up if no heartbeat is received within this duration. A zero or negative TTL means the subscription never expires.

type SubscribeOptions

// SubscribeOptions holds the configuration for a subscription.
type SubscribeOptions struct {
	// PlanConfig contains the settings for job execution.
	PlanConfig *PlanConfig `json:"planConfig"`

	// TTL specifies how long the subscription remains active without a heartbeat.
	// If set to a zero or negative value, the subscription never expires.
	TTL time.Duration `json:"ttl"`

	// AutoDrain indicates whether Drain() is executed automatically when Unsubscribe() is called.
	AutoDrain bool `json:"autoDrain"`
}

SubscribeOptions contains the configuration for a subscription.

type Subscription

// Subscription represents an active subscription of a queue to a subject pattern.
type Subscription interface {
	// ID returns the unique identifier of the subscription.
	ID() string

	// Queue returns the name of the queue that receives messages.
	Queue() string

	// Pattern returns the subject pattern this subscription matches against.
	Pattern() string

	// PlanConfig returns the plan configuration for this subscription.
	PlanConfig() *PlanConfig

	// Unsubscribe removes this subscription.
	// If autoDrain was enabled in the subscription options, pending jobs are cleaned up automatically.
	// Call this only when the subscription is no longer needed; it is not meant to be
	// called as part of program shutdown, because go-bus is designed to support offline messages.
	Unsubscribe(ctx context.Context) error

	// Heartbeat updates the heartbeat timestamp for this subscription.
	// Call it periodically to prevent TTL-based cleanup.
	Heartbeat(ctx context.Context) error

	// ExpiresAt returns the expiration time of this subscription.
	// It returns the zero time if the subscription never expires (no TTL).
	ExpiresAt() time.Time

	// Drain removes all pending jobs that are not currently being processed.
	// It is useful for cleaning up the queue without affecting jobs that are already running.
	// It returns the number of jobs that were deleted.
	Drain(ctx context.Context) (int, error)
}

Subscription represents an active subscription to a subject pattern.

type SubscriptionExecution

// SubscriptionExecution describes the execution details of a single subscription.
type SubscriptionExecution struct {
	// Subscription contains the matched subscription information.
	Subscription Subscription

	// Status indicates the execution status.
	Status ExecutionStatus

	// Plan contains the plan used for this subscription.
	Plan *que.Plan

	// JobID holds the ID of the created job when Status is ExecutionStatusExecuted.
	// It is nil for all skipped statuses.
	JobID *int64
}

SubscriptionExecution represents the execution details of a single subscription.

type WorkerConfig

// WorkerConfig defines performance-related configuration for workers processing messages.
type WorkerConfig struct {
	// MaxLockPerSecond is the maximum frequency of calls to the Lock() method of Queue.
	// A lower value uses fewer database CPU resources.
	MaxLockPerSecond float64

	// MaxBufferJobsCount is the maximum number of jobs in the channel that are waiting for
	// a goroutine to execute them.
	MaxBufferJobsCount int

	// MaxPerformPerSecond is the maximum frequency of Perform executions.
	MaxPerformPerSecond float64

	// MaxConcurrentPerformCount is the maximum number of goroutines executing Perform simultaneously.
	MaxConcurrentPerformCount int

	// ReconnectBackOff is the backoff strategy for reconnecting to the database.
	ReconnectBackOff backoff.BackOff
}

WorkerConfig defines performance-related configuration for workers processing messages.

Directories

Path Synopsis
bussql: Package bussql provides transaction management utilities for database operations.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL