pubsub

package module
v0.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 1, 2026 License: Apache-2.0 Imports: 7 Imported by: 5

README

PubSub

A lightweight, type-safe publish/subscribe library for Go applications.

Features

  • Simple, intuitive API for topic-based messaging
  • Type-safe message filtering with generics
  • Support for message history
  • Channel merging capabilities
  • Non-blocking and Reliable message delivery options
  • Thread-safe operations
  • Callback-based message handling
  • Wildcard topic subscription

Installation

go get github.com/dioad/pubsub

Quick Start

package main

import (
    "fmt"
    "time"
    
    "github.com/dioad/pubsub"
)

func main() {
    // Create a new PubSub instance
    ps := pubsub.NewPubSub()
    
    // Subscribe to a topic
    ch := ps.Subscribe("notifications")
    
    // Process messages in a goroutine
    go func() {
        for msg := range ch {
            fmt.Printf("Received: %v\n", msg)
        }
    }()
    
    // Publish messages to the topic
    ps.Publish("notifications", "Hello, World!")
    ps.Publish("notifications", "Another message")
    
    // Wait for messages to be processed
    time.Sleep(100 * time.Millisecond)
}

Core Concepts

Topics

A Topic is the basic unit for message distribution. Publishers send messages to a topic, and subscribers receive messages from that topic.

There are several implementations of the Topic interface:

  • Basic Topic: A simple topic with no history.
  • Topic with History: Maintains a history of messages using a mutex-protected slice. Good for most use cases where history is needed.
  • Topic with Lock-Free History: Maintains history using a lock-free ring buffer. Recommended for high-concurrency environments to minimize contention on the publish path.
// Create a basic topic
topic := pubsub.NewTopic()

// Create a topic with history
topicWithHistory := pubsub.NewTopic(pubsub.WithHistory(10))

// Create a topic with lock-free history for high performance
topicLockFree := pubsub.NewTopic(pubsub.WithLockFreeHistoryOpt(100))

// Subscribe to the topic
ch := topic.Subscribe()

// Publish to the topic
topic.Publish("Hello from topic!")
PubSub

The PubSub interface manages multiple topics and provides a higher-level API.

There are two main implementations of the PubSub interface:

  • Default PubSub (NewPubSub): Suitable for most applications with a moderate number of topics and message rates. It uses a single map and mutex for topic management.
  • Sharded PubSub (NewShardedPubSub): Optimized for high-concurrency scenarios with many topics. It reduces lock contention by sharding topics across multiple internal maps.
// Create a default PubSub instance
ps := pubsub.NewPubSub()

// Create a sharded PubSub instance for high concurrency
psHighConf := pubsub.NewShardedPubSub()

// Subscribe to a specific topic
ch1 := ps.Subscribe("topic1")

// Subscribe to all topics
ch2 := ps.SubscribeAll()

// Publish to a topic
ps.Publish("topic1", "Message for topic1")
Message History

Enable message history to allow new subscribers to receive previously published messages.

// Create a PubSub with history
ps := pubsub.NewPubSub(pubsub.WithHistorySize(10))

// Publish some messages
ps.Publish("news", "Breaking news 1")
ps.Publish("news", "Breaking news 2")

// New subscribers will receive historical messages
ch := ps.Subscribe("news")
Type Filtering

Filter messages by type using generics.

type UserEvent struct {
    Username string
    Action   string
}

// Subscribe to only receive UserEvent messages
userEvents := pubsub.Subscribe[UserEvent](topic)
for event := range userEvents {
    fmt.Printf("User %s performed action: %s\n", event.Username, event.Action)
}
Custom Filtering

Apply custom filters to messages.

// Filter for high-priority messages
highPriorityMsgs := pubsub.SubscribeWithFilter(topic, func(msg AlertMessage) bool {
    return msg.Priority > 7
})
Channel Merging

Merge multiple channels into a single channel.

// Merge channels from different topics
ch1 := topic1.Subscribe()
ch2 := topic2.Subscribe()
mergedCh := pubsub.Merge[UserEvent](ch1, ch2)
Reliable Delivery

For scenarios where message delivery is more important than non-blocking speed, use the PublishReliable and SubscribeUnbuffered methods.

// Create unbuffered subscription
ch := ps.SubscribeUnbuffered("critical-events")

// Publish with 100ms timeout for blocking delivery
ps.PublishReliable("critical-events", "Important Message")

Examples

See the examples directory for more detailed examples:

Performance

Run the benchmarks to evaluate performance:

