pubsub

package module
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 17, 2021 License: Apache-2.0 Imports: 9 Imported by: 0

README

Go Report Card Go Reference

pubsub

Design

This is an HTTP websocket publish-subscribe server written in Golang with publisher and subscriber client packages. Each different URL path is its own subscription topic.

All assets for this project are stored in memory. This means the pubsub server scales vertically with its sole host's processing power, memory, and network.

The primary use case for this pubsub server is smaller projects where it makes sense to separate publisher and subscriber logic into different programs or hosts/containers.

This pubsub server is not recommended for larger projects that need to scale a pubsub server past the resources available on a single host.

The websocket package used is github.com/gorilla/websocket.

Deploying the server

For Dockerized deployments the pre-built image is available at on DockerHub using this path: micahparks/websocket-pubsub. The Dockerfile is also included in the root directory of this project.

Environment variables:

Name Description
PUBSUB_ADDR The addr argument to pass to http.ListenAndServe.

Client usage

Clients exclusively communicate through HTTP websocket binary messages. This means any encoding is allowed: JSON, protobuf, gob, etc. The Go datatype used is []byte.

The examples below do not cover some more advanced use cases. Using the clients.Options data structure, a client name can be specified, as well as custom websocket dialer or initial headers to the dialing request. Each client is assigned a UUID on the pubsub server side to uniquely identify clients in the logs.

Full publisher example:

package main

import (
	"context"
	"log"

	"github.com/MicahParks/websocket-pubsub/env"
	"github.com/MicahParks/websocket-pubsub/pubclient"
)

func main() {

	// Get the pubsub websocket URL from an environment variable.
	pubsubURL, err := env.URL()
	if err != nil {
		log.Fatalf("Failed to get pubsub URL.\nError: %s", err.Error())
	}

	// Create the publisher.
	var pub *pubclient.Publisher
	if pub, _, err = pubclient.New(context.Background(), pubsubURL); err != nil {
		log.Fatalf("Failed to create publisher.\nError: %s", err.Error())
	}

	// Publish a message.
	if err = pub.Publish([]byte("message")); err != nil {
		log.Fatalf("Failed to publish message.\nError: %s", err.Error())
	}

	// Close the publisher.
	if err = pub.Close(); err != nil {
		log.Fatalf("Failed to close the publisher.\nError: %s", err.Error())
	}

	// Wait for the publisher to close completely.
	<-pub.Done()

	// Print an asynchronous error for the publisher, if any.
	if err = pub.Error(); err != nil {
		log.Printf("This error occurred asynchronously in the publisher: %s", err.Error())
	}
}

Full subscriber example:

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/MicahParks/websocket-pubsub/env"
	"github.com/MicahParks/websocket-pubsub/subclient"
)

func main() {

	// Create a logger.
	logger := log.New(os.Stdout, "", 0)

	// Get the pubsub server address.
	u, err := env.URL()
	if err != nil {
		logger.Fatalf("Failed to connect to pubsub server.\nError: %s", err.Error())
	}
	logger.Printf("Connecting to: \"%s\".", u.String())

	// Create a context for the publisher.
	ctx, cancel := context.WithCancel(context.Background())

	// Create the subscriber client.
	var sub *subclient.Subscriber
	if sub, _, err = subclient.New(ctx, nil, u); err != nil {
		logger.Fatalf("Failed to create subscriber.\nError: %s", err.Error())
	}

	// Make a channel to catch SIGTERM (Ctrl + C) from the OS.
	ctrlC := make(chan os.Signal, 10)

	// Tell the program to monitor for an interrupt or SIGTERM and report it on the given channel.
	signal.Notify(ctrlC, os.Interrupt, syscall.SIGTERM)

	// Print messages as they come in.
loop:
	for {
		select {
		case message := <-sub.Messages():
			logger.Println(string(message))
		case <-ctrlC:
			cancel()
			break loop
		}
	}

	// Wait for the subscriber to finish.
	<-sub.Done()
}

Testing

Generating the test data

In order to test, the test data must be generated.

go run cmd/generate_test_data/main.go

This will create a file called test.dat in the project's root directory, which is read by the test.

The test data are constructed from a random number of bytes up to the maxMessageSize flag. That number of bytes is the quantity of random bytes read from a seeded math/rand. Those random bytes are interpreted as an unsigned integer, *big/Int, then gob encoded.

Note:

  • The test data's message quantity and maximum message size is configurable through flags.
Performing a test

After the test data, test.dat, has been generated, a full functional test can be performed.

go test -cover -race

The test will read in the test data, sort it, hash it, then spin up publishers and subscribers.

The publishers will get a reference to the test data, and send it message-by-message, until each publisher has sent every message.

The subscribers will read a copy of every message published, sort all the messages, hash them, then confirm the hash with what's expected.

The amount of memory this test consumes scales greatly with:

  • The number of messages.
  • The maximum size of messages.
  • The number of publishers.
  • The number of subscribers.

Note:

  • This test pretty much keeps everything in memory until the test is over. This includes a bunch of copies of the test data. Be careful not to run out of memory. The defaults should be safe for most machines.
  • The number of publishers and subscribers is configurable through flags.
Test coverage

This one test covers 76.4% of the code. Additional tests are welcome in contributions.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WebSocketHandler

func WebSocketHandler(ctx context.Context, options ...Options) http.HandlerFunc

WebSocketHandler creates an HTTP handler via a closure that will upgrade the connection to a websocket and use that websocket as either a publisher or a subscriber.

Types

type Event added in v0.3.0

type Event struct {

	// ClientID is the unique client ID for a client to the subscription that the event is for.
	ClientID string `json:"clientID"`

	// SubscriptionTopic is the topic of the subscription that generated this event. (The URL path.)
	SubscriptionTopic string `json:"subscriptionTopic"`

	// Type is an enum that indicates what kind of event has happened.
	Type EventType `json:"type"`
}

Event represents an event for a subscription.

type EventType added in v0.3.0

type EventType uint

EventType is an enum that indicates what kind of event has happened.

const (

	// EventTypePublisherLeft is an event that indicates a publisher has left the subscription.
	EventTypePublisherLeft EventType = 1

	// EventTypePublisherJoined is an event that indicates a publisher has joined the subscription.
	EventTypePublisherJoined EventType = 2

	// EventTypeSubscriberLeft is an event that indicates a subscriber has left the subscription.
	EventTypeSubscriberLeft EventType = 3

	// EventTypeSubscriberJoined is an event that indicates a subscriber has joined the subscription.
	EventTypeSubscriberJoined EventType = 4

	// EventTypeWebsocketUpgradeFailed is an event that indicates a websocket upgrade failed.
	EventTypeWebsocketUpgradeFailed EventType = 5

	// EventTypeBadHTTPHeaders is an event that indicates a incoming request didn't have appropriate HTTP headers.
	EventTypeBadHTTPHeaders EventType = 6
)

type Options added in v0.3.0

type Options struct {

	// Options are the options for all subscriptions created by the service.
	Options *SubscriptionOptions

	// Upgrader is the websocket upgrader to when clients connect.
	Upgrader *websocket.Upgrader
}

Options represents all of the websocket pubsub information that already has default values.

type SubscriptionOptions added in v0.3.0

type SubscriptionOptions struct {

	// CloseDeadline is the time to wait to gracefully closing something. After that, it's closed regardless.
	CloseDeadline *time.Duration

	// Events is a channel of events for the subscription.
	Events chan<- Event

	// MessageBuffer is the internal channel buffer for messages for the subscription.
	MessageBuffer uint

	// PongDeadline is the time to wait for a pong message after a ping.
	PongDeadline *time.Duration

	// PublisherBuffer is the internal channel buffer for adding and removing publishers.
	PublisherBuffer uint

	// SubscriberBuffer is the internal channel buffer for adding and remove subscribers.
	SubscriberBuffer uint

	// SubscriberWriteTimeout is the time to wait for a message to be written to a subscriber. If the message is not
	// written in this amount of time, the subscriber is closed.
	SubscriberWriteTimeout *time.Duration
}

SubscriptionOptions represents information used to create a subscription that already has default values.

Directories

Path Synopsis
cmd
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL