Documentation
¶
Overview ¶
Package pubsub provides a generic and concurrency-safe, topic-based publish/subscribe library for in-process communication.
Example ¶
// A broker with topic and message payload of type string. broker := pubsub.NewBroker[string, string, string]() // Topic to publish the message to. topic := "example" // Number of subscribers to the topic. subCount := 100 var wg sync.WaitGroup // Run consumer goroutines subscribed to the same topic. for range subCount { wg.Add(1) go func() { defer wg.Done() // Subscribe to topic. sub := broker.Subscribe(topic) // Wait for message. msg := <-sub // Message fields. _ = msg.Topic _ = msg.Payload _ = msg.Sender }() } // Helper function to wait for all consumer goroutines to subscribe. // This is just for the example and not needed in production code. waitUntil(time.Second, func() bool { return broker.Subscribers(topic) == subCount }) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() // Publish a message concurrently to the topic. // This call will wait for all subscribers to receive the message or // until the context is canceled (e.g. on timeout). err := broker.Publish(ctx, Message{Topic: topic, Payload: "Message to deliver"}) if err != nil { // In this case 1 or more subscribers did not receive the message. switch { case errors.Is(err, context.Canceled): fmt.Println("Publishing was canceled") case errors.Is(err, context.DeadlineExceeded): fmt.Println("Publishing timed out") } } wg.Wait() fmt.Println(err)
Output: <nil>
Index ¶
- type Broker
- func (b *Broker[T, P, S]) NumTopics() int
- func (b *Broker[T, P, S]) Publish(ctx context.Context, msg Message[T, P, S]) error
- func (b *Broker[T, P, S]) Subscribe(topics ...T) <-chan Message[T, P, S]
- func (b *Broker[T, P, S]) SubscribeWithCapacity(capacity int, topics ...T) <-chan Message[T, P, S]
- func (b *Broker[T, P, S]) Subscribers(topic T) int
- func (b *Broker[T, P, S]) Topics() []T
- func (b *Broker[T, P, S]) TryPublish(msg Message[T, P, S])
- func (b *Broker[T, P, S]) Unsubscribe(sub <-chan Message[T, P, S], topics ...T)
- type Message
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker[T comparable, P any, S any] struct { // contains filtered or unexported fields }
Broker represents a message broker.
The Broker is the core component of the pub/sub library. It manages the registration of subscribers and handles the publishing of messages to specific topics.
The Broker supports concurrent operations.
Example ¶
package main import ( "github.com/mdawar/pubsub" ) func main() { // A broker with topic, message payload and sender of type string. pubsub.NewBroker[string, string, string]() // A broker with integer topics and a uint sender. pubsub.NewBroker[int, string, uint]() }
func NewBroker ¶
func NewBroker[T comparable, P any, S any]() *Broker[T, P, S]
NewBroker creates a new message Broker instance.
func (*Broker[T, P, S]) NumTopics ¶
NumTopics returns the total number of topics registered on the Broker.
func (*Broker[T, P, S]) Publish ¶
Publish publishes a Message to the topic with the specified payload.
The message is sent concurrently to the subscribers, ensuring that a slow consumer won't affect the other subscribers.
This method will block and wait for all the subscriptions to receive the message or until the context is canceled.
The value of context.Context.Err will be returned.
A nil return value indicates that all the subscribers received the message.
If there are no subscribers to the topic, the message will be discarded.
Example ¶
broker := pubsub.NewBroker[string, string, string]() // Publish a message to the topic concurrently. // // This call will wait for all the subscribers to receive the message // or the context to be canceled. // // If there are no subscribers the message will be discarded. err := broker.Publish(context.TODO(), Message{ Topic: "events", Payload: "Message payload to deliver", Sender: "sender-id", // Optional. }) // A nil error is expected if the context is not canceled. fmt.Println(err == nil)
Output: true
Example (Timeout) ¶
broker := pubsub.NewBroker[string, string, string]() ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() // Publish a message concurrently with a timeout of 1 second. // // Subscribers that are ready will receive the message, the others will be given // up to the timeout value to receive the message. // // A slow consumer will not affect the other subscribers; the timeout applies // individually to each subscriber. err := broker.Publish(ctx, Message{ Topic: "events", Payload: "Message payload to deliver", Sender: "sender-id", // Optional. }) if err != nil { // In this case 1 or more subscribers did not receive the message. switch { case errors.Is(err, context.Canceled): fmt.Println("Publishing was canceled") case errors.Is(err, context.DeadlineExceeded): fmt.Println("Publishing timed out") } }
func (*Broker[T, P, S]) Subscribe ¶
Subscribe creates a subscription for the specified topics.
The created subscription channel is unbuffered (capacity = 0).
Example ¶
package main import ( "github.com/mdawar/pubsub" ) func main() { broker := pubsub.NewBroker[string, string, string]() // Create a subscription to a single topic. sub1 := broker.Subscribe("events") _ = sub1 // Create a subscription to multiple topics. sub2 := broker.Subscribe("events", "actions", "errors") _ = sub2 }
func (*Broker[T, P, S]) SubscribeWithCapacity ¶
Subscribe creates a subscription for the specified topics with the specified capacity.
The capacity specifies the subscription channel's buffer capacity.
Example ¶
package main import ( "github.com/mdawar/pubsub" ) func main() { broker := pubsub.NewBroker[string, string, string]() // Create a subscription to a single topic with a specific channel capacity. sub1 := broker.SubscribeWithCapacity(10, "events") _ = sub1 // Create a subscription to multiple topics with a specific channel capacity. sub2 := broker.SubscribeWithCapacity(10, "events", "actions", "errors") _ = sub2 }
func (*Broker[T, P, S]) Subscribers ¶ added in v0.2.0
Subscribers returns the number of subscriptions on the specified topic.
Example ¶
package main import ( "fmt" "github.com/mdawar/pubsub" ) func main() { broker := pubsub.NewBroker[string, string, string]() topic := "example" for range 10 { broker.Subscribe(topic) } // The number of subscribers to this topic. subscribers := broker.Subscribers(topic) fmt.Println(subscribers) }
Output: 10
func (*Broker[T, P, S]) Topics ¶
func (b *Broker[T, P, S]) Topics() []T
Topics returns a slice of all the topics registered on the Broker.
A nil slice is returned if there are no topics.
NOTE: The order of the topics is not guaranteed.
func (*Broker[T, P, S]) TryPublish ¶
TryPublish publishes a message to the topic with the specified payload if the subscription's channel buffer is not full.
The message is sent sequentially to the subscribers that are ready to receive it and the others are skipped.
NOTE: Use the Broker.Publish method for guaranteed delivery.
Example ¶
broker := pubsub.NewBroker[string, string, string]() topic := "example" // A subscription that will not receive the message. // The channel is unbuffered and will not be ready when the message is published. broker.Subscribe(topic) // A buffered subscription that will receive the message. bufferedSub := broker.SubscribeWithCapacity(1, topic) // This method will send the message to the subscribers that are ready // to receive it (channel buffer not full) and the others will be skipped. broker.TryPublish(Message{Topic: topic, Payload: "abc", Sender: "app"}) // Receive the message on the buffered subscription. msg := <-bufferedSub fmt.Println(msg.Topic) fmt.Println(msg.Payload) fmt.Println(msg.Sender)
Output: example abc app
func (*Broker[T, P, S]) Unsubscribe ¶
Unsubscribe removes a subscription for the specified topics.
All topic subscriptions are removed if none are specified.
The channel will not be closed, it will only stop receiving messages.
NOTE: Specifying the topics to unsubscribe from can be more efficient.
Example ¶
package main import ( "github.com/mdawar/pubsub" ) func main() { broker := pubsub.NewBroker[string, string, string]() sub := broker.Subscribe("events", "actions", "errors") // Unsubscribe from a single topic. broker.Unsubscribe(sub, "events") // Unsubscribe from all topics. // The channel will not be closed, it will only stop receiving messages. broker.Unsubscribe(sub) }
type Message ¶
type Message[T any, P any, S any] struct { // Topic is the topic on which the message is published. Topic T // Payload holds the published value. Payload P // Sender is an identifier for the message's sender. Sender S }
Message represents a message delivered by the broker to a subscriber.