go test -bench=. github.com/dioad/pubsub

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.

Documentation

Overview

Package pubsub provides a high-performance, thread-safe publish/subscribe messaging system. It supports topic-based message distribution, message history, type-safe filtering, and reliable delivery mechanisms.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ApplyChan added in v0.0.7

func ApplyChan[A any, B any](s <-chan A, f func(A) (B, error)) <-chan B

ApplyChan creates a new channel by applying a transformation function to each message from the input channel. If the transformation function returns an error, the message is dropped. The output channel has the same capacity as the source channel.

Example:

rawCh := topic.Subscribe()
stringCh := ApplyChan(rawCh, func(msg any) (string, error) {
    if s, ok := msg.(string); ok {
        return strings.ToUpper(s), nil
    }
    return "", errors.New("not a string")
})

func CastChan added in v0.0.4

func CastChan[T any](s <-chan any) <-chan T

CastChan creates a new channel that converts messages from an any channel to type T. It internally uses FilterChan with an always-true filter, effectively performing only type conversion. Messages that cannot be type-asserted to T are silently dropped.

Example:

// Convert a generic channel to a type-specific channel
rawCh := topic.Subscribe()
userCh := CastChan[User](rawCh)

// Process only User messages
for user := range userCh {
    fmt.Printf("User: %s (ID: %d)\n", user.Name, user.ID)
}

func FilterChan

func FilterChan[T any](s <-chan any, f func(T) bool) <-chan T

FilterChan returns a channel that will receive messages from the input channel that pass a filter function. It performs both type assertion and custom filtering in a single operation. Messages that don't match the type T or don't pass the filter function are silently dropped.

Example:

// Filter for only high-priority messages
rawCh := topic.Subscribe()
highPriorityMsgs := FilterChan(rawCh, func(msg AlertMessage) bool {
    return msg.Priority > 7
})
for alert := range highPriorityMsgs {
    handleHighPriorityAlert(alert)
}

func Merge added in v0.0.4

func Merge[B any](channels ...<-chan any) <-chan B

Merge combines multiple input channels into a single output channel of type B. It uses reflection to handle channels of any type, allowing for flexible merging. The output channel's buffer capacity is the sum of all input channel capacities, or the number of channels if they are unbuffered. When any of the input channels is closed, it is removed from the merge set, but the merged channel remains open until all input channels are closed.

Example:

// Merge channels from different topics
ch1 := topic1.Subscribe()
ch2 := topic2.Subscribe()
mergedCh := Merge[UserEvent](ch1, ch2)

// Process events from both channels
for event := range mergedCh {
    fmt.Printf("User %s performed action: %s\n", event.Username, event.Action)
}

func Subscribe

func Subscribe[T any](t Topic) <-chan T

Subscribe returns a channel that will receive messages published to the topic that have a particular type. It creates a new buffered channel with the same capacity as the source channel and automatically filters messages based on type T. Messages that cannot be type-asserted to T are silently dropped. Uses a non-blocking send to prevent deadlocks if the output channel's buffer is full.

Example:

// Subscribe to only receive UserEvent messages
userEvents := pubsub.Subscribe[UserEvent](topic)
for event := range userEvents {
    fmt.Printf("User %s performed action: %s\n", event.Username, event.Action)
}

func SubscribeWithFilter

func SubscribeWithFilter[T any](t Topic, f func(T) bool) <-chan T

SubscribeWithFilter returns a channel that will receive messages published to the topic that have a particular type and pass a filter function. This combines type filtering with custom filtering logic in a single operation.

Example:

// Subscribe to only receive successful login events
loginEvents := pubsub.SubscribeWithFilter(topic, func(event LoginEvent) bool {
    return event.Success == true
})
for event := range loginEvents {
    fmt.Printf("Successful login: %s at %v\n", event.Username, event.Timestamp)
}

Types

type DropMetrics added in v0.0.8

type DropMetrics struct {
	// DroppedCount is the total number of messages dropped due to full buffers.
	DroppedCount uint64
	// FilteredCount is the total number of messages filtered out (not matching criteria).
	FilteredCount uint64
}

DropMetrics tracks statistics about dropped messages in channel operations.

func ApplyChanWithMetrics added in v0.0.8

func ApplyChanWithMetrics[A any, B any](s <-chan A, f func(A) (B, error), metrics *DropMetrics) (<-chan B, *DropMetrics)

ApplyChanWithMetrics is like ApplyChan but also tracks dropped message statistics. If metrics is non-nil, it will be updated with counts of filtered and dropped messages. Returns both the output channel and the metrics object for observation.

