Version: v0.0.0-...-c63310e
Published: Nov 9, 2019 License: Apache-2.0 Imports: 12 Imported by: 0


unifrost: A Go module that makes it easier to stream pubsub events to the web


⚠ This project is at an early stage and is not ready for production yet ⚠

Previously named gochan

unifrost is a Go module for relaying pubsub messages to the web via SSE (EventSource). It is based on Twitter's implementation of real-time event streaming in their new web app.

unifrost is named after Bifrost, the rainbow bridge in the MCU that connects Asgard with Midgard (Earth) and can transport people both ways. Because unifrost sends messages only one way, from server to client, it is called unifrost. 😎

It uses the Go CDK as a vendor-neutral pubsub driver that supports multiple pubsub vendors:

  • Google Cloud Pub/Sub
  • Amazon Simple Queue Service (SQS) (Pending)
  • Azure Service Bus (Pending)
  • RabbitMQ
  • NATS
  • Kafka
  • In-Memory (Only for testing)


unifrost supports Go modules and is built against Go version 1.13.

go get github.com/rajveermalviya/unifrost


For documentation check godoc.


unifrost uses Server-Sent Events, so unlike WebSockets it doesn't require a standalone server and can be embedded in your API server. unifrost's streamer has a ServeHTTP method, i.e. it implements the http.Handler interface, so it can be used directly or wrapped with middleware such as authentication.

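// Using streamer directly.
// subClient is any drivers.SubscriberClient implementation, created elsewhere.
streamer, err := unifrost.NewStreamer(subClient)
if err != nil {
    log.Fatal("Couldn't create streamer: ", err)
}
log.Fatal("HTTP server error: ", http.ListenAndServe("localhost:3000", streamer))

// Using streamer by wrapping it in auth middleware.
// Auth is your own function; handle err from NewStreamer as above.
streamer, err := unifrost.NewStreamer(subClient)

mux := http.NewServeMux()
mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
    err := Auth(w, r)
    if err != nil {
        http.Error(w, "unauthorized", http.StatusUnauthorized)
        return
    }
    streamer.ServeHTTP(w, r)
})
log.Fatal("HTTP server error: ", http.ListenAndServe("localhost:3000", mux))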

When a client connects to the server, the first message it receives contains two things: the configuration and an array of all the topics to which the client is subscribed.

  1. Configuration: the client-id and client-ttl set by the streamer config.
  2. Subscriptions: the topics associated with the specified client id.

Example first message:

{
  "config": {
    "client_id": "9ba6f4e1-8f80-4e61-944e-e3f409ae514f",
    "client_ttl_millis": 60000
  },
  "subscriptions": ["topic5", "topic1", "topic3", "topic4"]
}

Example error message:

{
  "error": {
    "topic": "topic10",
    "code": "subscription-failure",
    "message": "Cannot recieve message from subscription, closing subscription"
  }
}

All info events are streamed over the message channel, i.e. with the EventSource JS API, onmessage or addEventListener('message', () => {}) will receive them. Every subscription event has an event name equal to its topic name, so to listen to a topic's events you need to add an event listener for that topic on the EventSource object.
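If you consume the stream from Go rather than from a browser, the two example payloads above can be decoded with encoding/json. This is a minimal sketch: the JSON keys come from the examples, while the Go type and function names (infoMessage, parseInfo) are hypothetical and not part of unifrost.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// infoMessage mirrors the two example payloads. The Go names are
// hypothetical; only the JSON keys are taken from the examples.
type infoMessage struct {
	Config *struct {
		ClientID        string `json:"client_id"`
		ClientTTLMillis int64  `json:"client_ttl_millis"`
	} `json:"config,omitempty"`
	Subscriptions []string `json:"subscriptions,omitempty"`
	Error         *struct {
		Topic   string `json:"topic"`
		Code    string `json:"code"`
		Message string `json:"message"`
	} `json:"error,omitempty"`
}

// parseInfo decodes the data field of an info ("message") event.
func parseInfo(data string) (infoMessage, error) {
	var m infoMessage
	err := json.Unmarshal([]byte(data), &m)
	return m, err
}

func main() {
	first := `{"config":{"client_id":"9ba6f4e1-8f80-4e61-944e-e3f409ae514f","client_ttl_millis":60000},"subscriptions":["topic5","topic1"]}`
	m, err := parseInfo(first)
	if err != nil {
		panic(err)
	}
	fmt.Println(m.Config.ClientID, m.Config.ClientTTLMillis, m.Subscriptions)
}
```

A nil Config with a non-nil Error tells you the message is an error report rather than the first message.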

Client example:

const sse = new EventSource('/events?id=9ba6f4e1-8f80-4e61-944e-e3f409ae514f');
// for info events like first-message and errors
sse.addEventListener('message', e => {
  console.log(e.data);
});

// for subscription events
sse.addEventListener('topic10', e => {
  console.log(e.data);
});
Note: The only way to listen to subscription events is by adding an event listener for that specific topic. The onmessage method only receives info messages.
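The split between info events and topic events follows from the text/event-stream format itself: a frame without an event field is dispatched as "message", while a frame with an event field is dispatched under that name. The sketch below shows the general SSE framing in Go. It is not taken from unifrost's source, and formatEvent is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
)

// formatEvent renders one SSE frame. An empty name yields the default
// "message" event, which is what onmessage receives in the browser.
func formatEvent(name, data string) string {
	var b strings.Builder
	if name != "" {
		fmt.Fprintf(&b, "event: %s\n", name)
	}
	// Each line of the payload needs its own "data:" field.
	for _, line := range strings.Split(data, "\n") {
		fmt.Fprintf(&b, "data: %s\n", line)
	}
	b.WriteString("\n") // a blank line terminates the event
	return b.String()
}

func main() {
	// Info event: no event name, so it arrives on the message channel.
	fmt.Print(formatEvent("", `{"subscriptions":["topic10"]}`))
	// Subscription event: named after its topic.
	fmt.Print(formatEvent("topic10", "hello"))
}
```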

A new client is created explicitly, using streamer.NewClient() for a client with an auto-generated id or streamer.NewCustomClient() for a client with a specified id.

This makes it easy to integrate authentication with the streamer: create a new client when a user connects to your application, and return the unifrost client_id (custom or autogenerated) as part of your API auth workflow. If you don't care about authentication, you can also generate a new client automatically every time a client connects without the id parameter, using the following middleware with the streamer.

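mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
    // Auto generate new clientID, when new client connects. (Not recommended)
    // ctx is a context.Context defined by the caller.
    q := r.URL.Query()
    if q.Get("id") == "" {
        client, err := streamer.NewClient(ctx)
        if err != nil {
            http.Error(w, "couldn't create client", http.StatusInternalServerError)
            return
        }
        q.Set("id", client.ID)
        r.URL.RawQuery = q.Encode()
    }

    streamer.ServeHTTP(w, r)
})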

When a client gets disconnected, it has a time window in which it can reconnect to the server with its state unchanged. If the client TTL is not specified in the streamer config, the default TTL is one minute.

To know more, check out the example

Why Server-Sent Events (SSE)?

Why would you choose SSE over WebSockets?

One reason SSEs have been kept in the shadow is that later APIs like WebSockets provide a richer protocol for bi-directional, full-duplex communication. However, in some scenarios data doesn't need to be sent from the client. You simply need updates from some server action. A few examples would be friends' status updates, stock tickers, news feeds, or other automated data push mechanisms (e.g. updating a client-side Web SQL Database or IndexedDB object store). If you need to send data to the server, the Fetch API is always an option.

SSEs are sent over traditional HTTP. That means they do not require a special protocol or server implementation to get working. WebSockets, on the other hand, require full-duplex connections and new WebSocket servers to handle the protocol. In addition, Server-Sent Events have a variety of features that WebSockets lack by design, such as automatic reconnection, event IDs, and the ability to send arbitrary events.

Because SSE works on top of HTTP, HTTP protocol improvements can also benefit SSE. For example, the in-development HTTP/3 protocol, built on top of QUIC, could offer additional performance improvements in the presence of packet loss due to lack of head-of-line blocking.


Join the #unifrost channel on the Gophers Slack workspace for questions and discussions.

Future Goals:

  • Standalone server that can be configured by yaml, while also staying modular.
  • Making it horizontally scalable using the Raft consensus algorithm.
  • Creating a website for documentation & overview, and some examples.
  • Become a CNCF project (...maybe).


If you are using unifrost in production, please let me know by sending an email or filing an issue.

Show some love

The best way to show some love for the project is to contribute and file issues.

If you love unifrost, you can support it by sharing the project on Twitter.

You can also support it by sponsoring the project via PayPal.





Package unifrost is a small package for relaying pubsub messages to the web via SSE (EventSource). It is loosely based on Twitter's implementation of real-time event streaming in their new web app.

It uses the Go CDK (gocloud.dev) for pubsub, so it supports various vendors:

Google Cloud Pub/Sub
Amazon Simple Queue Service (SQS) (Pending)
Azure Service Bus (Pending)
In-memory (Only for testing)





var (
	// ErrNoClient is returned if the client-id is not registered in the streamer.
	ErrNoClient = errors.New("streamer: Client doesn't exists")
)




type Client

type Client struct {
	ID string
	// contains filtered or unexported fields
}

Client is a top-level struct that manages all the topics.

func (*Client) Close

func (client *Client) Close(ctx context.Context) error

Close closes the client and shuts down all its subscriptions.

func (*Client) Connected

func (client *Client) Connected() bool

Connected reports whether client is connected to the server.

func (*Client) GetTopics

func (client *Client) GetTopics(ctx context.Context) []string

GetTopics returns a slice of all the topics client is subscribed to.

func (*Client) TotalTopics

func (client *Client) TotalTopics(ctx context.Context) int

TotalTopics reports the number of topics the client is subscribed to.

type Option

type Option func(*Streamer) error

Option is a self-referential function for configuration.

func ClientTTL

func ClientTTL(t time.Duration) Option

ClientTTL is an option that sets the client's TTL. The default TTL is 1 minute.

type Streamer

type Streamer struct {
	// contains filtered or unexported fields
}

Streamer is a top-level struct that will handle all the clients and subscriptions. It implements the http.Handler interface for easy working with the server.

func NewStreamer

func NewStreamer(subClient drivers.SubscriberClient, options ...Option) (*Streamer, error)

NewStreamer is the constructor for the Streamer struct.

func (*Streamer) Close

func (streamer *Streamer) Close(ctx context.Context) error

Close method closes the streamer and also closes all the connected clients.

func (*Streamer) GetClient

func (streamer *Streamer) GetClient(ctx context.Context, clientID string) *Client

GetClient method returns the client instance of the specified clientID.

func (*Streamer) NewClient

func (streamer *Streamer) NewClient(ctx context.Context) (*Client, error)

NewClient method creates a new client with an autogenerated client-id.

func (*Streamer) NewCustomClient

func (streamer *Streamer) NewCustomClient(ctx context.Context, clientID string) (*Client, error)

NewCustomClient method creates a new client with the specified clientID.

func (*Streamer) RemoveClient

func (streamer *Streamer) RemoveClient(ctx context.Context, clientID string) error

RemoveClient method removes the client from streamer and closes it.

func (*Streamer) ServeHTTP

func (streamer *Streamer) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP is the HTTP handler for EventSource. To connect, the query parameter 'id' (the client-id) is required.

func (*Streamer) Subscribe

func (streamer *Streamer) Subscribe(ctx context.Context, clientID string, topic string) error

Subscribe subscribes the specified client to the specified topic. If the specified client doesn't exist, ErrNoClient is returned.

func (*Streamer) TotalClients

func (streamer *Streamer) TotalClients(ctx context.Context) int

TotalClients returns the number of clients connected to the streamer.

func (*Streamer) Unsubscribe

func (streamer *Streamer) Unsubscribe(ctx context.Context, clientID string, topic string) error

Unsubscribe unsubscribes the specified client from the specified topic and shuts down the subscription. If the specified client doesn't exist, ErrNoClient is returned.


Package drivers encapsulates all the drivers required under a single easy to use interface.
