Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer interface {
// Consume retrieves the next available message or an error if the consumer is closed.
// The returned Message must be acknowledged or rejected by the caller.
Consume() (Message, error)
// Close stops message consumption and releases any resources.
// After Close, subsequent calls to Consume should return an error.
Close() error
}
Consumer defines the interface for consuming messages from a broker. Each implementation should manage its own connection and message stream.
type ConsumerCloseError ¶
type ConsumerCloseError struct {
}
func (ConsumerCloseError) Error ¶
func (ConsumerCloseError) Error() string
type EmptyRoutError ¶
type EmptyRoutError struct {
}
func (EmptyRoutError) Error ¶
func (EmptyRoutError) Error() string
type Instance ¶
type Instance struct {
// contains filtered or unexported fields
}
Instance is a running listener created from Listener.
- workChan: buffered channel through which the dispatcher feeds anonymous handler functions to the workers.
- wg: WaitGroup for graceful shutdown synchronization.
- gos: fixed worker pool size determined at Init() time.
- router: map routingKey → handler function.
- consumer: same consumer object shared with the parent Listener.
func (*Instance) ListenAndServe ¶
ListenAndServe starts the worker pool and enters an infinite loop that consumes messages from the broker. If consumer.Consume() returns an error, it is propagated to the caller, enabling graceful shutdown at a higher level.
type Listener ¶
type Listener struct {
// contains filtered or unexported fields
}
Listener encapsulates common parameters of a message‑queue subscriber.
- consumer: an object that implements the broker.Consumer interface.
- gos: desired number of concurrent goroutines used by an Instance.
Listener itself does not process messages; it acts as a factory that creates an Instance where the real work happens.
func NewListener ¶
NewListener constructs a Listener with a default parallelism level of 1.
func (*Listener) Init ¶
Init takes a Router snapshot and returns a ready‑to‑run Instance. The workChan capacity is set to 1; workers drain it quickly, so a larger buffer is rarely needed. To start with another router, create a new Instance instead of mutating the old one.
func (*Listener) SetConcurrency ¶
SetConcurrency sets the number of goroutines that will be spawned later inside an Instance. It validates the input (n >= 1) and clamps the value by runtime.GOMAXPROCS(0).
func (*Listener) SetLogger ¶
func (l *Listener) SetLogger(logger LoggerFunc)
SetLogger overrides the default stdlib logger. Pass nil to restore logging to the standard library’s log.Print.
type LoggerFunc ¶
type LoggerFunc func(error)
LoggerFunc is a pluggable callback for error reporting. Users can inject any logger (zap, logrus, zerolog, etc.) by calling listener.SetLogger(customLogger).
type Message ¶
type Message interface {
// Headers returns the message metadata headers.
Headers() map[string]interface{}
// ContentType returns the MIME type of the message payload.
ContentType() string
// IsRedelivered signals if this delivery is a redelivery of a previous message.
IsRedelivered() bool
// Body returns the raw payload bytes.
Body() []byte
// RoutingKey returns the message routing key set on the AMQP delivery.
RoutingKey() string
// Ack acknowledges successful processing of the message.
// It signals the broker to remove the message from the queue.
Ack() error
// Nack negatively acknowledges the message, requeuing it.
// It signals a processing failure.
Nack() error
// Reject rejects the message without multiple-nack support.
// It doesn't requeue the message.
Reject() error
}
Message represents a single broker-delivered message, allowing inspection and acknowledgment. Implementations wrap the broker-specific delivery type.
type Publisher ¶
type Publisher interface {
// Publish sends a message payload in the given context.
// It returns an error if the message could not be delivered.
Publish(context.Context, []byte) error
// Close releases any resources held by the publisher, such as channels or connections.
// After Close, further calls to Publish should return an error.
Close() error
}
Publisher defines the interface for publishing messages to a broker. Implementations should send the payload and handle any connection lifecycle.
type UnroutedMessage ¶
type UnroutedMessage struct {
}
func (UnroutedMessage) Error ¶
func (UnroutedMessage) Error() string