Example:

metrics := &pubsub.DropMetrics{}
stringCh, _ := ApplyChanWithMetrics(rawCh, transformFunc, metrics)
// Later: check metrics.GetDropped() for buffer overflow count

func (*DropMetrics) GetDropped added in v0.0.8

func (m *DropMetrics) GetDropped() uint64

GetDropped returns the current dropped count.

func (*DropMetrics) GetFiltered added in v0.0.8

func (m *DropMetrics) GetFiltered() uint64

GetFiltered returns the current filtered count.

func (*DropMetrics) IncrementDropped added in v0.0.8

func (m *DropMetrics) IncrementDropped()

IncrementDropped atomically increments the dropped count.

func (*DropMetrics) IncrementFiltered added in v0.0.8

func (m *DropMetrics) IncrementFiltered()

IncrementFiltered atomically increments the filtered count.

type EventTuple added in v0.0.7

type EventTuple struct {
	// Topic is the name of the topic the message belongs to.
	Topic string
	// Event is the actual message content.
	Event any
}

EventTuple represents a message and its associated topic, used by Feeders.

type Feeder added in v0.0.7

type Feeder interface {
	// Feed returns a channel that emits EventTuple instances to be published.
	Feed() <-chan *EventTuple
}

Feeder is an interface for components that provide a stream of messages to PubSub.

type FeedingFunc added in v0.0.7

type FeedingFunc func() <-chan *EventTuple

FeedingFunc is a function type that acts as a Feeder.

type NoopObserver added in v0.0.8

type NoopObserver struct{}

NoopObserver is an Observer that does nothing.

func (NoopObserver) OnDrop added in v0.0.8

func (o NoopObserver) OnDrop(topic string, msg any)

func (NoopObserver) OnPublish added in v0.0.8

func (o NoopObserver) OnPublish(topic string, msg any)

func (NoopObserver) OnSubscribe added in v0.0.8

func (o NoopObserver) OnSubscribe(topic string)

func (NoopObserver) OnUnsubscribe added in v0.0.8

func (o NoopObserver) OnUnsubscribe(topic string)

type Observer added in v0.0.8

type Observer interface {
	// OnPublish is called when a message is published to a topic.
	OnPublish(topic string, msg any)
	// OnDrop is called when a message is dropped due to a full buffer.
	OnDrop(topic string, msg any)
	// OnSubscribe is called when a new subscription is created for a topic.
	OnSubscribe(topic string)
	// OnUnsubscribe is called when a subscription is removed from a topic.
	OnUnsubscribe(topic string)
}

Observer is an interface for components that want to observe events in the PubSub system.

type Opt

type Opt func(*pubSub)

Opt is a functional option for configuring a PubSub instance.

func WithHistorySize

func WithHistorySize(size int) Opt

WithHistorySize enables message history for all topics created by the PubSub instance. It sets the maximum number of historical messages to store per topic.

func WithLockFreeHistory added in v0.0.8

func WithLockFreeHistory(size int) Opt

WithLockFreeHistory is an option for NewPubSub/NewShardedPubSub that enables lock-free message history for all topics.

func WithObserver added in v0.0.8

func WithObserver(o Observer) Opt

WithObserver sets the observer for the PubSub instance.

type PubSub

