Documentation
Overview
Package pubsub implements a generic Publish-Subscribe pattern. It allows publishers to send messages to multiple subscribers based on topic keys, with thread-safe operations. The implementation is generic, supporting any comparable key type and any message type.
Example (ContextAwarePublishing)

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/mdigger/pubsub"
)

func main() {
	ps := pubsub.New[string, string]()

	// Create a buffered channel to ensure the example works
	ch := make(chan string, 2)
	ps.Subscribe([]string{"alerts"}, ch)

	// Regular publish with background context
	delivered, err := ps.Publish(context.Background(), "alerts", "urgent")
	fmt.Printf("Delivered: %d, Error: %v\n", delivered, err)

	// Publish with timeout (using buffered channel to ensure delivery)
	delivered, err = ps.PublishWithTimeout("alerts", "timeout-test", 100*time.Millisecond)
	fmt.Printf("With timeout - Delivered: %d, Error: %v\n", delivered, err)

	// Consume the messages to clean up
	<-ch
	<-ch
}

Output:
Delivered: 1, Error: <nil>
With timeout - Delivered: 1, Error: <nil>
Types
type PubSub
type PubSub[K comparable, T any] struct { // contains filtered or unexported fields }
PubSub implements the Publish-Subscribe pattern. It maintains a mapping of keys to subscriber channels, allowing efficient message distribution. K is the key type (must be comparable), T is the message type.
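The key-to-channels mapping described above can be pictured with a minimal sketch. The fields of PubSub are unexported, so the `registry` type, its `add` and `count` methods, and the choice of `sync.RWMutex` below are all assumptions made for illustration, not the package's actual layout.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for the unexported state of PubSub:
// each key maps to the set of channels subscribed to it.
type registry[K comparable, T any] struct {
	mu   sync.RWMutex
	subs map[K]map[chan T]struct{}
}

func newRegistry[K comparable, T any]() *registry[K, T] {
	return &registry[K, T]{subs: make(map[K]map[chan T]struct{})}
}

// add subscribes ch to every key. Adding the same channel twice is a no-op
// because the inner map acts as a set.
func (r *registry[K, T]) add(keys []K, ch chan T) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, k := range keys {
		if r.subs[k] == nil {
			r.subs[k] = make(map[chan T]struct{})
		}
		r.subs[k][ch] = struct{}{}
	}
}

// count reports how many channels are subscribed to key.
func (r *registry[K, T]) count(key K) int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.subs[key])
}

func main() {
	r := newRegistry[string, int]()
	ch := make(chan int, 1)
	r.add([]string{"a", "b"}, ch)
	r.add([]string{"a"}, ch) // duplicate subscription, ignored
	fmt.Println(r.count("a"), r.count("b"), r.count("c"))
}
```

Using a set of channels per key keeps lookups by key cheap and makes duplicate subscriptions harmless, which matches the behavior this page describes for Subscribe.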
func New
func New[K comparable, T any]() *PubSub[K, T]
New creates and returns a new PubSub instance. The returned PubSub is ready to use; no further initialization is required.
func (*PubSub[K, T]) Publish
Publish sends a message to all channels subscribed to the specified key. The operation will block until all subscribers receive the message or until:
- The context is canceled
- The timeout expires (if the context has a deadline)
Returns the number of successful deliveries and any context error encountered.
func (*PubSub[K, T]) PublishWithTimeout
PublishWithTimeout is a convenience method that publishes the message using a context that expires after the given timeout.
func (*PubSub[K, T]) Subscribe
func (ps *PubSub[K, T]) Subscribe(keys []K, ch chan T)
Subscribe adds a channel to receive messages for the specified keys. The channel will receive all messages published to any of the provided keys. If the channel is already subscribed to a key, subscribing it to that key again is a no-op.
Note: The channel should have sufficient buffer space or active readers to prevent indefinite blocking in the Publish method.
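One way to provide the "active reader" mentioned in the note is to drain the channel from a dedicated goroutine. The `consume` helper below is hypothetical and uses only the standard library; it is a sketch of the consumer side, with plain channel sends standing in for Publish.

```go
package main

import (
	"fmt"
	"sync"
)

// consume is a hypothetical helper: it starts a goroutine that reads from ch
// until ch is closed, calling handle for each message. The returned function
// blocks until that goroutine has finished.
func consume[T any](ch <-chan T, handle func(T)) (wait func()) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for msg := range ch {
			handle(msg)
		}
	}()
	return wg.Wait
}

func main() {
	ch := make(chan string) // unbuffered: every send needs a waiting reader

	var got []string
	wait := consume(ch, func(s string) { got = append(got, s) })

	ch <- "urgent" // would block forever without the reader goroutine
	ch <- "timeout-test"
	close(ch)
	wait()

	fmt.Println(got)
}
```

With a reader like this in place, an unbuffered channel is enough; without one, the channel needs a buffer at least as large as the number of messages expected before the next read.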
func (*PubSub[K, T]) Unsubscribe
func (ps *PubSub[K, T]) Unsubscribe(keys []K, ch chan T)
Unsubscribe removes a channel from receiving messages for the specified keys. After this call, the channel will no longer receive messages for these keys. If the channel wasn't subscribed to a key, that key is skipped. If all channels are unsubscribed from a key, the key is removed from the registry.
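The removal rules above (skip keys the channel was never subscribed to, drop keys that end up with no channels) can be sketched as follows. The `subscribers` type and its `remove` method are hypothetical, and locking is omitted for brevity; the real type is documented as thread-safe.

```go
package main

import "fmt"

// subscribers is a hypothetical layout: each key maps to a set of channels.
type subscribers[K comparable, T any] map[K]map[chan T]struct{}

// remove detaches ch from each key. Keys that are not present are skipped,
// and a key whose set becomes empty is deleted from the map.
func (s subscribers[K, T]) remove(keys []K, ch chan T) {
	for _, k := range keys {
		set, ok := s[k]
		if !ok {
			continue
		}
		delete(set, ch)
		if len(set) == 0 {
			delete(s, k)
		}
	}
}

func main() {
	a, b := make(chan int), make(chan int)
	s := subscribers[string, int]{
		"alerts": {a: {}, b: {}},
		"logs":   {a: {}},
	}

	s.remove([]string{"alerts", "logs", "missing"}, a)

	_, hasLogs := s["logs"]
	fmt.Println(len(s["alerts"]), hasLogs)
}
```

Deleting empty keys keeps the map from accumulating entries for topics that no longer have any subscribers.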