Documentation ¶
Overview ¶
Package pubsub provides a high-performance, thread-safe publish/subscribe messaging system. It supports topic-based message distribution, message history, type-safe filtering, and reliable delivery mechanisms.
Index ¶
- func ApplyChan[A any, B any](s <-chan A, f func(A) (B, error)) <-chan B
- func CastChan[T any](s <-chan any) <-chan T
- func FilterChan[T any](s <-chan any, f func(T) bool) <-chan T
- func Merge[B any](channels ...<-chan any) <-chan B
- func Subscribe[T any](t Topic) <-chan T
- func SubscribeWithFilter[T any](t Topic, f func(T) bool) <-chan T
- type DropMetrics
- type EventTuple
- type Feeder
- type FeedingFunc
- type NoopObserver
- type Observer
- type Opt
- type PubSub
- type PublishResult
- type Topic
- type TopicOpt
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ApplyChan ¶ added in v0.0.7
func ApplyChan[A any, B any](s <-chan A, f func(A) (B, error)) <-chan B
ApplyChan creates a new channel by applying a transformation function to each message from the input channel. If the transformation function returns an error, the message is dropped. The output channel has the same capacity as the source channel.
Example:
    // Upper-case string messages; drop anything that is not a string
    rawCh := topic.Subscribe()
    stringCh := pubsub.ApplyChan(rawCh, func(msg any) (string, error) {
        if s, ok := msg.(string); ok {
            return strings.ToUpper(s), nil
        }
        return "", errors.New("not a string")
    })
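The transform-and-drop behavior described for ApplyChan can be illustrated with a small standalone sketch. This is not the package's implementation: `applyChan` and `upper` are hypothetical names, and the sketch assumes the output channel is closed when the source closes and uses a plain blocking send.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// applyChan is a hypothetical stand-in for the behavior described above:
// it transforms each value from src with f and skips values for which f
// returns an error. The output buffer matches the capacity of src.
func applyChan[A any, B any](src <-chan A, f func(A) (B, error)) <-chan B {
	out := make(chan B, cap(src))
	go func() {
		defer close(out) // assumption: output closes when the source closes
		for v := range src {
			res, err := f(v)
			if err != nil {
				continue // drop messages that fail the transformation
			}
			out <- res
		}
	}()
	return out
}

// upper upper-cases string messages and rejects everything else.
func upper(msg any) (string, error) {
	s, ok := msg.(string)
	if !ok {
		return "", errors.New("not a string")
	}
	return strings.ToUpper(s), nil
}

func main() {
	src := make(chan any, 3)
	src <- "hello"
	src <- 42 // not a string, so it is dropped
	src <- "world"
	close(src)

	for s := range applyChan[any, string](src, upper) {
		fmt.Println(s)
	}
}
```

Running the sketch prints HELLO and WORLD; the integer never reaches the output channel.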
func CastChan ¶ added in v0.0.4
func CastChan[T any](s <-chan any) <-chan T
CastChan creates a new channel that converts messages from an any channel to type T. It internally uses FilterChan with an always-true filter, effectively performing only type conversion. Messages that cannot be type-asserted to T are silently dropped.
Example:
    // Convert a generic channel to a type-specific channel
    rawCh := topic.Subscribe()
    userCh := pubsub.CastChan[User](rawCh)
    // Process only User messages
    for user := range userCh {
        fmt.Printf("User: %s (ID: %d)\n", user.Name, user.ID)
    }
func FilterChan ¶
func FilterChan[T any](s <-chan any, f func(T) bool) <-chan T
FilterChan returns a channel that receives the messages from the input channel that pass a filter function. It performs both type assertion and custom filtering in a single operation. Messages that don't match the type T or don't pass the filter function are silently dropped.
Example:
    // Filter for only high-priority messages
    rawCh := topic.Subscribe()
    highPriorityMsgs := pubsub.FilterChan(rawCh, func(msg AlertMessage) bool {
        return msg.Priority > 7
    })
    for alert := range highPriorityMsgs {
        handleHighPriorityAlert(alert)
    }
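To make the relationship between FilterChan and CastChan concrete, here is a minimal sketch of the pattern the descriptions above suggest: one type assertion plus one predicate per message, with the cast expressed as a filter that accepts everything. `filterChan` and `castChan` are hypothetical names, and the closing behavior and blocking send are assumptions rather than the package's actual code.

```go
package main

import "fmt"

// filterChan forwards only values that are of type T and pass keep.
// Anything else is silently skipped.
func filterChan[T any](src <-chan any, keep func(T) bool) <-chan T {
	out := make(chan T, cap(src))
	go func() {
		defer close(out) // assumption: output closes when the source closes
		for msg := range src {
			v, ok := msg.(T)
			if !ok || !keep(v) {
				continue
			}
			out <- v
		}
	}()
	return out
}

// castChan is filterChan with an always-true predicate, so only the
// type assertion decides whether a message is forwarded.
func castChan[T any](src <-chan any) <-chan T {
	return filterChan(src, func(T) bool { return true })
}

func main() {
	src := make(chan any, 4)
	for _, m := range []any{3, "skip", 9, 12} {
		src <- m
	}
	close(src)

	// Only integers greater than 7 are forwarded: 9 and 12.
	for n := range filterChan(src, func(n int) bool { return n > 7 }) {
		fmt.Println(n)
	}
}
```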
func Merge ¶ added in v0.0.4
func Merge[B any](channels ...<-chan any) <-chan B
Merge combines multiple input channels into a single output channel of type B. It uses reflection to handle channels of any type, allowing for flexible merging. The output channel's buffer capacity is the sum of all input channel capacities, or the number of channels if they are unbuffered. When an input channel is closed, it is removed from the merge set; the merged channel remains open until all input channels are closed.
Example:
    // Merge channels from different topics
    ch1 := topic1.Subscribe()
    ch2 := topic2.Subscribe()
    mergedCh := pubsub.Merge[UserEvent](ch1, ch2)
    // Process events from both channels
    for event := range mergedCh {
        fmt.Printf("User %s performed action: %s\n", event.Username, event.Action)
    }
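The closing and buffering rules described for Merge can be sketched with a conventional fan-in. Note the substitution: the package is documented as using reflection, whereas this sketch uses one goroutine per input and a sync.WaitGroup, which is simpler to read. `mergeChans` is a hypothetical name, and "the number of channels if they are unbuffered" is interpreted here as "when the summed capacity is zero".

```go
package main

import (
	"fmt"
	"sync"
)

// mergeChans forwards values of type B from every input into one output.
// The output closes only after all inputs have been closed and drained.
func mergeChans[B any](inputs ...<-chan any) <-chan B {
	// Buffer size: sum of input capacities, or one slot per input
	// when every input is unbuffered (assumed interpretation).
	size := 0
	for _, in := range inputs {
		size += cap(in)
	}
	if size == 0 {
		size = len(inputs)
	}
	out := make(chan B, size)

	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan any) {
			defer wg.Done() // this input is finished once it is closed
			for msg := range in {
				if v, ok := msg.(B); ok {
					out <- v
				}
			}
		}(in)
	}
	go func() {
		wg.Wait() // wait for every input to close
		close(out)
	}()
	return out
}

func main() {
	a := make(chan any, 2)
	b := make(chan any, 2)
	a <- "a1"
	a <- "a2"
	close(a) // closing one input does not close the merged channel
	b <- "b1"
	b <- "b2"
	close(b)

	count := 0
	for range mergeChans[string](a, b) {
		count++
	}
	fmt.Println("received", count, "messages")
}
```

The order in which messages from different inputs arrive is not deterministic in this sketch; only the total count is.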
func Subscribe ¶
func Subscribe[T any](t Topic) <-chan T
Subscribe returns a channel that receives the messages of a particular type published to the topic. It creates a new buffered channel with the same capacity as the source channel and automatically filters messages based on type T. Messages that cannot be type-asserted to T are silently dropped. It uses a non-blocking send to prevent deadlocks when the output channel's buffer is full.
Example:
    // Subscribe to only receive UserEvent messages
    userEvents := pubsub.Subscribe[UserEvent](topic)
    for event := range userEvents {
        fmt.Printf("User %s performed action: %s\n", event.Username, event.Action)
    }
func SubscribeWithFilter ¶
func SubscribeWithFilter[T any](t Topic, f func(T) bool) <-chan T
SubscribeWithFilter returns a channel that receives the messages published to the topic that have a particular type and pass a filter function. This combines type filtering with custom filtering logic in a single operation.
Example:
    // Subscribe to only receive successful login events
    loginEvents := pubsub.SubscribeWithFilter(topic, func(event LoginEvent) bool {
        return event.Success
    })
    for event := range loginEvents {
        fmt.Printf("Successful login: %s at %v\n", event.Username, event.Timestamp)
    }
Types ¶
type DropMetrics ¶ added in v0.0.8
type DropMetrics struct {
    // DroppedCount is the total number of messages dropped due to full buffers.
    DroppedCount uint64
    // FilteredCount is the total number of messages filtered out (not matching criteria).
    FilteredCount uint64
}
DropMetrics tracks statistics about dropped messages in channel operations.
func ApplyChanWithMetrics ¶ added in v0.0.8
func ApplyChanWithMetrics[A any, B any](s <-chan A, f func(A) (B, error), metrics *DropMetrics) (<-chan B, *DropMetrics)
ApplyChanWithMetrics is like ApplyChan but also tracks dropped-message statistics. If metrics is non-nil, it is updated with counts of filtered and dropped messages. It returns both the output channel and the metrics object for observation.
Example:
    metrics := &pubsub.DropMetrics{}
    stringCh, _ := pubsub.ApplyChanWithMetrics(rawCh, transformFunc, metrics)
    // Later: check metrics.GetDropped() for the buffer overflow count
func (*DropMetrics) GetDropped ¶ added in v0.0.8
func (m *DropMetrics) GetDropped() uint64
GetDropped returns the current dropped count.
func (*DropMetrics) GetFiltered ¶ added in v0.0.8
func (m *DropMetrics) GetFiltered() uint64
GetFiltered returns the current filtered count.
func (*DropMetrics) IncrementDropped ¶ added in v0.0.8
func (m *DropMetrics) IncrementDropped()
IncrementDropped atomically increments the dropped count.
func (*DropMetrics) IncrementFiltered ¶ added in v0.0.8
func (m *DropMetrics) IncrementFiltered()
IncrementFiltered atomically increments the filtered count.
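Since the increment methods are documented as atomic, counters like these are safe to update from many goroutines at once. The following sketch shows one way such a type could be written with sync/atomic. `dropMetrics` is a local, hypothetical mirror of the documented struct; the atomic loads in the getters are an assumption, as the documentation only states that the increments are atomic.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// dropMetrics mirrors the documented fields of DropMetrics.
type dropMetrics struct {
	DroppedCount  uint64
	FilteredCount uint64
}

// The increment methods use atomic adds so concurrent callers never lose updates.
func (m *dropMetrics) IncrementDropped()  { atomic.AddUint64(&m.DroppedCount, 1) }
func (m *dropMetrics) IncrementFiltered() { atomic.AddUint64(&m.FilteredCount, 1) }

// The getters use atomic loads (assumed) to read a consistent value.
func (m *dropMetrics) GetDropped() uint64  { return atomic.LoadUint64(&m.DroppedCount) }
func (m *dropMetrics) GetFiltered() uint64 { return atomic.LoadUint64(&m.FilteredCount) }

func main() {
	m := &dropMetrics{}

	// 100 goroutines each record one dropped message.
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.IncrementDropped()
		}()
	}
	wg.Wait()

	fmt.Println(m.GetDropped(), m.GetFiltered()) // prints "100 0"
}
```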
type EventTuple ¶ added in v0.0.7
type EventTuple struct {
    // Topic is the name of the topic the message belongs to.
    Topic string
    // Event is the actual message content.
    Event any
}
EventTuple represents a message and its associated topic, used by Feeders.
type Feeder ¶ added in v0.0.7
type Feeder interface {
    // Feed returns a channel that emits EventTuple instances to be published.
    Feed() <-chan *EventTuple
}
Feeder is an interface for components that provide a stream of messages to PubSub.
type FeedingFunc ¶ added in v0.0.7
type FeedingFunc func() <-chan *EventTuple
FeedingFunc is a function type that acts as a Feeder.
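A short sketch may help show how a plain function can stand in for a Feeder. The types below are local copies of the documented declarations so the example compiles on its own. `funcFeeder` and `ticks` are hypothetical helpers introduced only for illustration; the documentation does not list a Feed method on FeedingFunc, so the adapter here is an assumption about how the two could be bridged, not the package's mechanism.

```go
package main

import "fmt"

// Local copies of the documented types.
type EventTuple struct {
	Topic string
	Event any
}

type Feeder interface {
	Feed() <-chan *EventTuple
}

type FeedingFunc func() <-chan *EventTuple

// funcFeeder is a hypothetical adapter that wraps a FeedingFunc
// so it satisfies the Feeder interface.
type funcFeeder struct {
	fn FeedingFunc
}

func (f funcFeeder) Feed() <-chan *EventTuple { return f.fn() }

// ticks is a hypothetical feeding function that emits three events
// on the "ticks" topic and then closes its channel.
func ticks() <-chan *EventTuple {
	ch := make(chan *EventTuple)
	go func() {
		defer close(ch)
		for i := 1; i <= 3; i++ {
			ch <- &EventTuple{Topic: "ticks", Event: i}
		}
	}()
	return ch
}

func main() {
	var f Feeder = funcFeeder{fn: ticks}
	for ev := range f.Feed() {
		fmt.Printf("%s: %v\n", ev.Topic, ev.Event)
	}
}
```

With the real package, the same `ticks` function would presumably be passed to AddFeedingFunc directly, with a context to cancel feeding.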
type NoopObserver ¶ added in v0.0.8
type NoopObserver struct{}
NoopObserver is an Observer that does nothing.
func (NoopObserver) OnDrop ¶ added in v0.0.8
func (o NoopObserver) OnDrop(topic string, msg any)
func (NoopObserver) OnPublish ¶ added in v0.0.8
func (o NoopObserver) OnPublish(topic string, msg any)
func (NoopObserver) OnSubscribe ¶ added in v0.0.8
func (o NoopObserver) OnSubscribe(topic string)
func (NoopObserver) OnUnsubscribe ¶ added in v0.0.8
func (o NoopObserver) OnUnsubscribe(topic string)
type Observer ¶ added in v0.0.8
type Observer interface {
    // OnPublish is called when a message is published to a topic.
    OnPublish(topic string, msg any)
    // OnDrop is called when a message is dropped due to a full buffer.
    OnDrop(topic string, msg any)
    // OnSubscribe is called when a new subscription is created for a topic.
    OnSubscribe(topic string)
    // OnUnsubscribe is called when a subscription is removed from a topic.
    OnUnsubscribe(topic string)
}
Observer is an interface for components that want to observe events in the PubSub system.
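As an illustration of what a custom observer might look like, the sketch below implements the four documented callbacks with atomic counters. The Observer interface is copied locally so the example is self-contained, and `countingObserver` is a hypothetical type. Whether the real package invokes observers concurrently is not stated, so the counters are made goroutine-safe as a precaution.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Observer is a local copy of the documented interface.
type Observer interface {
	OnPublish(topic string, msg any)
	OnDrop(topic string, msg any)
	OnSubscribe(topic string)
	OnUnsubscribe(topic string)
}

// countingObserver is a hypothetical Observer that tallies events.
type countingObserver struct {
	published   atomic.Int64
	dropped     atomic.Int64
	subscribers atomic.Int64 // currently active subscriptions
}

func (o *countingObserver) OnPublish(topic string, msg any) { o.published.Add(1) }
func (o *countingObserver) OnDrop(topic string, msg any)    { o.dropped.Add(1) }
func (o *countingObserver) OnSubscribe(topic string)        { o.subscribers.Add(1) }
func (o *countingObserver) OnUnsubscribe(topic string)      { o.subscribers.Add(-1) }

// Compile-time check that countingObserver satisfies Observer.
var _ Observer = (*countingObserver)(nil)

func main() {
	obs := &countingObserver{}
	obs.OnSubscribe("orders")
	obs.OnPublish("orders", "created")
	obs.OnDrop("orders", "updated")
	obs.OnUnsubscribe("orders")

	fmt.Println(obs.published.Load(), obs.dropped.Load(), obs.subscribers.Load()) // prints "1 1 0"
}
```

With the real package, an observer of this shape would be supplied through WithObserver or WithTopicObserver.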
type Opt ¶
type Opt func(*pubSub)
Opt is a functional option for configuring a PubSub instance.
func WithHistorySize ¶
WithHistorySize enables message history for all topics created by the PubSub instance. It sets the maximum number of historical messages to store per topic.
func WithLockFreeHistory ¶ added in v0.0.8
WithLockFreeHistory is an option for NewPubSub/NewShardedPubSub that enables lock-free message history for all topics.
func WithObserver ¶ added in v0.0.8
WithObserver sets the observer for the PubSub instance.
type PubSub ¶
type PubSub interface {
    // Publish sends messages to a specific topic. All subscribers to the topic
    // and subscribers to the "*" topic will receive these messages.
    Publish(topic string, msg ...any) PublishResult
    // AddFeeder registers a Feeder that will provide messages to the PubSub system.
    // The Feeder should implement the Feed method, which returns a channel that emits
    // EventTuple instances. Each EventTuple contains the topic and the event message.
    // This allows for dynamic message feeding into the PubSub system.
    // The context can be used to cancel the feeding process.
    AddFeeder(ctx context.Context, f Feeder)
    // AddFeedingFunc registers a FeedingFunc that will provide messages to the PubSub system.
    // The FeedingFunc should return a channel that emits EventTuple instances.
    // Each EventTuple contains the topic and the event message.
    // This allows for dynamic message feeding into the PubSub system.
    // The context can be used to cancel the feeding process.
    AddFeedingFunc(ctx context.Context, f FeedingFunc)
    // Topic returns a Topic instance for the given topic name.
    // If the topic doesn't exist, it creates a new one.
    Topic(topic string) Topic
    // Topics returns a list of all currently registered topics.
    Topics() []string
    // Subscribe creates a subscription to a specific topic and returns a channel
    // that will receive all messages published to that topic.
    // If history is enabled, new subscribers will receive historical messages.
    Subscribe(topic string) <-chan any
    // SubscribeFunc registers a callback function that will be invoked for
    // each message published to the specified topic.
    // If history is enabled, the callback will be invoked for historical messages.
    SubscribeFunc(topic string, f func(msg any))
    // Unsubscribe removes a subscription channel from a specific topic.
    // After unsubscribing, the channel will be closed.
    Unsubscribe(topic string, sub <-chan any)
    // SubscribeAll creates a subscription to all topics by subscribing to
    // the special "*" topic and returns a channel for receiving messages.
    // If history is enabled, new subscribers will receive historical messages from all topics.
    SubscribeAll() <-chan any
    // SubscribeAllFunc registers a callback function that will be invoked
    // for every message published to any topic.
    // If history is enabled, the callback will be invoked for historical messages from all topics.
    SubscribeAllFunc(f func(msg any))
    // UnsubscribeAll removes a subscription channel from the special "*" topic,
    // effectively unsubscribing from all messages.
    // After unsubscribing, the channel will be closed.
    UnsubscribeAll(sub <-chan any)
    // SubscribeWithBuffer creates a subscription to a specific topic with a custom buffer size.
    SubscribeWithBuffer(topic string, size int) <-chan any
    // DeleteTopic removes a topic and closes all its subscriptions.
    DeleteTopic(name string)
    // PublishReliable sends messages to a specific topic using blocking sends with a timeout.
    // This ensures messages are delivered even to unbuffered channels, but may block briefly.
    // Returns the total number of deliveries across the topic and the "*" topic.
    PublishReliable(topic string, msg ...any) int
    // AddFeederReliable registers a Feeder that will provide messages to the PubSub system using reliable delivery.
    // The context can be used to cancel the feeding process.
    AddFeederReliable(ctx context.Context, f Feeder)
    // AddFeedingFuncReliable registers a FeedingFunc that will provide messages to the PubSub system using reliable delivery.
    // The context can be used to cancel the feeding process.
    AddFeedingFuncReliable(ctx context.Context, f FeedingFunc)
    // SubscribeUnbuffered creates a subscription to a specific topic and returns an unbuffered channel.
    // This should be used with PublishReliable for guaranteed message delivery.
    SubscribeUnbuffered(topic string) <-chan any
    // SubscribeAllUnbuffered creates a subscription to all topics and returns an unbuffered channel.
    SubscribeAllUnbuffered() <-chan any
    // Shutdown gracefully shuts down the PubSub instance, closing all topics and subscriptions.
    // The context can be used to set a timeout for the shutdown process.
    Shutdown(ctx context.Context)
    // Close is an alias for Shutdown with a background context.
    Close()
}
PubSub is a thread-safe publish/subscribe message broker interface that enables topic-based message distribution. It supports both direct channel subscriptions and callback-based message handling. The interface provides message history capabilities when enabled, and supports a special "*" topic that receives all messages from all topics.
There are two main implementations of the PubSub interface:
- Default PubSub (NewPubSub): Appropriate for most use cases with a moderate number of topics and message rates.
- Sharded PubSub (NewShardedPubSub): Optimized for high-concurrency scenarios with many topics, reducing lock contention by sharding topics across multiple internal maps.
Basic usage:
    // Create a new PubSub instance
    ps := pubsub.NewPubSub()
    // Subscribe to a topic
    ch := ps.Subscribe("notifications")
    // Process messages in a goroutine
    go func() {
        for msg := range ch {
            fmt.Printf("Received: %v\n", msg)
        }
    }()
    // Publish messages to the topic
    ps.Publish("notifications", "Hello, World!")
    // With history:
    psWithHistory := pubsub.NewPubSub(pubsub.WithHistorySize(10))
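The PublishReliable method in the interface above is described as using blocking sends with a timeout. The underlying Go idiom is a select between the send and a timer, sketched below. `sendWithTimeout` and `demoTimeout` are hypothetical names, and the timeout the package actually uses is not documented here, so the durations are placeholders.

```go
package main

import (
	"fmt"
	"time"
)

// demoTimeout is an arbitrary placeholder duration for this sketch.
const demoTimeout = 50 * time.Millisecond

// sendWithTimeout blocks until sub accepts msg or the timeout elapses.
// It reports whether the message was delivered.
func sendWithTimeout(sub chan<- any, msg any, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case sub <- msg:
		return true // a receiver took the message
	case <-timer.C:
		return false // nobody was ready in time
	}
}

func main() {
	sub := make(chan any) // unbuffered: a send needs a waiting receiver

	// With a receiver present, the blocking send succeeds.
	done := make(chan struct{})
	go func() {
		defer close(done)
		fmt.Println("received:", <-sub)
	}()
	fmt.Println("delivered:", sendWithTimeout(sub, "hello", 20*demoTimeout))
	<-done

	// Without a receiver, the send gives up after the timeout.
	fmt.Println("delivered:", sendWithTimeout(sub, "lost", demoTimeout))
}
```

A non-blocking send would lose the first message whenever the receiver was not already waiting; the timed blocking send trades a short delay for a much better chance of delivery.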
func NewPubSub ¶
NewPubSub creates a new default PubSub instance. This implementation uses a single map and mutex for topic management, which is efficient for applications with a moderate number of topics. Optionally pass WithHistorySize to enable history for all topics and set the size of the history buffer.
func NewShardedPubSub ¶ added in v0.0.8
NewShardedPubSub creates a new sharded PubSub instance optimized for high-concurrency scenarios. It distributes topics across 16 shards, which reduces lock contention by avoiding frequent acquisition of a single global lock. This is particularly beneficial when many topics are created or accessed simultaneously by different goroutines.
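To show why sharding reduces contention, here is a minimal sketch of a sharded topic store. Only the shard count of 16 comes from the text above; the FNV-1a hash, the per-shard RWMutex, and all names (`shardedStore`, `shardIndex`, `subscribe`) are assumptions chosen for illustration and may differ from the package's internals.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const shardCount = 16 // the shard count stated in the documentation

// shard owns a portion of the topics and has its own lock (assumed design).
type shard struct {
	mu     sync.RWMutex
	topics map[string][]chan any
}

type shardedStore struct {
	shards [shardCount]*shard
}

func newShardedStore() *shardedStore {
	s := &shardedStore{}
	for i := range s.shards {
		s.shards[i] = &shard{topics: make(map[string][]chan any)}
	}
	return s
}

// shardIndex maps a topic name to a shard using an FNV-1a hash (assumed).
func (s *shardedStore) shardIndex(topic string) int {
	h := fnv.New32a()
	h.Write([]byte(topic))
	return int(h.Sum32() % shardCount)
}

// subscribe locks only the shard that owns the topic, so goroutines
// working on topics in other shards are not blocked.
func (s *shardedStore) subscribe(topic string) <-chan any {
	sh := s.shards[s.shardIndex(topic)]
	ch := make(chan any, 1)
	sh.mu.Lock()
	sh.topics[topic] = append(sh.topics[topic], ch)
	sh.mu.Unlock()
	return ch
}

func main() {
	store := newShardedStore()
	for _, t := range []string{"orders", "payments", "users"} {
		store.subscribe(t)
		fmt.Printf("%-8s -> shard %d\n", t, store.shardIndex(t))
	}
}
```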
type PublishResult ¶ added in v0.0.8
type PublishResult struct {
    // Deliveries is the number of successful deliveries.
    Deliveries int
    // Drops is the number of messages dropped due to full buffers.
    Drops int
}
PublishResult contains information about the result of a Publish operation.
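The two counters in PublishResult follow naturally from a non-blocking send: each attempt either lands in a subscriber's buffer or is dropped. The sketch below demonstrates the idiom with a select that has a default case. `publish` is a hypothetical free function operating on a plain slice of channels, not the package's method, and PublishResult is redeclared locally so the example compiles on its own.

```go
package main

import "fmt"

// PublishResult is a local copy of the documented struct.
type PublishResult struct {
	Deliveries int
	Drops      int
}

// publish tries to send every message to every subscriber without blocking.
// A full buffer counts as a drop instead of stalling the publisher.
func publish(subs []chan any, msgs ...any) PublishResult {
	var res PublishResult
	for _, msg := range msgs {
		for _, sub := range subs {
			select {
			case sub <- msg:
				res.Deliveries++
			default: // buffer full: drop rather than block
				res.Drops++
			}
		}
	}
	return res
}

func main() {
	fast := make(chan any, 2) // room for both messages
	slow := make(chan any, 1) // room for only the first message

	res := publish([]chan any{fast, slow}, "a", "b")
	fmt.Printf("deliveries=%d drops=%d\n", res.Deliveries, res.Drops) // prints "deliveries=3 drops=1"
}
```

Checking Drops after each publish is one way a caller could decide to enlarge buffers with SubscribeWithBuffer or switch to PublishReliable.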
type Topic ¶
type Topic interface {
    // Publish sends one or more messages to all subscribers of the topic.
    // Uses a non-blocking send to prevent deadlocks if a subscriber is not reading.
    // Returns a PublishResult containing the number of successful deliveries and drops.
    Publish(msg ...any) PublishResult
    // Subscribe returns a channel that will receive messages published to the topic.
    // If the topic has history enabled, the channel will receive historical messages.
    Subscribe() <-chan any
    // SubscribeFunc subscribes to the topic and calls the provided function for each message.
    // Returns the subscription channel that can be used to unsubscribe later.
    // If the topic has history enabled, the function will be called for historical messages.
    SubscribeFunc(f func(msg any)) <-chan any
    // SubscribeWithBuffer returns a channel with a custom buffer size that will receive messages.
    // Larger buffer sizes can help prevent message loss when subscribers can't keep up.
    SubscribeWithBuffer(size int) <-chan any
    // Unsubscribe removes a channel from the list of subscribers and closes it.
    Unsubscribe(ch <-chan any)
    // PublishReliable publishes one or more messages to the topic using blocking sends with a timeout.
    // This ensures messages are delivered even to unbuffered channels, but may block briefly.
    // Returns the number of successful deliveries to subscribers.
    PublishReliable(msg ...any) int
    // SubscribeUnbuffered returns an unbuffered channel that will receive messages.
    // Should be used with PublishReliable for guaranteed message delivery.
    SubscribeUnbuffered() <-chan any
    // Shutdown closes all subscriptions with context for timeout support.
    Shutdown(ctx context.Context)
    // Close shuts down the topic and closes all subscription channels.
    Close()
}
Topic is a simple, single-topic publish/subscribe interface. It provides methods to publish messages, subscribe to messages, and unsubscribe from the topic. Messages can be of any type.
There are several implementations of the Topic interface:
- Basic Topic (NewTopic): A simple topic with no history.
- Topic with History (newTopicWithHistory, enabled with the WithHistory option): Maintains a history of messages using a mutex-protected slice. Good for most use cases where history is needed.
- Topic with Lock-Free History (newTopicWithLockFreeHistory, enabled with the WithLockFreeHistoryOpt option): Maintains history using a lock-free ring buffer. Recommended for high-concurrency environments to minimize contention on the publish path.
Basic usage:
    // Create a new Topic
    topic := pubsub.NewTopic()
    // Subscribe to the topic
    ch := topic.Subscribe()
    // Process messages in a goroutine
    go func() {
        for msg := range ch {
            fmt.Printf("Received: %v\n", msg)
        }
    }()
    // Publish messages to the topic
    topic.Publish("Hello, World!")
    // With history:
    topicWithHistory := pubsub.NewTopic(pubsub.WithHistory(10))
type TopicOpt ¶ added in v0.0.8
type TopicOpt func(*topicConfig)
TopicOpt is a functional option for configuring a Topic instance.
func WithHistory ¶ added in v0.0.8
WithHistory enables message history for the Topic.
func WithLockFreeHistoryOpt ¶ added in v0.0.8
WithLockFreeHistoryOpt enables lock-free message history for the Topic.
func WithTopicObserver ¶ added in v0.0.8
WithTopicObserver sets the observer for the Topic instance.
Directories ¶
| Path | Synopsis |
|---|---|
| examples | |
| examples/basic (command) | |
| examples/feeder (command) | |
| examples/filtering (command) | |
| examples/history (command) | |
| examples/merging (command) | |
| examples/otel-observer (command) | |
| internal | |
| internal/topicstore | Package topicstore provides topic storage implementations for the pubsub package. |
| | Package otelpubsub provides an OpenTelemetry and Prometheus observer for the pubsub package. |
| pkg | |
| pkg/ringbuffer | Package ringbuffer provides a lock-free ring buffer implementation for storing message history with concurrent read/write support. |