type PubSub interface {
	// Publish sends messages to a specific topic. All subscribers to the topic
	// and subscribers to the "*" topic will receive these messages.
	Publish(topic string, msg ...any) PublishResult

	// AddFeeder registers a Feeder that will provide messages to the PubSub system.
	// The Feeder should implement the Feed method, which returns a channel that emits
	// EventTuple instances. Each EventTuple contains the topic and the event message.
	// This allows for dynamic message feeding into the PubSub system.
	// The context can be used to cancel the feeding process.
	AddFeeder(ctx context.Context, f Feeder)

	// AddFeedingFunc registers a FeedingFunc that will provide messages to the PubSub system.
	// The FeedingFunc should return a channel that emits EventTuple instances.
	// Each EventTuple contains the topic and the event message.
	// This allows for dynamic message feeding into the PubSub system.
	// The context can be used to cancel the feeding process.
	AddFeedingFunc(ctx context.Context, f FeedingFunc)

	// Topic returns a Topic instance for the given topic name.
	// If the topic doesn't exist, it creates a new one.
	Topic(topic string) Topic

	// Topics returns a list of all currently registered topics.
	Topics() []string

	// Subscribe creates a subscription to a specific topic and returns a channel
	// that will receive all messages published to that topic.
	// If history is enabled, new subscribers will receive historical messages.
	Subscribe(topic string) <-chan any

	// SubscribeFunc registers a callback function that will be invoked for
	// each message published to the specified topic.
	// If history is enabled, the callback will be invoked for historical messages.
	SubscribeFunc(topic string, f func(msg any))

	// Unsubscribe removes a subscription channel from a specific topic.
	// After unsubscribing, the channel will be closed.
	Unsubscribe(topic string, sub <-chan any)

	// SubscribeAll creates a subscription to all topics by subscribing to
	// the special "*" topic and returns a channel for receiving messages.
	// If history is enabled, new subscribers will receive historical messages from all topics.
	SubscribeAll() <-chan any

	// SubscribeAllFunc registers a callback function that will be invoked
	// for every message published to any topic.
	// If history is enabled, the callback will be invoked for historical messages from all topics.
	SubscribeAllFunc(f func(msg any))

	// UnsubscribeAll removes a subscription channel from the special "*" topic,
	// effectively unsubscribing from all messages.
	// After unsubscribing, the channel will be closed.
	UnsubscribeAll(sub <-chan any)

	// SubscribeWithBuffer creates a subscription to a specific topic with a custom buffer size.
	SubscribeWithBuffer(topic string, size int) <-chan any

	// DeleteTopic removes a topic and closes all its subscriptions.
	DeleteTopic(name string)

	// PublishReliable sends messages to a specific topic using blocking sends with a timeout.
	// This ensures messages are delivered even to unbuffered channels, but may block briefly.
	// Returns the total number of deliveries across the topic and the "*" topic.
	PublishReliable(topic string, msg ...any) int

	// AddFeederReliable registers a Feeder that will provide messages to the PubSub system using reliable delivery.
	// The context can be used to cancel the feeding process.
	AddFeederReliable(ctx context.Context, f Feeder)

	// AddFeedingFuncReliable registers a FeedingFunc that will provide messages to the PubSub system using reliable delivery.
	// The context can be used to cancel the feeding process.
	AddFeedingFuncReliable(ctx context.Context, f FeedingFunc)

	// SubscribeUnbuffered creates a subscription to a specific topic and returns an unbuffered channel.
	// This should be used with PublishReliable for guaranteed message delivery.
	SubscribeUnbuffered(topic string) <-chan any

	// SubscribeAllUnbuffered creates a subscription to all topics and returns an unbuffered channel.
	SubscribeAllUnbuffered() <-chan any

	// Shutdown gracefully shuts down the PubSub instance, closing all topics and subscriptions.
	// The context can be used to set a timeout for the shutdown process.
	Shutdown(ctx context.Context)

	// Close is an alias for Shutdown with a background context.
	Close()
}

PubSub is a thread-safe publish/subscribe message broker interface that enables topic-based message distribution. It supports both direct channel subscriptions and callback-based message handling. The interface provides message history capabilities when enabled, and supports a special "*" topic that receives all messages from all topics.

There are two main implementations of the PubSub interface:

  • Default PubSub (NewPubSub): Appropriate for most use cases with a moderate number of topics and message rates.
  • Sharded PubSub (NewShardedPubSub): Optimized for high-concurrency scenarios with many topics, reducing lock contention by sharding topics across multiple internal maps.

Basic usage:

// Create a new PubSub instance
ps := pubsub.NewPubSub()

// Subscribe to a topic
ch := ps.Subscribe("notifications")

// Process messages in a goroutine
go func() {
    for msg := range ch {
        fmt.Printf("Received: %v\n", msg)
    }
}()

// Publish messages to the topic
ps.Publish("notifications", "Hello, World!")

// With history:
ps := pubsub.NewPubSub(pubsub.WithHistorySize(10))

func NewPubSub

func NewPubSub(opt ...Opt) PubSub

NewPubSub creates a new default PubSub instance. This implementation uses a single map and mutex for topic management, which is efficient for most applications with a standard number of topics. Optionally pass WithHistorySize to enable history for all topics and set the size of the history buffer.

func NewShardedPubSub added in v0.0.8

func NewShardedPubSub(opt ...Opt) PubSub

NewShardedPubSub creates a new sharded PubSub instance optimized for high-concurrency scenarios. It distributes topics across 16 shards to minimize lock contention by reducing the frequency of global lock acquisition. This is particularly beneficial when many topics are being created or accessed simultaneously by different goroutines.

