Documentation
¶
Index ¶
- func GetTrackedValue(ctx context.Context, key any) any
- func TrackOptions(ctx context.Context) context.Context
- func WarnUnconsumed(ctx context.Context, logger Logger)
- func WithTrackedValue(ctx context.Context, key, val any, name string) context.Context
- type Broker
- type Event
- type Handler
- type JsonMarshaler
- type Logger
- type Marshaler
- type Message
- type Option
- func Addrs(addrs ...string) Option
- func ClientID(id string) Option
- func Codec(c Marshaler) Option
- func ErrorHandler(h Handler) Option
- func Meter(m metric.Meter) Option
- func Secure(b bool) Option
- func TLSConfig(t *tls.Config) Option
- func Tracer(t trace.Tracer) Option
- func WithClientID(id string) Option
- func WithContext(ctx context.Context) Option
- func WithLogger(l Logger) Option
- type OptionTracker
- type Options
- type PublishOption
- type PublishOptions
- type SubscribeOption
- type SubscribeOptions
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetTrackedValue ¶
GetTrackedValue retrieves a value from the context and marks it as consumed.
func TrackOptions ¶ added in v0.3.4
TrackOptions initializes an OptionTracker in the context.
func WarnUnconsumed ¶
WarnUnconsumed logs a warning if any registered options were not consumed.
Types ¶
type Broker ¶
type Broker interface {
// Init initializes the broker with options.
// It should only validate configuration and not establish network connections.
Init(...Option) error
// Options returns the broker options.
Options() Options
// Address returns the broker address.
Address() string
// Connect connects the broker to the message service.
// All network initialization and client creation should happen here.
Connect() error
// Disconnect disconnects the broker from the message service.
Disconnect() error
// Publish publishes a message to a topic.
Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
// Subscribe subscribes to a topic with a handler.
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
// String returns the broker implementation name.
String() string
}
Broker is an interface used for asynchronous messaging. It provides a unified API to interact with different message brokers.
func NewNoopBroker ¶
type Event ¶
type Event interface {
// Topic returns the topic name.
Topic() string
// Message returns the received message.
Message() *Message
// Ack acknowledges the message.
Ack() error
// Nack negatively acknowledges the message.
// If requeue is true, the message will be returned to the queue if supported.
Nack(requeue bool) error
// Error returns any error that occurred during processing.
Error() error
}
Event is given to a subscription handler for processing. It contains the message and metadata about the topic.
type JsonMarshaler ¶
type JsonMarshaler struct{}
func (JsonMarshaler) String ¶
func (j JsonMarshaler) String() string
type Marshaler ¶
type Marshaler interface {
Marshal(interface{}) ([]byte, error)
Unmarshal([]byte, interface{}) error
String() string
}
Marshaler is a simple encoding interface.
type Message ¶
type Message struct {
// Header contains message metadata.
Header map[string]string
// Body contains the message payload.
Body []byte
// Partition is the partition ID for brokers that support it (like Kafka).
Partition int32
}
Message is a message sent to or received from the broker.
type Option ¶
type Option func(*Options)
func Codec ¶
Codec sets the codec used for encoding and decoding messages; it is used where a broker does not support headers.
func ErrorHandler ¶
ErrorHandler will catch all broker errors that can't be handled in the normal way, for example Codec errors.
func WithClientID ¶ added in v0.3.4
WithClientID sets the client identifier.
func WithContext ¶ added in v0.3.4
WithContext sets the context for the broker.
type OptionTracker ¶
type OptionTracker struct {
// contains filtered or unexported fields
}
OptionTracker tracks which options have been registered and consumed.
type Options ¶
type Options struct {
// Addrs is a list of broker addresses.
Addrs []string
// Secure specifies whether to use a secure connection.
Secure bool
// Codec is the marshaler used for encoding/decoding messages.
Codec Marshaler
// ErrorHandler is called when an error occurs during message handling.
ErrorHandler Handler
// TLSConfig is the TLS configuration for secure connections.
TLSConfig *tls.Config
// Tracer is the OpenTelemetry tracer for observability.
Tracer trace.Tracer
// Meter is the OpenTelemetry meter for observability.
Meter metric.Meter
// Context is the underlying context for custom options.
Context context.Context
// Logger for debug/info logging.
Logger Logger
// ClientID is a unique identifier for the client.
ClientID string
}
Options contains the broker configuration.
func NewOptions ¶
type PublishOption ¶
type PublishOption func(*PublishOptions)
func PublishContext ¶
func PublishContext(ctx context.Context) PublishOption
PublishContext sets the context for the publish operation.
func WithDelay ¶
func WithDelay(d time.Duration) PublishOption
WithDelay sets the delay duration for a publish operation.
func WithShardingKey ¶
func WithShardingKey(v string) PublishOption
func WithTags ¶
func WithTags(tags ...string) PublishOption
type PublishOptions ¶
type PublishOptions struct {
// Context is the context for the publish operation.
Context context.Context
// ShardingKey is the key used for sharding/partitioning.
ShardingKey string
// Delay is the delay duration for the message.
Delay time.Duration
// Tags are labels for the message (e.g. for filtering).
Tags []string
}
PublishOptions contains options for publishing a message.
func NewPublishOptions ¶ added in v0.3.4
func NewPublishOptions(opts ...PublishOption) PublishOptions
type SubscribeOption ¶
type SubscribeOption func(*SubscribeOptions)
func DisableAutoAck ¶
func DisableAutoAck() SubscribeOption
DisableAutoAck disables automatic acknowledgement of messages after they have been handled.
func Queue ¶
func Queue(name string) SubscribeOption
Queue sets the name of the queue to share messages on.
func SubscribeContext ¶
func SubscribeContext(ctx context.Context) SubscribeOption
SubscribeContext sets the context for the subscribe operation.
func WithAutoAck ¶ added in v0.3.4
func WithAutoAck(ack bool) SubscribeOption
WithAutoAck sets the auto acknowledgement for the subscription.
func WithDeadLetterQueue ¶
func WithDeadLetterQueue(v string) SubscribeOption
func WithQueue ¶
func WithQueue(name string) SubscribeOption
WithQueue sets the name of the queue or consumer group.
type SubscribeOptions ¶
type SubscribeOptions struct {
// AutoAck specifies whether to automatically acknowledge messages.
AutoAck bool
// Queue is the consumer group name or queue name.
Queue string
// DeadLetterQueue is the name of the dead letter queue.
DeadLetterQueue string
// Context is the context for the subscribe operation.
Context context.Context
}
SubscribeOptions contains options for subscribing to a topic.
func NewSubscribeOptions ¶
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions
type Subscriber ¶
type Subscriber interface {
// Options returns the subscription options.
Options() SubscribeOptions
// Topic returns the subscribed topic.
Topic() string
// Unsubscribe stops the subscription.
Unsubscribe() error
}
Subscriber is a convenience return type for the Subscribe method. It allows managing the subscription lifecycle.