pubsub

package module
v0.3.1 Latest
Published: Jan 22, 2025 License: MIT Imports: 2 Imported by: 0

README

pubsub


pubsub is a simple and generic topic-based publish/subscribe package for in-process communication in Go.

Installation

go get -u github.com/mdawar/pubsub
Import
import "github.com/mdawar/pubsub"

Usage

Create a Broker
// Create a message broker.
// The type params are for the topic, payload and sender respectively.
broker := pubsub.NewBroker[string, string, string]()

// Any types can be used.
events := pubsub.NewBroker[string, Event, int]()
Subscriptions

A subscription is simply a channel, so everything you know about channels applies to subscriptions.
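
Because a subscription is an ordinary receive-only channel, the usual channel idioms work on it. As a minimal sketch, the hypothetical helper below (`receiveWithTimeout` is not part of this package) waits for one value with a timeout by combining `select` with `time.After`. A plain channel stands in for a subscription so the snippet only needs the standard library.

```go
package main

import (
	"fmt"
	"time"
)

// receiveWithTimeout waits for one value on a receive-only channel and gives
// up after d. Hypothetical helper, not part of the pubsub package.
func receiveWithTimeout[T any](sub <-chan T, d time.Duration) (T, bool) {
	select {
	case v := <-sub:
		return v, true
	case <-time.After(d):
		var zero T
		return zero, false
	}
}

// demo uses a plain buffered channel as a stand-in for a subscription.
func demo() (string, bool, bool) {
	ch := make(chan string, 1)
	ch <- "hello"

	// The buffered value is available immediately.
	first, ok1 := receiveWithTimeout[string](ch, 50*time.Millisecond)
	// The channel is now empty, so this call times out.
	_, ok2 := receiveWithTimeout[string](ch, 10*time.Millisecond)
	return first, ok1, ok2
}

func main() {
	first, ok1, ok2 := demo()
	fmt.Println(first, ok1, ok2)
}
```

The same pattern should apply to a channel returned by Subscribe, since its type is `<-chan Message[T, P, S]`.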

// Create a subscription to a topic.
// A subscription is a channel that receives messages published to the topic.
// By default the channel is unbuffered (capacity = 0).
sub1 := broker.Subscribe("events")

// A subscription can be created to multiple topics.
// The channel will receive all the messages published to all the specified topics.
sub2 := broker.Subscribe("events", "actions", "testing")

// Create a subscription with a buffered channel.
// The channel capacity is specified as the first parameter.
sub3 := broker.SubscribeWithCapacity(10, "events")

// Unsubscribe from a specific topic or topics.
// The channel will still receive messages for the other topics.
broker.Unsubscribe(sub2, "actions", "testing")

// Unsubscribe from all topics.
// The channel will not be closed; it will only stop receiving messages.
// NOTE: Specifying all the topics is much more efficient if performance is critical.
broker.Unsubscribe(sub2)
Publishing Messages
// A message is composed of a topic, payload and an optional sender.
// The type params are the same types used when creating the broker.
var msg = pubsub.Message[string, string, string]{
  Topic: "events",
  Payload: "Sample message",
  Sender: "sender-id",
}

// You can specify an alias for the generic message type.
type Message = pubsub.Message[string, string, string]

// Publish a message with the specified payload.
// The payload can be of any type that is specified when creating the broker.
//
// The message will be sent concurrently to the subscribers, ensuring that a slow
// consumer won't affect the other subscribers.
//
// This call will block until all the subscription channels receive the message
// or until the context is canceled.
//
// A nil return value indicates that all the subscribers received the message.
broker.Publish(context.TODO(), Message{
  Topic: "events",
  Payload: "Sample message",
  Sender: "sender-id", // Optional.
})
// Publish a message and timeout after 1 second.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// In this case, Publish will deliver the message to subscribers that are
// ready and will wait for the others for up to the timeout duration.
err := broker.Publish(ctx, Message{Topic: "events", Payload: "Sample message"})
// The error is not nil if the context was canceled or the deadline exceeded.
if err != nil {
  if errors.Is(err, context.DeadlineExceeded) {
    // Timed out.
  } else if errors.Is(err, context.Canceled) {
    // Canceled.
  }
}
// Non blocking publish (Fire and forget).
//
// The message is sent sequentially to the subscribers that are ready to receive it
// and the others are skipped.
//
// NOTE: Message delivery is not guaranteed.
broker.TryPublish(Message{
  Topic: "events",
  Payload: "A message that may not be delivered",
})

// Buffered subscriptions can be used for guaranteed delivery with a non-blocking publish.
//
// Publish will still block if any subscription's channel buffer is full, or any of the
// subscriptions is an unbuffered channel.
sub := broker.SubscribeWithCapacity(1, "events")
broker.Publish(context.TODO(), Message{
  Topic: "events",
  Payload: "Guaranteed delivery message",
})
Messages
sub := broker.Subscribe("events")

// Receive a message from the channel.
msg := <-sub

// The topic that the message was published on.
msg.Topic
// The payload that was published using Publish() or TryPublish().
msg.Payload
// The message sender.
msg.Sender
Topics
// Get a slice of all the topics registered on the broker.
// NOTE: The order of the topics is not guaranteed.
topics := broker.Topics()

// Get the total number of topics.
topicsCount := broker.NumTopics()

// Get the subscribers count on a specific topic.
count := broker.Subscribers("events")

Tests

go test -race -cover
# If you have "just" installed.
just test
# Or using make.
make test

Benchmarks

go test -bench .
# Or using "just".
just benchmark
# Or using make.
make benchmark

Documentation

Overview

Package pubsub provides a generic and concurrency-safe, topic-based publish/subscribe library for in-process communication.

Example
// A broker with topic, message payload and sender of type string.
broker := pubsub.NewBroker[string, string, string]()

// Topic to publish the message to.
topic := "example"
// Number of subscribers to the topic.
subCount := 100

var wg sync.WaitGroup

// Run consumer goroutines subscribed to the same topic.
for range subCount {
	wg.Add(1)
	go func() {
		defer wg.Done()

		// Subscribe to topic.
		sub := broker.Subscribe(topic)
		// Wait for message.
		msg := <-sub
		// Message fields.
		_ = msg.Topic
		_ = msg.Payload
		_ = msg.Sender
	}()
}

// Helper function to wait for all consumer goroutines to subscribe.
// This is just for the example and not needed in production code.
waitUntil(time.Second, func() bool {
	return broker.Subscribers(topic) == subCount
})

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// Publish a message concurrently to the topic.
// This call will wait for all subscribers to receive the message or
// until the context is canceled (e.g. on timeout).
err := broker.Publish(ctx, Message{Topic: topic, Payload: "Message to deliver"})
if err != nil {
	// In this case one or more subscribers did not receive the message.
	switch {
	case errors.Is(err, context.Canceled):
		fmt.Println("Publishing was canceled")
	case errors.Is(err, context.DeadlineExceeded):
		fmt.Println("Publishing timed out")
	}
}

wg.Wait()

fmt.Println(err)
Output:

<nil>
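
The example above calls a `waitUntil` helper that is not shown. A minimal sketch of what such a helper could look like follows, assuming it polls the condition until it holds or the timeout elapses. The parameter names, the `bool` return value and the 1 ms polling interval are assumptions, not taken from the package.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// waitUntil polls cond until it returns true or the timeout elapses and
// reports whether the condition was met. Hypothetical reconstruction: the
// signature and return value are assumptions.
func waitUntil(timeout time.Duration, cond func() bool) bool {
	deadline := time.Now().Add(timeout)
	for {
		if cond() {
			return true
		}
		if time.Now().After(deadline) {
			return false
		}
		time.Sleep(time.Millisecond)
	}
}

// demo waits for a counter incremented by background goroutines, then waits
// for a condition that never holds.
func demo() (reached, never bool) {
	var ready atomic.Int64
	for i := 0; i < 5; i++ {
		go func() { ready.Add(1) }()
	}
	reached = waitUntil(time.Second, func() bool { return ready.Load() == 5 })
	never = waitUntil(20*time.Millisecond, func() bool { return false })
	return reached, never
}

func main() {
	fmt.Println(demo())
}
```

Polling is adequate for tests and examples; production code would normally synchronize explicitly instead of waiting on a subscriber count.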


Types

type Broker

type Broker[T comparable, P any, S any] struct {
	// contains filtered or unexported fields
}

Broker represents a message broker.

The Broker is the core component of the pub/sub library. It manages the registration of subscribers and handles the publishing of messages to specific topics.

The Broker supports concurrent operations.

Example
package main

import (
	"github.com/mdawar/pubsub"
)

func main() {
	// A broker with topic, message payload and sender of type string.
	pubsub.NewBroker[string, string, string]()

	// A broker with integer topics and a uint sender.
	pubsub.NewBroker[int, string, uint]()
}

func NewBroker

func NewBroker[T comparable, P any, S any]() *Broker[T, P, S]

NewBroker creates a new message Broker instance.

func (*Broker[T, P, S]) NumTopics

func (b *Broker[T, P, S]) NumTopics() int

NumTopics returns the total number of topics registered on the Broker.

func (*Broker[T, P, S]) Publish

func (b *Broker[T, P, S]) Publish(ctx context.Context, msg Message[T, P, S]) error

Publish publishes a Message to the topic with the specified payload.

The message is sent concurrently to the subscribers, ensuring that a slow consumer won't affect the other subscribers.

This method will block and wait for all the subscriptions to receive the message or until the context is canceled.

If the context is canceled before every subscriber has received the message, the value of context.Context.Err is returned.

A nil return value indicates that all the subscribers received the message.

If there are no subscribers to the topic, the message will be discarded.

Example
broker := pubsub.NewBroker[string, string, string]()

// Publish a message to the topic concurrently.
//
// This call will wait for all the subscribers to receive the message
// or the context to be canceled.
//
// If there are no subscribers the message will be discarded.
err := broker.Publish(context.TODO(), Message{
	Topic:   "events",
	Payload: "Message payload to deliver",
	Sender:  "sender-id", // Optional.
})

// A nil error is expected if the context is not canceled.
fmt.Println(err == nil)
Output:

true
Example (Timeout)
broker := pubsub.NewBroker[string, string, string]()

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// Publish a message concurrently with a timeout of 1 second.
//
// Subscribers that are ready will receive the message, the others will be given
// up to the timeout value to receive the message.
//
// A slow consumer will not affect the other subscribers: the message is sent
// concurrently, so each subscriber has up to the full timeout to receive it.
err := broker.Publish(ctx, Message{
	Topic:   "events",
	Payload: "Message payload to deliver",
	Sender:  "sender-id", // Optional.
})
if err != nil {
	// In this case one or more subscribers did not receive the message.
	switch {
	case errors.Is(err, context.Canceled):
		fmt.Println("Publishing was canceled")
	case errors.Is(err, context.DeadlineExceeded):
		fmt.Println("Publishing timed out")
	}
}
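
To make the blocking and timeout behavior described for Publish concrete, here is a standard-library sketch of a concurrent fan-out send. It is an illustration of the documented semantics under stated assumptions, not the package's implementation: `publishAll` is a hypothetical name, and plain channels stand in for subscriptions.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// publishAll sends v to every channel concurrently and waits until each send
// completes or ctx is done. Illustrative only; not the pubsub implementation.
func publishAll[T any](ctx context.Context, subs []chan T, v T) error {
	var wg sync.WaitGroup
	var missed atomic.Bool
	for _, ch := range subs {
		wg.Add(1)
		go func(ch chan T) {
			defer wg.Done()
			select {
			case ch <- v: // Delivered.
			case <-ctx.Done(): // Gave up on this receiver.
				missed.Store(true)
			}
		}(ch)
	}
	wg.Wait()
	if missed.Load() {
		return ctx.Err()
	}
	return nil
}

// demo publishes to one ready (buffered) channel and one channel nobody reads.
func demo() (delivered string, timedOut bool) {
	fast := make(chan string, 1)
	slow := make(chan string) // Unbuffered and never received from.

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	err := publishAll(ctx, []chan string{fast, slow}, "hi")
	return <-fast, errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(demo())
}
```

In this sketch the ready receiver gets the message right away, while the call as a whole returns the context error once the deadline passes for the receiver that never became ready.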

func (*Broker[T, P, S]) Subscribe

func (b *Broker[T, P, S]) Subscribe(topics ...T) <-chan Message[T, P, S]

Subscribe creates a subscription for the specified topics.

The created subscription channel is unbuffered (capacity = 0).

Example
package main

import (
	"github.com/mdawar/pubsub"
)

func main() {
	broker := pubsub.NewBroker[string, string, string]()

	// Create a subscription to a single topic.
	sub1 := broker.Subscribe("events")
	_ = sub1

	// Create a subscription to multiple topics.
	sub2 := broker.Subscribe("events", "actions", "errors")
	_ = sub2
}

func (*Broker[T, P, S]) SubscribeWithCapacity

func (b *Broker[T, P, S]) SubscribeWithCapacity(capacity int, topics ...T) <-chan Message[T, P, S]

SubscribeWithCapacity creates a subscription for the specified topics with the specified capacity.

The capacity specifies the subscription channel's buffer capacity.

Example
package main

import (
	"github.com/mdawar/pubsub"
)

func main() {
	broker := pubsub.NewBroker[string, string, string]()

	// Create a subscription to a single topic with a specific channel capacity.
	sub1 := broker.SubscribeWithCapacity(10, "events")
	_ = sub1

	// Create a subscription to multiple topics with a specific channel capacity.
	sub2 := broker.SubscribeWithCapacity(10, "events", "actions", "errors")
	_ = sub2
}

func (*Broker[T, P, S]) Subscribers added in v0.2.0

func (b *Broker[T, P, S]) Subscribers(topic T) int

Subscribers returns the number of subscriptions on the specified topic.

Example
package main

import (
	"fmt"

	"github.com/mdawar/pubsub"
)

func main() {
	broker := pubsub.NewBroker[string, string, string]()
	topic := "example"

	for range 10 {
		broker.Subscribe(topic)
	}

	// The number of subscribers to this topic.
	subscribers := broker.Subscribers(topic)
	fmt.Println(subscribers)
}
Output:

10

func (*Broker[T, P, S]) Topics

func (b *Broker[T, P, S]) Topics() []T

Topics returns a slice of all the topics registered on the Broker.

A nil slice is returned if there are no topics.

NOTE: The order of the topics is not guaranteed.

func (*Broker[T, P, S]) TryPublish

func (b *Broker[T, P, S]) TryPublish(msg Message[T, P, S])

TryPublish publishes a message to the topic with the specified payload if the subscription's channel buffer is not full.

The message is sent sequentially to the subscribers that are ready to receive it and the others are skipped.

NOTE: Use the Broker.Publish method for guaranteed delivery.

Example
broker := pubsub.NewBroker[string, string, string]()
topic := "example"

// A subscription that will not receive the message.
// The channel is unbuffered and will not be ready when the message is published.
broker.Subscribe(topic)
// A buffered subscription that will receive the message.
bufferedSub := broker.SubscribeWithCapacity(1, topic)

// This method will send the message to the subscribers that are ready
// to receive it (channel buffer not full) and the others will be skipped.
broker.TryPublish(Message{Topic: topic, Payload: "abc", Sender: "app"})

// Receive the message on the buffered subscription.
msg := <-bufferedSub

fmt.Println(msg.Topic)
fmt.Println(msg.Payload)
fmt.Println(msg.Sender)
Output:

example
abc
app
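
The skip-if-not-ready behavior shown above matches Go's non-blocking send idiom, a `select` with a `default` case. The sketch below demonstrates that idiom on plain channels. `trySend` is a hypothetical helper, and whether TryPublish is implemented this way is an assumption.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether v was accepted.
// Hypothetical helper illustrating the idiom; not the pubsub implementation.
func trySend[T any](ch chan<- T, v T) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// demo tries one unbuffered channel and one buffered channel of capacity 1.
func demo() (bool, bool, bool) {
	unbuffered := make(chan string)
	buffered := make(chan string, 1)

	a := trySend[string](unbuffered, "abc") // No receiver is waiting.
	b := trySend[string](buffered, "abc")   // The buffer has room.
	c := trySend[string](buffered, "def")   // The buffer is now full.
	return a, b, c
}

func main() {
	fmt.Println(demo())
}
```

An unbuffered channel only accepts a non-blocking send when a receiver is already waiting, which is why the unbuffered subscription in the example above is skipped.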

func (*Broker[T, P, S]) Unsubscribe

func (b *Broker[T, P, S]) Unsubscribe(sub <-chan Message[T, P, S], topics ...T)

Unsubscribe removes a subscription for the specified topics.

All topic subscriptions are removed if none are specified.

The channel will not be closed; it will only stop receiving messages.

NOTE: Specifying the topics to unsubscribe from can be more efficient.

Example
package main

import (
	"github.com/mdawar/pubsub"
)

func main() {
	broker := pubsub.NewBroker[string, string, string]()

	sub := broker.Subscribe("events", "actions", "errors")

	// Unsubscribe from a single topic.
	broker.Unsubscribe(sub, "events")

	// Unsubscribe from all topics.
	// The channel will not be closed; it will only stop receiving messages.
	broker.Unsubscribe(sub)
}
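
Because Unsubscribe leaves the channel open, a consumer that loops with `for range` over the subscription would never terminate. One possible approach is to drive the loop with a context instead. This is a sketch under that assumption: `consume` is a hypothetical helper, and a plain channel stands in for a subscription that has gone quiet.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// consume handles values from sub until ctx is done and returns how many it
// handled. It never relies on sub being closed. Hypothetical helper.
func consume[T any](ctx context.Context, sub <-chan T, handle func(T)) int {
	n := 0
	for {
		select {
		case v := <-sub:
			handle(v)
			n++
		case <-ctx.Done():
			return n
		}
	}
}

// demo consumes two buffered values, then exits when the context times out.
func demo() int {
	ch := make(chan string, 2) // Stand-in for a subscription.
	ch <- "a"
	ch <- "b"

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	return consume[string](ctx, ch, func(string) {})
}

func main() {
	fmt.Println(demo())
}
```

Canceling the context right after calling Unsubscribe would let the consumer goroutine exit instead of blocking forever on a silent channel.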

type Message

type Message[T any, P any, S any] struct {
	// Topic is the topic on which the message is published.
	Topic T
	// Payload holds the published value.
	Payload P
	// Sender is an identifier for the message's sender.
	Sender S
}

Message represents a message delivered by the broker to a subscriber.
