pubsub

package module
v1.2.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2025 License: Apache-2.0 Imports: 5 Imported by: 2

README

Go PubSub

The go-pubsub package is a simple package for implementing publish-subscribe asynchronous tasks in Golang. It allows writing publishers and subscribers fully statically typed, and swap out Broker implementations (e.g. Memory, AWS SQS, etc.) as required.

This package imposes no restrictions on how messages should be represented, the idea is to keep subscribers agnostic to transport concerns and be fully typed using golang definitions. How messages are encoded/decoded in order to be transported over the network is up to the provider implementation in combination with Codec middlewares.

broker overview

What is a PubSub system

A PubSub system is a messaging system that has, as its name implies, two components: Publisher of messages and subscriber to messages. In contrast to synchronous communication, the publisher doesn't have to wait for a message to be received, as well as the receiver doesn't have to be online to retrieve messages sent earlier. As such, a PubSub system acts like a buffer for asynchronous messaging.

Features

  • Multi-topic support, the same subscriber may listen for messages on multiple topics at the same time.
  • Hybrid message filtering, subscriber are free to decide whether they want to receive messages for a concrete type or not (content-based), or just receive everything that is pushed to a given topic/s (topic-based).
  • Pluggable providers. Just implement the Broker interface. See below for a list of built-in providers

Providers

Providers are concrete implementations of the Broker interface. Examples of providers could be messaging services such as Google's PubSub, Amazon's SNS or Nats.io. The Broker interface acts as a generalization for such services.

The go-pubsub package comes with a set of built-in providers:

  • memory: A simple, synchronous Broker for in-process message communication, acting as a straightforward "Message Bus" or "Event Dispatcher" replacement. It ensures message processing in publication order and allows error feedback to the consumer if a subscriber fails to process a message. Notably, it executes all subscribers, requiring idempotent handling for retry functionality, as subscribers may be invoked multiple times until successful processing.
  • nop: a simple NO-OP broker implementation that can be used for testing.
  • redis: a broker that uses redis streams as PubSub mechanism.
  • snssqs: a Broker that uses AWS SNS and AWS SQS.
  • kmq: a KubeMQ implementation of the Broker interface.
Creating your own provider

This packages moves around the Broker interface definition, which is the central piece for dealing with PubSub systems. The Broker interface is a composition of three independent interfaces which can be used in order to keep you application concerns clean and separated:

type Broker interface {
	Publisher
	Subscriber
	Shutdowner
}

type Publisher interface {
	Publish(ctx context.Context, topic Topic, m interface{}) error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic Topic, handler Handler, option ...SubscribeOption) (Subscription, error)
}

type Shutdowner interface {
	Shutdown(ctx context.Context) error
}

Creating your own provider is as simple as implementing the Broker interface described above.

Middleware

A middleware acts as a wrapper for a Broker implementation. It can be used to intercept each message being published or being delivered to subscribers. Users can use middleware to do logging, metrics collection, and many other functionalities that can be shared across PubSub Providers.

To use middleware capabilities you must simply wrap your broker using any of the provided middlewares, example:

broker := printer.NewPrinterMiddleware(myProvider, os.Stdout)

Included middlewares are:

  • codec: a middleware that encodes and decodes messages using the given codec.
  • lifecycle: a middleware that allows to bind to broker lifecycle events.
  • printer: a simple middleware that prints each message to the given output.
  • recover: a middleware that recovers from panics.
  • retry: a middleware that retries publishing messages if the broker fails.

Middlewares can be combined by wrapping each other, for example:

broker := memory.NewMemoryBroker() 
broker = printer.NewPrinterMiddleware(myProvider, os.Stdout)
broker = codec.NewCodecMiddleware(broker, codec.JSON)
broker = recovery.NewRecoveryMiddleware(broker, func(ctx context.Context, p interface{}) error {
    println("panic:", p)
	
	return nil 
})

Please note that middlewares are applied in reverse order they are wrapped, so in the example above, the recovery middleware will be applied first, then the codec middleware, and so on:

Recovery -> Codec -> Printer -> Memory-Broker

TODO

  • Kafka provider
  • Google's Pub/Sub provider
  • Nats.io provider
  • Redis provider
  • Add protobuf support as a middleware codec
  • Recovery middleware for dealing with panics
  • Retry middleware for dealing with unreliable providers/handlers

Example

package main

import (
  "context"
  "fmt"
  "time"

  "github.com/botchris/go-pubsub"
  "github.com/botchris/go-pubsub/provider/memory"
)

var myTopic pubsub.Topic = "my-topic"

type myMessage struct {
  Body string
}

func main() {
  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  defer cancel()

  broker := memory.NewBroker()

  // Define handlers 
  h1 := pubsub.NewHandler(func(ctx context.Context, t pubsub.Topic, m myMessage) error {
    fmt.Printf("%s -> %+v -> [s1]\n", t, m)

    return nil
  })

  h2 := pubsub.NewHandler(func(ctx context.Context, t pubsub.Topic, m *myMessage) error {
    fmt.Printf("%s -> %+v -> [s2]\n", t, m)

    return nil
  })

  h3 := pubsub.NewHandler(func(ctx context.Context, t pubsub.Topic, m string) error {
    fmt.Printf("%s -> %+v -> [s3]\n", t, m)

    return nil
  })

  // Subscribe to topic
  var s1 pubsub.Subscription
  {
    s1l, err := broker.Subscribe(ctx, myTopic, h1)
    if err != nil {
      panic(err)
    }

    s1 = s1l

    if _, sErr := broker.Subscribe(ctx, myTopic, h2); sErr != nil {
      panic(sErr)
    }

    if _, sErr := broker.Subscribe(ctx, myTopic, h3); sErr != nil {
      panic(sErr)
    }
  }

  // Publish test messages
  {
    if err := broker.Publish(ctx, myTopic, myMessage{Body: "value(hello world)"}); err != nil {
      panic(err)
    }

    if err := broker.Publish(ctx, myTopic, &myMessage{Body: "pointer(hello world)"}); err != nil {
      panic(err)
    }

    if err := broker.Publish(ctx, myTopic, "string(hello world)"); err != nil {
      panic(err)
    }

    // Unsubscribe S1
    if err := s1.Unsubscribe(); err != nil {
      panic(err)
    }

    // This will noop
    if err := broker.Publish(ctx, myTopic, myMessage{Body: "value(hello world)"}); err != nil {
      panic(err)
    }
  }

  // Output:
  //  {Body:value(hello world)} -> my-topic -> [s1]
  //  &{Body:pointer(hello world)} -> my-topic -> [s2]
  //  string(hello world) -> my-topic -> [s3]
  //  <nothing>
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNilHandler                   = errors.New("handler function can not be nil")
	ErrHandlerNotAFunction          = errors.New("provided handler is not a function")
	ErrHandlerInputLengthMissMatch  = errors.New("handler function must have exactly three input arguments")
	ErrHandlerInputNoContext        = errors.New("first argument of handler must be a context")
	ErrHandlerInputNoTopic          = errors.New("second argument of handler must be a topic")
	ErrHandlerOutputLengthMissMatch = errors.New("handler must have exactly one output argument")
	ErrHandlerOutputNoError         = errors.New("handler output must implements `error` interface")
)

List of known errors for handler signature validation process.

Functions

This section is empty.

Types

type Broker

type Broker interface {
	Publisher
	Subscriber
	Shutdowner
}

Broker defines the interface for a pub/sub broker.

type Handler

type Handler interface {
	// Deliver delivers the message to the handler. If handler does not
	// accept this kind of message, it should NOT return an error.
	Deliver(ctx context.Context, topic Topic, message interface{}) error

	// Reflect returns a description of the message type the handler is
	// interested in.
	Reflect() MessageReflection
}

Handler represents a function capable of handling a message arriving at a topic.

func NewHandler

func NewHandler(handlerFunc interface{}) Handler

NewHandler builds a new handler instance for the given function.

This function WILL PANIC if the given function does not match the signature `func (ctx context.Context, t pubsub.Topic, m <Type>) error`, e.g.:

- func (ctx context.Context, t pubsub.Topic, pointer *MyCustomStruct) error - func (ctx context.Context, t pubsub.Topic, customS MyCustomStruct) error - func (ctx context.Context, t pubsub.Topic, customI MyCustomInterface) error

Handlers should return an error if they're unable to properly handle a given message. The same handler can be used on multiple subscriptions. In the other hand, in order to increase Broker's throughput, is highly recommended designing each Broker in such a way that handling of each message is asynchronously, in a separated goroutine.

type MessageReflection

type MessageReflection struct {
	// MessageType is the type of the message the handler is interested in, e.g. "myapp.MyMessage".
	MessageType reflect.Type

	// MessageKind is the kind of the type of the message, e.g. "struct", "interface", "ptr", etc.
	MessageKind reflect.Kind

	// Handler is the reflected handler function itself.
	Handler reflect.Value
}

MessageReflection describes the message a handler is interested in.

func (MessageReflection) Accepts

func (r MessageReflection) Accepts(m interface{}) bool

Accepts whether the handler accepts the provided message.

func (MessageReflection) String added in v1.2.1

func (r MessageReflection) String() string

String returns a string representation of the reflected message.

type Metadata

type Metadata interface {
	// Get returns the value associated with this metadata for key, or nil if no
	// value is associated with key. Successive calls to Value with the same key
	// returns the same result.
	Get(key interface{}) interface{}

	// Set stores the given value under the specified key, overwrites if
	// already exists.
	Set(key, val interface{}) Metadata
}

Metadata is the interface that wraps the basic Get and Set methods.

func NewMetadata

func NewMetadata() Metadata

NewMetadata returns a new thread-safe Metadata instance which can be safely used by multiple goroutines.

type Publisher added in v1.2.0

type Publisher interface {
	// Publish the given message on the given topic.
	Publish(ctx context.Context, topic Topic, m interface{}) error
}

Publisher is a convenience definition which extract topic-publishing behavior into an independent interface.

This is specially useful when needing to publish messages without having to expose the entire Broker implementation.

type Shutdowner added in v1.2.0

type Shutdowner interface {
	// Shutdown gracefully shutdowns all subscriptions.
	//
	// The provided context acts as a hard-limit cancellation for the shutdown
	// process.
	Shutdown(ctx context.Context) error
}

Shutdowner provides a method that can manually trigger the shutdown of the Broker by gracefully closing each subscription.

Use this interface when you need to manually shutdown the Broker. Use with care, as it will prevent the broker from publishing or receiving any messages.

type StoppableSubscription

type StoppableSubscription interface {
	Subscription

	// Context returns the internal context of this subscription which controls
	// its life cycle. This is usually branched from broker's internal context,
	// and allows implementing graceful shutdown mechanisms when broker decides
	// to stop.
	Context() context.Context

	// Stop is used to signal the subscription to stop. This is usually invoked
	// by the broker during graceful shutdown or during unsubscription
	// operations.
	Stop()
}

StoppableSubscription represents a subscription that can be stopped.

Generally used by brokers implementations that relies on background running goroutines for handling subscriptions and message receptions from remote backends, e.g. Kafka, NATS, etc.

func NewStoppableSubscription

func NewStoppableSubscription(
	ctx context.Context,
	id string,
	topic Topic,
	handler Handler,
	unsubscriber UnsubscribeFunc,
	options SubscribeOptions,
) (StoppableSubscription, error)

NewStoppableSubscription builds a new stoppable subscription. Given context should be the broker's internal context, this allows to implement graceful shutdown.

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

SubscribeOption is a function that configures a SubscribeOptions instance.

func DisableAutoAck

func DisableAutoAck() SubscribeOption

DisableAutoAck will disable auto ACKing of messages after they have been handled.

func SubscribeMetadata

func SubscribeMetadata(meta Metadata) SubscribeOption

SubscribeMetadata set subscription metadata.

func WithGroup

func WithGroup(name string) SubscribeOption

WithGroup sets the name of the group to share messages on.

type SubscribeOptions

type SubscribeOptions struct {
	// Group subscriptions with the same group name will create a shared
	// subscription where each one receives a subset of the published messages.
	Group string

	// AutoAck defaults to true. When a handler returns with a nil error the
	// message is acknowledged.
	AutoAck bool

	// Metadata other options for concrete implementations of the Broker
	// interface can be stored as metadata.
	Metadata Metadata
}

SubscribeOptions describes the options for a subscription action.

func NewSubscribeOptions

func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions

NewSubscribeOptions convenience function for building a SubscribeOptions based on given list of options.

type Subscriber added in v1.2.0

type Subscriber interface {
	// Subscribe attaches the given handler to the given topic.
	//
	// The same Handler can be reused and attached multiple times to the same
	// or distinct topics.
	Subscribe(ctx context.Context, topic Topic, handler Handler, option ...SubscribeOption) (Subscription, error)
}

Subscriber is a convenience definition which extract topic-subscription behavior into an independent interface.

This is specially useful when needing to subscribe to topics without having to expose the entire Broker implementation.

type Subscription

type Subscription interface {
	// ID uniquely identifies the subscription. UUID algorithm, or similar, is
	// recommended.
	ID() string

	// Options returns the options used to create the subscription.
	Options() SubscribeOptions

	// Topic returns the topic the subscription is subscribed to.
	Topic() Topic

	// Unsubscribe unsubscribes.
	Unsubscribe() error

	// Handler returns the underlying Handler function this subscription will
	// use when receiving messages.
	Handler() Handler
}

Subscription represents a Handler subscribed to a topic.

func NewSubscription

func NewSubscription(
	id string,
	topic Topic,
	handler Handler,
	unsub UnsubscribeFunc,
	options SubscribeOptions,
) Subscription

NewSubscription convenience function for creating a new subscriptions.

type Topic

type Topic string

Topic identifies a particular Topic on which messages can be published.

func (Topic) String

func (t Topic) String() string

String returns the string representation of the Topic.

type UnsubscribeFunc

type UnsubscribeFunc func() error

UnsubscribeFunc represents a function responsible for unsubscribing a subscription.

Directories

Path Synopsis
example
simple command
middleware
retry
Package retry provides retry strategies for retry middleware.
Package retry provides retry strategies for retry middleware.
provider
kmq
memory
Package memory provides a simple in-memory Broker, which moves messages using local memory.
Package memory provides a simple in-memory Broker, which moves messages using local memory.
nop
snssqs
Package snssqs provides a simple AWS SNS+SQS broker implementation.
Package snssqs provides a simple AWS SNS+SQS broker implementation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL