pubsub

package
v0.6.1
Published: Jan 14, 2026 | License: MIT | Imports: 6 | Imported by: 0

README

pubsub

A simple, low-level publish-subscribe messaging system for Go. Designed as a dumb transport layer for prototyping event-driven applications.

Features

  • Fire-and-forget: Event notifications, not a queue (no guarantees, no retries)
  • Two implementations:
    • InMemory: Channel-based, single-process
    • Postgres: LISTEN/NOTIFY-based, multi-process
  • Interface-based: Easy to mock and test
  • No opinions: No durability, generics, or serialization - build your own adapters
  • Context-aware: Subscribers automatically unsubscribe when context is cancelled

Installation

go get github.com/erlorenz/go-toolbox/pubsub

Quick Start

In-Memory (Single Process)
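import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/erlorenz/go-toolbox/pubsub"
)

func main() {
    broker := pubsub.NewInMemory()
    defer broker.Close()

    ctx := context.Background()

    // Subscribe: the handler is called asynchronously for each message
    if err := broker.Subscribe(ctx, "events", func(payload []byte) {
        fmt.Printf("Received: %s\n", payload)
    }); err != nil {
        log.Fatal(err)
    }

    // Publish
    if err := broker.Publish(ctx, "events", []byte("hello world")); err != nil {
        log.Fatal(err)
    }

    // Handlers run in their own goroutines; give this one a moment before main exits
    time.Sleep(100 * time.Millisecond)
}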
PostgreSQL (Multi-Process)
import (
    "context"
    "fmt"
    "log"

    "github.com/erlorenz/go-toolbox/pubsub"
    "github.com/jackc/pgx/v5/pgxpool"
)

ctx := context.Background()

pool, err := pgxpool.New(ctx, "postgres://...")
if err != nil {
    log.Fatal(err)
}
defer pool.Close()

broker := pubsub.NewPostgres(pool)
defer broker.Close()

// Subscribe (in process A)
broker.Subscribe(ctx, "events", func(payload []byte) {
    fmt.Printf("Process A received: %s\n", payload)
})

// Publish (in process B)
broker.Publish(ctx, "events", []byte("hello from B"))

Interface

type Broker interface {
    Publish(ctx context.Context, topic string, payload []byte) error
    Subscribe(ctx context.Context, topic string, fn func([]byte)) error
    Close() error
}

Use Cases

✅ Good For
  • Event notifications - Notify other parts of your app when something happens
  • Cache invalidation - Broadcast cache clear events
  • Live updates - SSE/WebSocket fan-out (see examples)
  • Development - Prototyping event-driven architectures
  • Decoupling - Separate components that don't need guaranteed delivery
❌ Not Good For
  • Task queues - No retry, no persistence (use pg-boss, River, etc.)
  • Critical events - No delivery guarantees
  • Ordering - No guarantee of message order
  • Large payloads - Postgres payloads are limited to 8000 bytes

Build application-specific adapters for type safety and business logic:

// Domain event
type JobCompleted struct {
    JobID   string `json:"job_id"`
    BatchID string `json:"batch_id"`
    Status  string `json:"status"`
}

// Typed adapter
type JobEventsAdapter struct {
    broker pubsub.Broker
}

func NewJobEventsAdapter(broker pubsub.Broker) *JobEventsAdapter {
    return &JobEventsAdapter{broker: broker}
}

// Type-safe publish
func (a *JobEventsAdapter) PublishJobCompleted(ctx context.Context, event JobCompleted) error {
    data, err := json.Marshal(event)
    if err != nil {
        return err
    }
    return a.broker.Publish(ctx, "job.completed", data)
}

// Filtered subscription
func (a *JobEventsAdapter) SubscribeToJobsInBatch(ctx context.Context, batchID string) <-chan JobCompleted {
    ch := make(chan JobCompleted, 10)

    err := a.broker.Subscribe(ctx, "job.completed", func(payload []byte) {
        var event JobCompleted
        if err := json.Unmarshal(payload, &event); err != nil {
            return // skip malformed payloads
        }

        // Application-level filtering
        if event.BatchID == batchID {
            select {
            case ch <- event:
            default:
                // Drop if channel is full
            }
        }
    })
    if err != nil {
        // Subscription failed, so no handler will ever send: close to unblock readers
        close(ch)
    }

    return ch
}

Usage:

adapter := NewJobEventsAdapter(broker)

// Publish
adapter.PublishJobCompleted(ctx, JobCompleted{
    JobID:   "123",
    BatchID: "batch-1",
    Status:  "success",
})

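// Subscribe with filtering
events := adapter.SubscribeToJobsInBatch(ctx, "batch-1")
// The adapter does not close the channel when ctx is cancelled,
// so this loop blocks until the program exits.
for event := range events {
    fmt.Printf("Job %s completed\n", event.JobID)
}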

Context Handling

Subscribers automatically unsubscribe when their context is cancelled:

ctx, cancel := context.WithCancel(context.Background())

broker.Subscribe(ctx, "events", func(payload []byte) {
    fmt.Println("Received:", string(payload))
})

// Later...
cancel()  // Subscriber is automatically removed

Implementation Details

InMemory
  • Uses Go channels for message delivery
  • Goroutines for each subscriber
  • Handlers cannot return errors; handler panics are caught and ignored (fire-and-forget)
  • Perfect for single-process applications
Postgres
  • Uses PostgreSQL's LISTEN/NOTIFY commands
  • One dedicated connection per topic
  • Multiple subscribers on same topic share one LISTEN connection
  • Automatic cleanup when all subscribers unsubscribe
  • Payload limit: 8000 bytes (PostgreSQL restriction)
  • No durability: Messages are lost if no subscribers are listening

Error Handling

Both implementations follow fire-and-forget semantics:

  • Publish() may return connection errors
  • Handler panics are caught and ignored
  • Slow handlers won't block publishers
  • No retry logic

Examples

Server-Sent Events (SSE)

See examples/pubsub for a complete SSE implementation showing:

  • Broadcasting events to multiple clients
  • Client disconnect handling
  • Typed event adapters
Cache Invalidation
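type CacheInvalidator struct {
    broker pubsub.Broker
}

// Invalidate broadcasts the keys that should be evicted.
func (c *CacheInvalidator) Invalidate(ctx context.Context, keys ...string) error {
    data, err := json.Marshal(keys)
    if err != nil {
        return err
    }
    return c.broker.Publish(ctx, "cache.invalidate", data)
}

// Watch calls onInvalidate for each batch of keys until ctx is cancelled.
func (c *CacheInvalidator) Watch(ctx context.Context, onInvalidate func([]string)) error {
    return c.broker.Subscribe(ctx, "cache.invalidate", func(payload []byte) {
        var keys []string
        if err := json.Unmarshal(payload, &keys); err != nil {
            return // skip malformed payloads
        }
        onInvalidate(keys)
    })
}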

Testing

Mock the interface for testing:

type MockBroker struct {
    published []struct {
        topic   string
        payload []byte
    }
}

func (m *MockBroker) Publish(ctx context.Context, topic string, payload []byte) error {
    m.published = append(m.published, struct{topic string; payload []byte}{topic, payload})
    return nil
}

func (m *MockBroker) Subscribe(ctx context.Context, topic string, fn func([]byte)) error {
    return nil
}

func (m *MockBroker) Close() error {
    return nil
}

Comparison

Feature          InMemory    Postgres
Single process   Yes         Yes
Multi-process    No          Yes
Payload limit    None        8000 bytes
Setup            None        PostgreSQL
Performance      Very fast   Fast
Durability       None        None

When to Upgrade

Consider upgrading to a proper message queue when you need:

  • Guaranteed delivery - Messages must not be lost
  • Retries - Failed handlers should retry
  • Ordering - Messages must be processed in order
  • Persistence - Messages survive crashes
  • Backpressure - Slow consumers shouldn't lose messages

Good alternatives:

  • River - Job queue for Postgres
  • pg-boss - Job queue for Node.js, built on Postgres
  • NATS - Lightweight messaging system
  • Redis Pub/Sub - Similar semantics, widely supported

License

MIT

Documentation

Overview

Package pubsub provides a simple publish-subscribe messaging system.

The package defines low-level interfaces for publishing and subscribing to topics with []byte payloads. It's designed to be a dumb transport layer - users should build their own adapters for type safety, filtering, and business logic.

Two implementations are provided:

  • InMemory: Channel-based, single-process pub/sub
  • Postgres: LISTEN/NOTIFY-based, multi-process pub/sub

Variables

var (
	// ErrClosed is returned when operations are attempted on a closed broker.
	ErrClosed = errors.New("pubsub: broker is closed")
)

Common errors.

Types

type Broker

type Broker interface {
	Publisher
	Subscriber
}

Broker combines Publisher and Subscriber interfaces. Most implementations provide both capabilities.

type InMemory

type InMemory struct {
	// contains filtered or unexported fields
}

InMemory is a simple in-memory broker using Go channels. It's suitable for single-process applications, testing, and development. Messages are not persisted and are lost if no subscribers are active.

func NewInMemory

func NewInMemory() *InMemory

NewInMemory creates a new in-memory broker.

func (*InMemory) Close

func (m *InMemory) Close() error

Close stops all subscriptions and prevents new ones.

func (*InMemory) Publish

func (m *InMemory) Publish(ctx context.Context, topic string, payload []byte) error

Publish sends a message to all subscribers of the topic. If no subscribers exist, the message is dropped (fire-and-forget). Each subscriber's handler is called in its own goroutine.

func (*InMemory) Subscribe

func (m *InMemory) Subscribe(ctx context.Context, topic string, handler func([]byte)) error

Subscribe registers a handler for the specified topic. The handler is called in a new goroutine for each message. The subscription remains active until ctx is canceled or Close is called.
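Since each message gets its own goroutine, a handler that touches shared state can run concurrently with itself and needs its own synchronization. The sketch below shows one way to do that with a mutex; `counter` and `simulateDelivery` are hypothetical, and `simulateDelivery` only imitates the one-goroutine-per-message behavior described above rather than using the real broker.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is hypothetical handler state shared across invocations.
type counter struct {
	mu    sync.Mutex
	byKey map[string]int
}

// Handle has the func([]byte) shape that Subscribe expects.
// The mutex makes it safe to call from many goroutines at once.
func (c *counter) Handle(payload []byte) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.byKey == nil {
		c.byKey = make(map[string]int)
	}
	c.byKey[string(payload)]++
}

// Count returns how many times key has been seen.
func (c *counter) Count(key string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.byKey[key]
}

// simulateDelivery stands in for the broker: it starts one goroutine per
// message and waits for all of them to finish.
func simulateDelivery(handler func([]byte), messages [][]byte) {
	var wg sync.WaitGroup
	for _, m := range messages {
		wg.Add(1)
		go func(m []byte) {
			defer wg.Done()
			handler(m)
		}(m)
	}
	wg.Wait()
}

// report formats the count for one key.
func report(c *counter, key string) string {
	return fmt.Sprintf("%s=%d", key, c.Count(key))
}

func main() {
	c := &counter{}
	messages := make([][]byte, 100)
	for i := range messages {
		messages[i] = []byte("user.created")
	}
	simulateDelivery(c.Handle, messages)
	fmt.Println(report(c, "user.created"))
}
```

Without the mutex, the concurrent map writes would be a data race, which `go test -race` can detect.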

type Postgres

type Postgres struct {
	// contains filtered or unexported fields
}

Postgres is a broker that uses PostgreSQL's LISTEN/NOTIFY for pub/sub. It's suitable for multi-process applications where events need to be shared across different instances or services connected to the same database.

Unlike InMemory, Postgres can distribute messages across multiple processes, but it still provides no durability - messages are lost if no subscribers are listening.

func NewPostgres

func NewPostgres(pool *pgxpool.Pool) *Postgres

NewPostgres creates a new Postgres broker using the provided connection pool. The pool must remain open for the lifetime of the broker.

func (*Postgres) Close

func (p *Postgres) Close() error

Close stops all listeners and releases connections.

func (*Postgres) Publish

func (p *Postgres) Publish(ctx context.Context, topic string, payload []byte) error

Publish sends a message to all subscribers of the topic across all processes. It uses PostgreSQL's NOTIFY command. The payload is sent as the notification payload.

func (*Postgres) Subscribe

func (p *Postgres) Subscribe(ctx context.Context, topic string, fn func([]byte)) error

Subscribe registers a handler for the specified topic. It creates a dedicated PostgreSQL connection with LISTEN for this topic if one doesn't already exist. Multiple handlers for the same topic share a single LISTEN connection.

type Publisher

type Publisher interface {
	// Publish sends a message to the specified topic.
	// The payload is delivered to all active subscribers.
	// Publishing is fire-and-forget - if no subscribers exist, the message is dropped.
	Publish(ctx context.Context, topic string, payload []byte) error

	// Close releases any resources held by the publisher.
	Close() error
}

Publisher publishes messages to topics.

type Subscriber

type Subscriber interface {
	// Subscribe registers a handler for the specified topic.
	// The handler is called asynchronously for each message published to the topic.
	// Multiple subscribers to the same topic each receive a copy of every message.
	//
	// The subscription remains active until the context is canceled or Close is called.
	// Handlers should be fast and non-blocking. For slow operations, handlers should
	// spawn goroutines or use channels to bridge to synchronous code.
	Subscribe(ctx context.Context, topic string, handler func([]byte)) error

	// Close releases any resources held by the subscriber and stops all handlers.
	Close() error
}

Subscriber subscribes to topics and receives messages via handlers.
