Documentation
¶
Index ¶
- type AutoReconnector
- func (a *AutoReconnector) GetLastReconnectTime() time.Time
- func (a *AutoReconnector) GetReconnectCount() int
- func (a *AutoReconnector) IsEnabled() bool
- func (a *AutoReconnector) OnConnect(ctx context.Context, client Client)
- func (a *AutoReconnector) OnDisconnect(ctx context.Context, client Client, err error)
- func (a *AutoReconnector) OnSubscribe(ctx context.Context, client Client, topicPattern string)
- func (a *AutoReconnector) OnUnsubscribe(ctx context.Context, client Client, topicPattern string)
- func (a *AutoReconnector) OnUnsubscribeAll(ctx context.Context, client Client)
- func (a *AutoReconnector) SetEnabled(enabled bool)
- type AutoReconnectorBuilder
- func (b *AutoReconnectorBuilder) Build() *AutoReconnector
- func (b *AutoReconnectorBuilder) WithBackoffFactor(factor float64) *AutoReconnectorBuilder
- func (b *AutoReconnectorBuilder) WithEnabled(enabled bool) *AutoReconnectorBuilder
- func (b *AutoReconnectorBuilder) WithInitialDelay(delay time.Duration) *AutoReconnectorBuilder
- func (b *AutoReconnectorBuilder) WithLogger(logger *zap.Logger) *AutoReconnectorBuilder
- func (b *AutoReconnectorBuilder) WithMaxDelay(delay time.Duration) *AutoReconnectorBuilder
- func (b *AutoReconnectorBuilder) WithMaxRetries(retries int) *AutoReconnectorBuilder
- type BaseSubscriber
- func (b *BaseSubscriber) OnEvent(ctx context.Context, topic string, message any, fields map[string]string) error
- func (b *BaseSubscriber) OnSubscribe(ctx context.Context, topic string) error
- func (b *BaseSubscriber) OnUnsubscribe(ctx context.Context, topic string) error
- func (b *BaseSubscriber) PassThrough(msg EventBusMessage) error
- type Client
- type ClientMonitor
- type EventBus
- type EventBusBuilder
- func (b *EventBusBuilder) Build() (EventBus, error)
- func (b *EventBusBuilder) IsValid() error
- func (b *EventBusBuilder) WithBufferSize(size int) *EventBusBuilder
- func (b *EventBusBuilder) WithLogger(logger *zap.Logger) *EventBusBuilder
- func (b *EventBusBuilder) WithMeterProvider(provider metric.MeterProvider) *EventBusBuilder
- func (b *EventBusBuilder) WithName(name string) *EventBusBuilder
- func (b *EventBusBuilder) WithServiceInfo(name, version string) *EventBusBuilder
- func (b *EventBusBuilder) WithTracerProvider(provider trace.TracerProvider) *EventBusBuilder
- type EventBusMessage
- type EventReceiver
- type MessageType
- type Subscriber
- type SubscriptionTracker
- func (s *SubscriptionTracker) GetConnectionTime() time.Time
- func (s *SubscriptionTracker) GetDisconnectionTime() time.Time
- func (s *SubscriptionTracker) GetLastError() error
- func (s *SubscriptionTracker) GetSubscriptionCount() int
- func (s *SubscriptionTracker) GetSubscriptionTime(topicPattern string) time.Time
- func (s *SubscriptionTracker) GetSubscriptionTopics() []string
- func (s *SubscriptionTracker) GetSubscriptions() map[string]time.Time
- func (s *SubscriptionTracker) IsConnected() bool
- func (s *SubscriptionTracker) IsSubscribedTo(topicPattern string) bool
- func (s *SubscriptionTracker) OnConnect(ctx context.Context, client Client)
- func (s *SubscriptionTracker) OnDisconnect(ctx context.Context, client Client, err error)
- func (s *SubscriptionTracker) OnSubscribe(ctx context.Context, client Client, topicPattern string)
- func (s *SubscriptionTracker) OnUnsubscribe(ctx context.Context, client Client, topicPattern string)
- func (s *SubscriptionTracker) OnUnsubscribeAll(ctx context.Context, client Client)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AutoReconnector ¶
type AutoReconnector struct {
*SubscriptionTracker
// contains filtered or unexported fields
}
AutoReconnector is a ClientMonitor that automatically reconnects on error disconnects with exponential backoff and resubscribes to previous topics.
Features:
- Automatic reconnection on error disconnects (but not clean disconnects)
- Exponential backoff with configurable parameters
- Automatic resubscription to previous topic patterns after reconnect
- Configurable maximum retry attempts
- Thread-safe operation
Example usage:
// Default configuration (with no-op logger)
reconnector := bus.NewAutoReconnector().Build()
// Custom configuration with logger
logger, _ := zap.NewProduction()
reconnector := bus.NewAutoReconnector().
WithInitialDelay(500 * time.Millisecond).
WithMaxDelay(10 * time.Second).
WithBackoffFactor(1.5).
WithMaxRetries(5).
WithEnabled(true).
WithLogger(logger).
Build()
client, err := client.NewClient().
WithURL("ws://localhost:8080/ws").
WithSubscriber(subscriber).
WithMonitor(reconnector).
Build()
// The client will now automatically reconnect on connection failures
// and restore all previous subscriptions
func (*AutoReconnector) GetLastReconnectTime ¶
func (a *AutoReconnector) GetLastReconnectTime() time.Time
GetLastReconnectTime returns when the last reconnection attempt was made.
func (*AutoReconnector) GetReconnectCount ¶
func (a *AutoReconnector) GetReconnectCount() int
GetReconnectCount returns the number of reconnection attempts made.
func (*AutoReconnector) IsEnabled ¶
func (a *AutoReconnector) IsEnabled() bool
IsEnabled returns whether automatic reconnection is currently enabled.
func (*AutoReconnector) OnConnect ¶
func (a *AutoReconnector) OnConnect(ctx context.Context, client Client)
OnConnect implements ClientMonitor.OnConnect and delegates to SubscriptionTracker.
func (*AutoReconnector) OnDisconnect ¶
func (a *AutoReconnector) OnDisconnect(ctx context.Context, client Client, err error)
OnDisconnect implements ClientMonitor.OnDisconnect and triggers reconnection on error disconnects.
func (*AutoReconnector) OnSubscribe ¶
func (a *AutoReconnector) OnSubscribe(ctx context.Context, client Client, topicPattern string)
OnSubscribe implements ClientMonitor.OnSubscribe and delegates to SubscriptionTracker.
func (*AutoReconnector) OnUnsubscribe ¶
func (a *AutoReconnector) OnUnsubscribe(ctx context.Context, client Client, topicPattern string)
OnUnsubscribe implements ClientMonitor.OnUnsubscribe and delegates to SubscriptionTracker.
func (*AutoReconnector) OnUnsubscribeAll ¶
func (a *AutoReconnector) OnUnsubscribeAll(ctx context.Context, client Client)
OnUnsubscribeAll implements ClientMonitor.OnUnsubscribeAll and delegates to SubscriptionTracker.
func (*AutoReconnector) SetEnabled ¶
func (a *AutoReconnector) SetEnabled(enabled bool)
SetEnabled enables or disables automatic reconnection.
type AutoReconnectorBuilder ¶
type AutoReconnectorBuilder struct {
// contains filtered or unexported fields
}
AutoReconnectorBuilder provides a fluent interface for configuring AutoReconnector.
func NewAutoReconnector ¶
func NewAutoReconnector() *AutoReconnectorBuilder
NewAutoReconnector creates a new AutoReconnectorBuilder with default configuration.
func (*AutoReconnectorBuilder) Build ¶
func (b *AutoReconnectorBuilder) Build() *AutoReconnector
Build creates the configured AutoReconnector.
func (*AutoReconnectorBuilder) WithBackoffFactor ¶
func (b *AutoReconnectorBuilder) WithBackoffFactor(factor float64) *AutoReconnectorBuilder
WithBackoffFactor sets the multiplier for exponential backoff.
func (*AutoReconnectorBuilder) WithEnabled ¶
func (b *AutoReconnectorBuilder) WithEnabled(enabled bool) *AutoReconnectorBuilder
WithEnabled sets whether automatic reconnection is initially enabled.
func (*AutoReconnectorBuilder) WithInitialDelay ¶
func (b *AutoReconnectorBuilder) WithInitialDelay(delay time.Duration) *AutoReconnectorBuilder
WithInitialDelay sets the initial delay before the first reconnection attempt.
func (*AutoReconnectorBuilder) WithLogger ¶
func (b *AutoReconnectorBuilder) WithLogger(logger *zap.Logger) *AutoReconnectorBuilder
WithLogger sets the logger for reconnection events.
func (*AutoReconnectorBuilder) WithMaxDelay ¶
func (b *AutoReconnectorBuilder) WithMaxDelay(delay time.Duration) *AutoReconnectorBuilder
WithMaxDelay sets the maximum delay between reconnection attempts.
func (*AutoReconnectorBuilder) WithMaxRetries ¶
func (b *AutoReconnectorBuilder) WithMaxRetries(retries int) *AutoReconnectorBuilder
WithMaxRetries sets the maximum number of reconnection attempts. Set to -1 for unlimited retries.
type BaseSubscriber ¶
type BaseSubscriber struct {
}
func (*BaseSubscriber) OnSubscribe ¶
func (b *BaseSubscriber) OnSubscribe(ctx context.Context, topic string) error
func (*BaseSubscriber) OnUnsubscribe ¶
func (b *BaseSubscriber) OnUnsubscribe(ctx context.Context, topic string) error
func (*BaseSubscriber) PassThrough ¶
func (b *BaseSubscriber) PassThrough(msg EventBusMessage) error
type Client ¶
type Client interface {
Subscriber
Connect(ctx context.Context) error
Disconnect() error
Subscribe(ctx context.Context, topicPattern string) error
Unsubscribe(ctx context.Context, topicPattern string) error
UnsubscribeAll(ctx context.Context) error
Publish(ctx context.Context, topic string, payload any) error
PublishSync(ctx context.Context, topic string, payload any) error
}
type ClientMonitor ¶
type ClientMonitor interface {
OnConnect(ctx context.Context, client Client)
OnDisconnect(ctx context.Context, client Client, err error)
OnSubscribe(ctx context.Context, client Client, topic string)
OnUnsubscribe(ctx context.Context, client Client, topic string)
OnUnsubscribeAll(ctx context.Context, client Client)
}
type EventBus ¶
type EventBus interface {
Subscriber // An EventBus can be subscribed to other event buses
Start() error
Stop() error
Subscribe(ctx context.Context, topic string, subscriber Subscriber) error
SubscribeFunc(ctx context.Context, topic string, receiver EventReceiver) (Subscriber, error)
Unsubscribe(ctx context.Context, topic string, subscriber Subscriber) error
UnsubscribeAll(ctx context.Context, subscriber Subscriber) error
Publish(ctx context.Context, topic string, payload any) error
PublishSync(ctx context.Context, topic string, payload any) error
}
type EventBusBuilder ¶
type EventBusBuilder struct {
// contains filtered or unexported fields
}
EventBusBuilder provides a fluent interface for creating EventBus instances
func (*EventBusBuilder) Build ¶
func (b *EventBusBuilder) Build() (EventBus, error)
Build creates and returns the EventBus instance, returning an error if configuration is invalid
func (*EventBusBuilder) IsValid ¶
func (b *EventBusBuilder) IsValid() error
IsValid validates the builder configuration and returns an error if invalid
func (*EventBusBuilder) WithBufferSize ¶
func (b *EventBusBuilder) WithBufferSize(size int) *EventBusBuilder
WithBufferSize sets the channel buffer size for the EventBus
func (*EventBusBuilder) WithLogger ¶
func (b *EventBusBuilder) WithLogger(logger *zap.Logger) *EventBusBuilder
WithLogger sets the logger for the EventBus
func (*EventBusBuilder) WithMeterProvider ¶ added in v0.11.0
func (b *EventBusBuilder) WithMeterProvider(provider metric.MeterProvider) *EventBusBuilder
WithMeterProvider sets the OTel MeterProvider for the EventBus
func (*EventBusBuilder) WithName ¶
func (b *EventBusBuilder) WithName(name string) *EventBusBuilder
WithName sets the name for the EventBus
func (*EventBusBuilder) WithServiceInfo ¶
func (b *EventBusBuilder) WithServiceInfo(name, version string) *EventBusBuilder
WithServiceInfo sets service name and version for observability
func (*EventBusBuilder) WithTracerProvider ¶ added in v0.10.0
func (b *EventBusBuilder) WithTracerProvider(provider trace.TracerProvider) *EventBusBuilder
WithTracerProvider sets the OTel TracerProvider for the EventBus
type EventBusMessage ¶
type EventBusMessage struct {
Ctx context.Context
MsgType MessageType
Topic string
Payload any
Fields map[string]string
}
EventBusMessage represents a message in the event bus with its context and metadata.
Fields carries subscriber-local delivery metadata (e.g. topic pattern extractions, enrichment added by transforms on the final hop). It is intentionally stripped when a message is published to the bus: these fields do not propagate across hops. The bus publish/subscribe paths construct EventBusMessage values without populating Fields; Fields is set by the delivery path (subscribers, transforms, or the caller of transform.ApplyTransforms) to carry per-delivery context.
type EventReceiver ¶ added in v0.9.0
type MessageType ¶
type MessageType int
MessageType represents the type of message in the event bus
const ( MessageTypeEvent MessageType = iota MessageTypeSubscribe MessageTypeSubscribeWithExtraction MessageTypeUnsubscribe MessageTypeUnsubscribeAll MessageTypeEventSync MessageTypeOnSubscribe MessageTypeOnUnsubscribe MessageTypePassThrough MessageTypeTick )
type Subscriber ¶
type Subscriber interface {
OnSubscribe(ctx context.Context, topic string) error
OnUnsubscribe(ctx context.Context, topic string) error
OnEvent(ctx context.Context, topic string, message any, fields map[string]string) error
// PassThrough is used to pass the message to the next subscriber in the chain, eg to handle
// cases like request responses.
PassThrough(msg EventBusMessage) error
}
Note: Unless a subscriber is suubscribed to multiple busses and/or async queueing wrappers, it will only be called from one thread, so it doesn't need to worry about concurrent calls.
func NewEventReceiver ¶ added in v0.9.0
func NewEventReceiver(receiver EventReceiver) Subscriber
type SubscriptionTracker ¶
type SubscriptionTracker struct {
// contains filtered or unexported fields
}
SubscriptionTracker is a sample ClientMonitor implementation that tracks client connection state and active subscriptions.
Example usage:
tracker := &bus.SubscriptionTracker{}
client, err := client.NewClient().
WithURL("ws://localhost:8080/ws").
WithSubscriber(subscriber).
WithMonitor(tracker).
Build()
// Later, check connection and subscription status
if tracker.IsConnected() {
subscriptions := tracker.GetSubscriptions()
fmt.Printf("Connected with %d active subscriptions: %v\n",
len(subscriptions), subscriptions)
}
func NewSubscriptionTracker ¶
func NewSubscriptionTracker() *SubscriptionTracker
NewSubscriptionTracker creates a new subscription tracker.
func (*SubscriptionTracker) GetConnectionTime ¶
func (s *SubscriptionTracker) GetConnectionTime() time.Time
GetConnectionTime returns when the client last connected. Returns zero time if never connected.
func (*SubscriptionTracker) GetDisconnectionTime ¶
func (s *SubscriptionTracker) GetDisconnectionTime() time.Time
GetDisconnectionTime returns when the client last disconnected. Returns zero time if never disconnected.
func (*SubscriptionTracker) GetLastError ¶
func (s *SubscriptionTracker) GetLastError() error
GetLastError returns the error from the last disconnect, if any. Returns nil for graceful disconnects.
func (*SubscriptionTracker) GetSubscriptionCount ¶
func (s *SubscriptionTracker) GetSubscriptionCount() int
GetSubscriptionCount returns the number of active subscriptions.
func (*SubscriptionTracker) GetSubscriptionTime ¶
func (s *SubscriptionTracker) GetSubscriptionTime(topicPattern string) time.Time
GetSubscriptionTime returns when the client subscribed to the given topic. Returns zero time if not subscribed to the topic.
func (*SubscriptionTracker) GetSubscriptionTopics ¶
func (s *SubscriptionTracker) GetSubscriptionTopics() []string
GetSubscriptionTopics returns a slice of all active subscription topics.
func (*SubscriptionTracker) GetSubscriptions ¶
func (s *SubscriptionTracker) GetSubscriptions() map[string]time.Time
GetSubscriptions returns a copy of all active subscriptions with their subscription times.
func (*SubscriptionTracker) IsConnected ¶
func (s *SubscriptionTracker) IsConnected() bool
IsConnected returns true if the client is currently connected.
func (*SubscriptionTracker) IsSubscribedTo ¶
func (s *SubscriptionTracker) IsSubscribedTo(topicPattern string) bool
IsSubscribedTo returns true if the client is subscribed to the given topic pattern.
func (*SubscriptionTracker) OnConnect ¶
func (s *SubscriptionTracker) OnConnect(ctx context.Context, client Client)
OnConnect implements ClientMonitor.OnConnect
func (*SubscriptionTracker) OnDisconnect ¶
func (s *SubscriptionTracker) OnDisconnect(ctx context.Context, client Client, err error)
OnDisconnect implements ClientMonitor.OnDisconnect
func (*SubscriptionTracker) OnSubscribe ¶
func (s *SubscriptionTracker) OnSubscribe(ctx context.Context, client Client, topicPattern string)
OnSubscribe implements ClientMonitor.OnSubscribe
func (*SubscriptionTracker) OnUnsubscribe ¶
func (s *SubscriptionTracker) OnUnsubscribe(ctx context.Context, client Client, topicPattern string)
OnUnsubscribe implements ClientMonitor.OnUnsubscribe
func (*SubscriptionTracker) OnUnsubscribeAll ¶
func (s *SubscriptionTracker) OnUnsubscribeAll(ctx context.Context, client Client)
OnUnsubscribeAll implements ClientMonitor.OnUnsubscribeAll
Directories
¶
| Path | Synopsis |
|---|---|
|
Package topicmatch wraps github.com/amir-yaghoubi/mqttpattern to enforce the MQTT 5.0 §4.7.2 rule: a topic filter starting with a wildcard character (+ or #) MUST NOT match a topic name beginning with $.
|
Package topicmatch wraps github.com/amir-yaghoubi/mqttpattern to enforce the MQTT 5.0 §4.7.2 rule: a topic filter starting with a wildcard character (+ or #) MUST NOT match a topic name beginning with $. |