type PublishResult added in v0.0.8

type PublishResult struct {
	// Deliveries is the number of successful deliveries.
	Deliveries int
	// Drops is the number of messages dropped due to full buffers.
	Drops int
}

PublishResult contains information about the result of a Publish operation.

type Topic

type Topic interface {
	// Publish sends one or more messages to all subscribers of the topic.
	// Uses a non-blocking send to prevent deadlocks if a subscriber is not reading.
	// Returns a PublishResult containing the number of successful deliveries and drops.
	Publish(msg ...any) PublishResult

	// Subscribe returns a channel that will receive messages published to the topic.
	// If the topic has history enabled, the channel will receive historical messages.
	Subscribe() <-chan any

	// SubscribeFunc subscribes to the topic and calls the provided function for each message.
	// Returns the subscription channel that can be used to unsubscribe later.
	// If the topic has history enabled, the function will be called for historical messages.
	SubscribeFunc(f func(msg any)) <-chan any

	// SubscribeWithBuffer returns a channel with a custom buffer size that will receive messages.
	// Larger buffer sizes can help prevent message loss when subscribers can't keep up.
	SubscribeWithBuffer(size int) <-chan any

	// Unsubscribe removes a channel from the list of subscribers and closes the channel.
	// After unsubscribing, the channel will be closed.
	Unsubscribe(ch <-chan any)

	// PublishReliable publishes a message to the topic using blocking sends with timeout.
	// This ensures messages are delivered even to unbuffered channels, but may block briefly.
	// Returns the number of subscribers that successfully received the message.
	PublishReliable(msg ...any) int

	// SubscribeUnbuffered returns an unbuffered channel that will receive messages.
	// Should be used with PublishReliable for guaranteed message delivery.
	SubscribeUnbuffered() <-chan any

	// Shutdown closes all subscriptions with context for timeout support.
	Shutdown(ctx context.Context)

	// Close shuts down the topic and closes all subscription channels.
	Close()
}

Topic is a simple, single topic, publish/subscribe interface. It provides methods to publish messages, subscribe to messages, and unsubscribe from the topic. Messages can be of any type.

There are several implementations of the Topic interface:

  • Basic Topic (NewTopic): A simple topic with no history.
  • Topic with History (newTopicWithHistory): Maintains a history of messages using a mutex-protected slice. Good for most use cases where history is needed.
  • Topic with Lock-Free History (newTopicWithLockFreeHistory): Maintains history using a lock-free ring buffer. Recommended for high-concurrency environments to minimize contention on the publish path.

Basic usage:

// Create a new Topic
topic := pubsub.NewTopic()

// Subscribe to the topic
ch := topic.Subscribe()

// Process messages in a goroutine
go func() {
    for msg := range ch {
        fmt.Printf("Received: %v\n", msg)
    }
}()

// Publish messages to the topic
topic.Publish("Hello, World!")

// With history:
topicWithHistory := pubsub.NewTopic(pubsub.WithHistory(10))

func NewTopic

func NewTopic(opts ...TopicOpt) Topic

NewTopic creates a new Topic instance that implements a simple publish/subscribe messaging system. It accepts variadic options to configure features like history and observers.

Example:

topic := pubsub.NewTopic(pubsub.WithHistory(10))

type TopicOpt added in v0.0.8

type TopicOpt func(*topicConfig)

TopicOpt is a functional option for configuring a Topic instance.

func WithHistory added in v0.0.8

func WithHistory(size int) TopicOpt

WithHistory enables message history for the Topic.

func WithLockFreeHistoryOpt added in v0.0.8

func WithLockFreeHistoryOpt(size int) TopicOpt

WithLockFreeHistoryOpt enables lock-free message history for the Topic.

func WithTopicObserver added in v0.0.8

func WithTopicObserver(o Observer) TopicOpt

WithTopicObserver sets the observer for the Topic instance.

Directories

Path Synopsis
examples
basic command
feeder command
filtering command
history command
merging command
otel-observer command
internal
topicstore
Package topicstore provides topic storage implementations for the pubsub package.
Package topicstore provides topic storage implementations for the pubsub package.
Package otelpubsub provides an OpenTelemetry and Prometheus observer for the pubsub package.
Package otelpubsub provides an OpenTelemetry and Prometheus observer for the pubsub package.
pkg
ringbuffer
Package ringbuffer provides a lock-free ring buffer implementation for storing message history with concurrent read/write support.
Package ringbuffer provides a lock-free ring buffer implementation for storing message history with concurrent read/write support.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL