pubsub

Version: v0.0.0-...-1d4a831
Published: Jul 7, 2025 License: MIT Imports: 3 Imported by: 0

README

Generic PubSub Implementation in Go

A thread-safe, generic Publish-Subscribe implementation in Go with blocking semantics and context support.

Features

  • Type-safe generics - Works with any comparable key type and any message type
  • Thread-safe - Safe for concurrent use by multiple goroutines
  • Context support - Cancelation and timeout support for publishing
  • Blocking semantics - Publish waits until every subscriber channel accepts the message, unless the context ends first
  • Lightweight - Minimal dependencies (only standard library)
  • Efficient - O(1) subscription lookups and O(n) publishes (n = subscribers per key)

Installation

go get github.com/mdigger/pubsub

Usage

Basic Usage

import (
	"context"
	"fmt"

	"github.com/mdigger/pubsub"
)

// Create a new PubSub instance
ps := pubsub.New[string, string]()

// Create subscriber channels
ch1 := make(chan string, 10)
ch2 := make(chan string, 10)

// Subscribe channels to topics
ps.Subscribe([]string{"topic1", "topic2"}, ch1)
ps.Subscribe([]string{"topic1"}, ch2)

// Publish messages (blocks until all subscribers receive)
go func() {
    delivered, err := ps.Publish(context.Background(), "topic1", "hello world")
    fmt.Printf("Delivered to %d subscribers, error: %v\n", delivered, err)
}()

// Receive messages
msg := <-ch1
fmt.Println("Received:", msg)

// Unsubscribe when done
ps.Unsubscribe([]string{"topic1"}, ch1)

Context-Aware Publishing

// With timeout
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

delivered, err := ps.Publish(ctx, "topic1", "with timeout")
if err != nil {
    fmt.Println("Publish failed:", err)
} else {
    fmt.Println("Delivered to", delivered, "subscribers")
}

// Convenience method with timeout
delivered, err = ps.PublishWithTimeout("topic1", "convenience", 50*time.Millisecond)

Performance Considerations

  1. Channel Buffering: Use buffered channels to prevent blocking publishers
  2. Key Cardinality: Many unique keys will increase memory usage
  3. Fan-out: Publishing to keys with many subscribers will be slower
  4. Context Handling: Context checks add minimal overhead to publishing
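
To make the channel buffering point concrete, the sketch below counts how many sends a channel accepts when nothing is reading from it. The helper name `absorbed` is hypothetical and uses only the standard library; it is not part of this package.

```go
package main

import "fmt"

// absorbed is a hypothetical helper (not part of the pubsub package).
// It reports how many sends ch accepts before a send would block,
// assuming no goroutine is receiving from ch.
func absorbed(ch chan int) int {
	n := 0
	for {
		select {
		case ch <- n:
			n++ // the buffer had room, so the send completed immediately
		default:
			return n // the next send would block
		}
	}
}

func main() {
	// A buffer of 3 takes three messages with no reader;
	// an unbuffered channel takes none.
	fmt.Println(absorbed(make(chan int, 3)), absorbed(make(chan int)))
	// prints: 3 0
}
```

Once the buffer is full, a blocking publisher has to wait for a reader, so capacity should be sized for the bursts a subscriber is expected to fall behind by.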

Best Practices

  1. Always use buffered channels with sufficient capacity
  2. Ensure subscribers are actively reading from channels
  3. Consider using separate PubSub instances for different domains
  4. Clean up unused subscriptions with Unsubscribe
  5. Use context timeouts for publishing to slow consumers
  6. Check both delivery count and error when using context
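
One way to keep a subscriber actively reading is to drain its channel in a dedicated goroutine. The following is a minimal sketch using only the standard library; `consume` and `run` are hypothetical names, and the plain channel sends stand in for a publisher.

```go
package main

import (
	"fmt"
	"sync"
)

// consume is a hypothetical helper (not part of the pubsub package).
// It drains ch in its own goroutine until ch is closed.
func consume(ch <-chan string, wg *sync.WaitGroup, handle func(string)) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for msg := range ch {
			handle(msg)
		}
	}()
}

// run feeds three messages to one consumer and returns what it handled.
func run() []string {
	ch := make(chan string, 4) // the buffer absorbs short bursts
	var wg sync.WaitGroup
	var got []string
	consume(ch, &wg, func(m string) { got = append(got, m) })

	for _, m := range []string{"a", "b", "c"} {
		ch <- m // stands in for a publisher writing to the subscriber channel
	}
	close(ch) // only safe once nothing can send on ch any more
	wg.Wait() // after this, got is safe to read
	return got
}

func main() {
	fmt.Println(run()) // prints: [a b c]
}
```

In Go, sending on a closed channel panics, so with a real PubSub instance it would be prudent to call Unsubscribe for every key before closing a subscriber channel.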

Alternatives

For non-blocking semantics or different delivery guarantees, consider:

Documentation

Overview

Package pubsub implements a generic Publish-Subscribe pattern. It allows publishers to send messages to multiple subscribers based on topic keys, with thread-safe operations. The implementation is generic, supporting any comparable key type and any message type.

Example (ContextAwarePublishing)

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/mdigger/pubsub"
)

func main() {
	ps := pubsub.New[string, string]()

	// Create a buffered channel to ensure the example works
	ch := make(chan string, 2)
	ps.Subscribe([]string{"alerts"}, ch)

	// Regular publish with background context
	delivered, err := ps.Publish(context.Background(), "alerts", "urgent")
	fmt.Printf("Delivered: %d, Error: %v\n", delivered, err)

	// Publish with timeout (using buffered channel to ensure delivery)
	delivered, err = ps.PublishWithTimeout("alerts", "timeout-test", 100*time.Millisecond)
	fmt.Printf("With timeout - Delivered: %d, Error: %v\n", delivered, err)

	// Consume the messages to clean up
	<-ch
	<-ch
}

Output:

Delivered: 1, Error: <nil>
With timeout - Delivered: 1, Error: <nil>

Types

type PubSub

type PubSub[K comparable, T any] struct {
	// contains filtered or unexported fields
}

PubSub implements the Publish-Subscribe pattern. It maintains a mapping of keys to subscriber channels, allowing efficient message distribution. K is the key type (must be comparable), T is the message type.

func New

func New[K comparable, T any]() *PubSub[K, T]

New creates and returns a new PubSub instance. The returned PubSub is fully initialized and ready to use.

func (*PubSub[K, T]) Publish

func (ps *PubSub[K, T]) Publish(ctx context.Context, key K, msg T) (int, error)

Publish sends a message to all channels subscribed to the specified key. The operation will block until all subscribers receive the message or until:

  • The context is canceled
  • The timeout expires (if context has a deadline)

Returns the number of successful deliveries and any context error encountered.
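
The return contract of Publish (a delivery count plus a context error) can be illustrated with a standalone fan-out loop. This is a sketch under assumptions, not the package's actual code: `deliver` and `demo` are hypothetical names, and the real implementation may order or parallelize sends differently.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// deliver is a hypothetical stand-in for a blocking fan-out step.
// It sends msg to each channel in turn and stops at the first context error,
// returning how many sends completed before that point.
func deliver[T any](ctx context.Context, subs []chan T, msg T) (int, error) {
	delivered := 0
	for _, ch := range subs {
		select {
		case ch <- msg:
			delivered++
		case <-ctx.Done():
			return delivered, ctx.Err()
		}
	}
	return delivered, nil
}

// demo publishes to one ready subscriber and one stalled subscriber.
func demo() (int, error) {
	ready := make(chan string, 1) // has free capacity, accepts immediately
	stalled := make(chan string)  // never read, so the send cannot complete

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	return deliver(ctx, []chan string{ready, stalled}, "hello")
}

func main() {
	n, err := demo()
	fmt.Printf("delivered=%d err=%v\n", n, err)
	// prints: delivered=1 err=context deadline exceeded
}
```

A non-nil error does not mean zero deliveries, which is why callers should inspect both return values.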

func (*PubSub[K, T]) PublishWithTimeout

func (ps *PubSub[K, T]) PublishWithTimeout(key K, msg T, timeout time.Duration) (int, error)

PublishWithTimeout is a convenience method that publishes using a context with the given timeout.

func (*PubSub[K, T]) Subscribe

func (ps *PubSub[K, T]) Subscribe(keys []K, ch chan T)

Subscribe adds a channel to receive messages for the specified keys. The channel will receive all messages published to any of the provided keys. If the channel is already subscribed to a key, this is a no-op.

Note: The channel should have sufficient buffer space or active readers to prevent indefinite blocking in the Publish method.

func (*PubSub[K, T]) Unsubscribe

func (ps *PubSub[K, T]) Unsubscribe(keys []K, ch chan T)

Unsubscribe removes a channel from receiving messages for the specified keys. After this call, the channel will no longer receive messages for these keys. If the channel wasn't subscribed to a key, that key is skipped. If all channels are unsubscribed from a key, the key is removed from the registry.
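
The Subscribe and Unsubscribe semantics described here (duplicate subscriptions are no-ops, unknown keys are skipped, empty keys are removed) fit naturally on a map of sets. The sketch below is one possible layout and is not taken from the package's source; the `registry` type and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical key-to-subscribers index,
// not the package's actual internals.
type registry[K comparable, T any] struct {
	mu   sync.RWMutex
	subs map[K]map[chan T]struct{}
}

func newRegistry[K comparable, T any]() *registry[K, T] {
	return &registry[K, T]{subs: make(map[K]map[chan T]struct{})}
}

// subscribe adds ch under each key; adding it twice has no further effect.
func (r *registry[K, T]) subscribe(keys []K, ch chan T) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, k := range keys {
		if r.subs[k] == nil {
			r.subs[k] = make(map[chan T]struct{})
		}
		r.subs[k][ch] = struct{}{}
	}
}

// unsubscribe removes ch from each key and drops keys left without subscribers.
func (r *registry[K, T]) unsubscribe(keys []K, ch chan T) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, k := range keys {
		set, ok := r.subs[k]
		if !ok {
			continue // ch was never subscribed to k; skip it
		}
		delete(set, ch)
		if len(set) == 0 {
			delete(r.subs, k)
		}
	}
}

// count reports the number of subscribers for key (0 if the key is absent).
func (r *registry[K, T]) count(key K) int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.subs[key])
}

// size reports how many keys are currently registered.
func (r *registry[K, T]) size() int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.subs)
}

func main() {
	r := newRegistry[string, int]()
	ch := make(chan int, 1)
	r.subscribe([]string{"a", "b"}, ch)
	r.subscribe([]string{"a"}, ch) // duplicate: no effect
	fmt.Println(r.count("a"), r.size()) // prints: 1 2
	r.unsubscribe([]string{"a", "missing"}, ch)
	fmt.Println(r.count("a"), r.size()) // prints: 0 1
}
```

Using the channel itself as the set key gives constant-time duplicate detection and removal, consistent with the O(1) subscription lookups listed under Features.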
