pubsub

v0.1.7
Published: May 14, 2021 License: CC0-1.0 Imports: 1 Imported by: 1

README

pubsub

Embeddable, Lightweight, Sharded In-App Messaging in Go

Motivation

After using a few Pub/Sub systems while writing production-grade software, I decided to write a very simple, embeddable, lightweight, sharded in-app messaging system using only native Go functionality.

Go channels effectively behave as MPSC queues, i.e. multiple producers can push onto the same channel, but each message is received by only one consumer. You are free to use multiple consumers on a single channel, but they will compete for the messages published on it, i.e. not every consumer will see every message. MPMC, i.e. where multiple producers push messages to multiple consumers that each see every message, is therefore not directly implementable using channels alone.
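The competing-consumer behaviour described above can be observed with the standard library alone. The following is a minimal sketch, not part of this package; the function name competingConsumers is made up for illustration. It pushes a fixed number of messages onto one channel read by several goroutines and counts how many each goroutine received.

```go
package main

import (
	"fmt"
	"sync"
)

// competingConsumers (hypothetical helper) publishes n messages onto one
// channel shared by `consumers` goroutines and returns how many messages
// each goroutine received.
func competingConsumers(n, consumers int) []int {
	ch := make(chan int)
	counts := make([]int, consumers)

	var wg sync.WaitGroup
	for i := 0; i < consumers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range ch {
				counts[id]++ // each goroutine writes only its own slot
			}
		}(i)
	}

	for m := 0; m < n; m++ {
		ch <- m
	}
	close(ch)
	wg.Wait()
	return counts
}

func main() {
	counts := competingConsumers(100, 4)
	total := 0
	for _, c := range counts {
		total += c
	}
	// The per-consumer split varies between runs, but the total is always
	// 100: every message was delivered to exactly one consumer.
	fmt.Println(counts, total)
}
```

Because the counts always sum to the number of published messages, no message is ever seen twice, which is why broadcasting to several consumers needs something on top of a bare channel.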

The good thing is that Go channels are concurrency-safe. So I considered extending them to make in-application communication more flexible, while keeping contention as low as possible by introducing consumer shards and consumer message inboxes, i.e. resizable buffers.

The following communication patterns can be used with higher-level abstractions: single or multiple producers, combined with single or multiple consumers.

Design

[Figure: architecture]
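To make the idea of consumer shards and per-consumer inboxes more concrete, here is a rough sketch built only on the standard library. It is not this package's implementation: every type and function name below (inbox, shard, broker, newBroker, subscribe, publish) is hypothetical, and details such as picking the shard by subscriber ID modulo shard count are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// inbox is one subscriber's growable message buffer, guarded by its own lock.
type inbox struct {
	mu     sync.Mutex
	topics map[string]bool
	buf    [][]byte
}

// next pops the oldest buffered message, or returns nil if the inbox is empty.
func (in *inbox) next() []byte {
	in.mu.Lock()
	defer in.mu.Unlock()
	if len(in.buf) == 0 {
		return nil
	}
	msg := in.buf[0]
	in.buf = in.buf[1:]
	return msg
}

// shard holds a subset of all inboxes, so subscribers registering on
// different shards do not contend on the same lock.
type shard struct {
	mu      sync.RWMutex
	inboxes map[uint64]*inbox
}

type broker struct {
	mu     sync.Mutex
	nextID uint64
	shards []*shard
}

// newBroker assumes shardCount > 0.
func newBroker(shardCount uint64) *broker {
	b := &broker{shards: make([]*shard, shardCount)}
	for i := range b.shards {
		b.shards[i] = &shard{inboxes: make(map[uint64]*inbox)}
	}
	return b
}

// subscribe registers a new inbox on the shard chosen by subscriber ID.
func (b *broker) subscribe(capacity int, topics ...string) *inbox {
	in := &inbox{topics: make(map[string]bool), buf: make([][]byte, 0, capacity)}
	for _, t := range topics {
		in.topics[t] = true
	}
	b.mu.Lock()
	id := b.nextID
	b.nextID++
	b.mu.Unlock()

	s := b.shards[id%uint64(len(b.shards))]
	s.mu.Lock()
	s.inboxes[id] = in
	s.mu.Unlock()
	return in
}

// publish appends data to every inbox subscribed to topic and returns how
// many inboxes received it.
func (b *broker) publish(topic string, data []byte) uint64 {
	var delivered uint64
	for _, s := range b.shards {
		s.mu.RLock()
		for _, in := range s.inboxes {
			in.mu.Lock()
			if in.topics[topic] {
				in.buf = append(in.buf, data)
				delivered++
			}
			in.mu.Unlock()
		}
		s.mu.RUnlock()
	}
	return delivered
}

func main() {
	b := newBroker(2)
	s1 := b.subscribe(16, "topic_1")
	s2 := b.subscribe(16, "topic_1")
	s3 := b.subscribe(16, "topic_2")

	fmt.Println(b.publish("topic_1", []byte("hello"))) // 2
	fmt.Println(string(s1.next()), string(s2.next())) // hello hello
	fmt.Println(s3.next() == nil)                     // true
}
```

Unlike the shared-channel case, both subscribers of topic_1 receive their own copy of the message, because each one owns a separate inbox.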

Stress Testing

For stress testing the system, I wrote a configurable program that makes running tests easy using various combinations of CLI arguments.

Run it with

go run stress/main.go -help
go run stress/main.go

[Figure: stress testing result]

Usage

First, create a Go project with Go modules support.

# Assuming you're on UNIX flavoured OS

cd
mkdir test_pubsub

cd test_pubsub
go mod init github.com/<username>/test_pubsub

touch main.go

Now add github.com/itzmeanjan/pubsub as your project dependency

go get github.com/itzmeanjan/pubsub # v0.1.7 latest

Then follow the full example.


Important

If you're planning to use pubsub in your application

  • You should first start the pub/sub broker using
// 2 consumer shards
broker := pubsub.New(2)

// Start using broker 👇
  • If you're a publisher, you should concern yourself with only PubSub.Publish(...)
msg := pubsub.Message{
    Topics: []string{"topic_1"},
    Data:   []byte("hello"),
}
consumerCount := broker.Publish(&msg) // concurrent-safe
  • If you're a subscriber, you should first subscribe to N-many topics, using PubSub.Subscribe(...).
subscriber := broker.Subscribe(16, []string{"topic_1"}...)
  • You can start consuming messages using Subscriber.Next()
for {
    msg := subscriber.Next()
    if msg == nil {
        continue
    }
}
  • Or you can listen for notifications sent over a channel and act on them
for {
    select {
        case <-ctx.Done():
            // graceful shutdown
            return
        
        case <-subscriber.Listener():
            msg := subscriber.Next()
            // pulled message, act now
            _ = msg
    }
}
  • Check whether any consumable message is waiting in the inbox
if !subscriber.Consumable() {
    // nothing to consume
    return
}

// consume them by pulling : `subscriber.Next()`
  • Add more subscriptions on-the-fly using Subscriber.AddSubscription(...)
topicCount := subscriber.AddSubscription([]string{"topic_2"}...)
  • Unsubscribe from specific topic using Subscriber.Unsubscribe(...)
topicCount := subscriber.Unsubscribe([]string{"topic_1"}...)
  • Unsubscribe from all topics using Subscriber.UnsubscribeAll()
topicCount := subscriber.UnsubscribeAll()
  • Destroy the subscriber with Subscriber.Destroy(), which removes this subscriber's message inbox from the broker's consumer shard; no further messages will be received.

And all set 🚀
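The notification-driven steps above can be wrapped in a small reusable helper. The sketch below is an assumption-laden illustration rather than part of the package: consume and source are hypothetical names, and the source interface only mirrors the Listener() and Next() signatures shown in the package documentation, so a *pubsub.Subscriber should satisfy source[pubsub.PublishedMessage]. To stay self-contained it runs against a fake subscriber (fakeSub, fakeMsg) instead of the real broker, and it needs Go 1.18 or newer for generics.

```go
package main

import (
	"context"
	"fmt"
)

// source lists the subscriber methods this sketch relies on.
// M stands for the message type, e.g. pubsub.PublishedMessage.
type source[M any] interface {
	Listener() chan struct{}
	Next() *M
}

// consume waits for notifications and, after each one, drains the inbox
// until Next returns nil. It returns when ctx is cancelled.
func consume[M any](ctx context.Context, s source[M], handle func(*M)) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-s.Listener():
			for msg := s.Next(); msg != nil; msg = s.Next() {
				handle(msg)
			}
		}
	}
}

// fakeMsg and fakeSub are stand-ins used only to make this example runnable.
type fakeMsg struct {
	Topic string
	Data  []byte
}

type fakeSub struct {
	ping  chan struct{}
	queue []*fakeMsg
}

func (f *fakeSub) Listener() chan struct{} { return f.ping }

func (f *fakeSub) Next() *fakeMsg {
	if len(f.queue) == 0 {
		return nil
	}
	m := f.queue[0]
	f.queue = f.queue[1:]
	return m
}

// runDemo preloads two messages and one notification, then consumes until
// both messages have been handled.
func runDemo() []string {
	sub := &fakeSub{
		ping: make(chan struct{}, 1),
		queue: []*fakeMsg{
			{Topic: "topic_1", Data: []byte("a")},
			{Topic: "topic_1", Data: []byte("b")},
		},
	}
	sub.ping <- struct{}{}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var got []string
	consume[fakeMsg](ctx, sub, func(m *fakeMsg) {
		got = append(got, string(m.Data))
		if len(got) == 2 {
			cancel() // stop once everything preloaded has been seen
		}
	})
	return got
}

func main() {
	fmt.Println(runDemo()) // [a b]
}
```

Draining until Next returns nil after every notification is a defensive choice: it does not depend on how many notifications the broker sends per published message.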


You can check package documentation here

Documentation

Types

type Message

type Message struct {
	Topics []string
	Data   []byte
}

Message - Publisher showing intent of publishing arbitrary byte slice to topics

type PubSub

type PubSub struct {
	// contains filtered or unexported fields
}

PubSub - Pub/Sub Server i.e. holds which clients are subscribed to what topics, manages publishing messages to correct topics, handles (un-)subscription requests

In other words, the state manager of the Pub/Sub broker

func New

func New(shardCount uint64) *PubSub

New - Create a new Pub/Sub hub, using which messages can be routed to various topics

func (*PubSub) Publish

func (p *PubSub) Publish(msg *Message) uint64

Publish - Publish message to N-topics in concurrent-safe manner

func (*PubSub) Subscribe

func (p *PubSub) Subscribe(cap int, topics ...string) *Subscriber

Subscribe - Create new subscriber instance with initial capacity, listening for messages published on N-topics initially.

More topics can be subscribed to later using returned subscriber instance.

type PublishedMessage

type PublishedMessage struct {
	Topic string
	Data  []byte
}

PublishedMessage - Subscriber will receive message for consumption in this form

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber - Uniquely identifiable subscriber with multiple subscribed topics, which is notified of new messages over a ping channel

func (*Subscriber) AddSubscription

func (s *Subscriber) AddSubscription(topics ...string) uint64

AddSubscription - Add subscriptions to more topics on-the-fly

func (*Subscriber) Consumable added in v0.1.6

func (s *Subscriber) Consumable() bool

Consumable - Checks whether any consumable message exists in the buffer [ concurrent-safe ]

func (*Subscriber) Destroy added in v0.1.6

func (s *Subscriber) Destroy()

Destroy - Asks hub to remove communication channel to this subscriber

func (*Subscriber) Listener added in v0.1.6

func (s *Subscriber) Listener() chan struct{}

Listener - Get notified when new message is received

func (*Subscriber) Next

func (s *Subscriber) Next() *PublishedMessage

Next - Attempt to consume oldest message living in buffer, by popping it out, in concurrent-safe manner

If nothing exists, it'll return nil

func (*Subscriber) Unsubscribe

func (s *Subscriber) Unsubscribe(topics ...string) uint64

Unsubscribe - Unsubscribe from specified subscribed topics

func (*Subscriber) UnsubscribeAll

func (s *Subscriber) UnsubscribeAll() uint64

UnsubscribeAll - Unsubscribe from all active subscribed topics
