Version: v0.1.20

This package is not in the latest version of its module.

Published: Apr 18, 2022 License: MIT Imports: 25 Imported by: 1




A real-time message bus server and library written in Go.


  • Simple HTTP API
  • Simple command-line client
  • In memory queues
  • WebSockets for real-time messages
  • Pull and Push model


$ go install

Use Cases

  • As a simple generic webhook

You can use msgbus as a simple generic webhook. For example, in my dockerfiles repo I have hooked up Prometheus's AlertManager to send alert notifications to an IRC channel using some simple shell scripts.

See: alert

  • As a general-purpose message / event bus that supports pub/sub as well as pulling messages synchronously.

Usage (library)

Install the package into your project:

$ go get

Use the MessageBus type directly:

package main

import (
    "log"

    // The msgbus package import path is elided in the source.
)

// NOTE: msgbus.New and string topic names reflect an older API; the index
// below lists NewMessageBus and *Topic arguments instead.
func main() {
    m := msgbus.New()
    m.Put("foo", m.NewMessage([]byte("Hello World!")))

    msg, ok := m.Get("foo")
    if !ok {
        log.Printf("No more messages in queue: foo")
    } else {
        // The ID is numeric, so it is formatted with %d rather than %s.
        log.Printf(
            "Received message: id=%d topic=%s payload=%s",
            msg.ID, msg.Topic, msg.Payload,
        )
    }
}

Running this example should yield something like this:

$ go run examples/hello.go
2017/08/09 03:01:54 [msgbus] PUT id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] NotifyAll id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] GET topic=foo
2017/08/09 03:01:54 Received message: id=0 topic=foo payload=Hello World!

See the godoc for further documentation and other examples.

Usage (tool)

Run the message bus daemon/server:

$ msgbusd
2017/08/07 01:11:16 [msgbus] Subscribe id=[::1]:55341 topic=foo
2017/08/07 01:11:22 [msgbus] PUT id=0 topic=foo payload=hi
2017/08/07 01:11:22 [msgbus] NotifyAll id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] PUT id=1 topic=foo payload=bye
2017/08/07 01:11:26 [msgbus] NotifyAll id=1 topic=foo payload=bye
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo

Subscribe to a topic using the message bus client:

$ msgbus sub foo
2017/08/07 01:11:22 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] received message: id=1 topic=foo payload=bye

Send a few messages with the message bus client:

$ msgbus pub foo hi
$ msgbus pub foo bye

You can also manually pull messages using the client:

$ msgbus pull foo
2017/08/07 01:11:33 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:33 [msgbus] received message: id=1 topic=foo payload=bye

This is slightly different from a listening subscriber (using WebSockets), which has messages pushed to it as they are published; here the client pulls messages directly from the queue.

Usage (HTTP)

Run the message bus daemon/server:

$ msgbusd
2018/03/25 13:21:18 msgbusd listening on :8000

Send a message using curl:

$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello

Pull the messages off the "hello" queue using curl:

$ curl -q -o - http://localhost:8000/hello

Decode the base64-encoded payload:

$ echo 'eyJtZXNzYWdlIjogImhlbGxvIn0=' | base64 -d
{"message": "hello"}
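
The payload is base64 because Go's encoding/json encodes []byte fields that way, and Message.Payload is a []byte. The same decoding step can be sketched in Go as follows. The envelope type and the sample JSON body are assumptions made for illustration (only the "payload" tag is taken from the Message type); this is not the server's exact response format.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// envelope is a hypothetical stand-in for a pulled message; only the
// "payload" JSON tag mirrors the documented Message type.
type envelope struct {
	Payload []byte `json:"payload"`
}

// decodePayload unmarshals a JSON body and returns the raw payload bytes.
// encoding/json base64-decodes []byte fields automatically.
func decodePayload(body []byte) ([]byte, error) {
	var e envelope
	if err := json.Unmarshal(body, &e); err != nil {
		return nil, err
	}
	return e.Payload, nil
}

func main() {
	// Sample body (an assumption) carrying the payload from the curl example.
	body := []byte(`{"payload": "eyJtZXNzYWdlIjogImhlbGxvIn0="}`)
	payload, err := decodePayload(body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(payload))
}
```

No explicit base64 call is needed here: unmarshalling into a []byte field performs the decoding.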



GET /

List all known topics/queues.


$ curl -q -o - http://localhost:8000/ | jq '.'
{
  "hello": {
    "name": "hello",
    "ttl": 60000000000,
    "seq": 1,
    "created": "2018-05-07T23:44:25.681392205-07:00"
  }
}

POST|PUT /topic

Post a new message to the queue named by <topic>.

NB: Either POST or PUT methods can be used here.


$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello
message successfully published to hello with sequence 1

GET /topic

Get the next message of the queue named by <topic>.

  • If the topic is not found, returns 404 Not Found.
  • If the WebSocket Upgrade header is found, upgrades to a WebSocket channel and subscribes to the topic <topic>. Each new message published to the topic <topic> is instantly published to all subscribers.


$ curl -q -o - http://localhost:8000/hello

DELETE /topic

Deletes a queue named by <topic>.

Not implemented.

Related Projects

  • je -- A distributed job execution engine for the execution of batch jobs, workflows, remediations and more.


msgbus is licensed under the MIT License




const (
	// DefaultBind is the default bind address
	DefaultBind = ":8000"

	// DefaultLogPath is the default path to write logs to (wal)
	DefaultLogPath = "./logs"

	// DefaultMaxQueueSize is the default maximum size of queues
	DefaultMaxQueueSize = 1024 // ~8MB per queue (1024 * 8KB)

	// DefaultMaxPayloadSize is the default maximum payload size
	DefaultMaxPayloadSize = 8192 // 8KB

	// DefaultBufferLength is the default buffer length for subscriber chans
	DefaultBufferLength = 256

	// DefaultMetrics is the default for whether to enable metrics
	DefaultMetrics = false

	// DefaultNoSync is the default for whether to disable fsync after writing
	// messages to the write-ahead-log (wal) files. The default is `false` which
	// is safer and will prevent corruption in the event of crashes or power failure,
	// but is slower.
	DefaultNoSync = false
)


var (
	// Version release version
	Version = "0.1.1"

	// Commit will be overwritten automatically by the build system
	Commit = "HEAD"
)

var DefObjectives = map[float64]float64{
	0.50: 0.05,
	0.90: 0.01,
	0.95: 0.005,
	0.99: 0.001,
}

DefObjectives ...

var (
	// ErrBufferFull is logged in Subscribe() when a subscriber's
	// buffer is full and messages can no longer be enqueued for delivery
	ErrBufferFull = errors.New("error: subscriber buffer full")
)


func FullVersion

func FullVersion() string

FullVersion returns the full version, build and commit hash

func GenerateULID added in v0.1.16

func GenerateULID() (string, error)

GenerateULID generates a new unique identifier

func MustGenerateULID added in v0.1.16

func MustGenerateULID() string

MustGenerateULID generates a new unique identifier or fails

func SafeParseInt64 added in v0.1.15

func SafeParseInt64(s string, d int64) int64

SafeParseInt64 ...
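
The signature suggests a parse-with-default helper. A minimal sketch of that idea follows; returning d when parsing fails is an assumption drawn from the parameter list, not a documented guarantee, and safeParseInt64 is a local stand-in rather than the package's function.

```go
package main

import (
	"fmt"
	"strconv"
)

// safeParseInt64 parses s as a base-10 int64 and returns d when parsing
// fails. The fallback behaviour is an assumption of this sketch.
func safeParseInt64(s string, d int64) int64 {
	n, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return d
	}
	return n
}

func main() {
	fmt.Println(safeParseInt64("42", -1))   // valid input
	fmt.Println(safeParseInt64("oops", -1)) // falls back to the default
}
```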


type Client

type Client struct {
	// contains filtered or unexported fields
}

Client ...

func NewClient

func NewClient(conn *websocket.Conn, topic *Topic, index int64, bus *MessageBus) *Client

NewClient ...

func (*Client) Start

func (c *Client) Start()

Start ...

type HandlerFunc

type HandlerFunc func(msg *Message) error

HandlerFunc ...

type Message

type Message struct {
	ID      int64     `json:"id"`
	Topic   *Topic    `json:"topic"`
	Payload []byte    `json:"payload"`
	Created time.Time `json:"created"`
}

Message ...

func LoadMessage added in v0.1.15

func LoadMessage(data []byte) (m Message, err error)

func (Message) Bytes added in v0.1.15

func (m Message) Bytes() ([]byte, error)

type MessageBus

type MessageBus struct {
	// contains filtered or unexported fields
}

MessageBus ...

func NewMessageBus

func NewMessageBus(opts ...Option) (*MessageBus, error)

NewMessageBus creates a new message bus with the provided options

func (*MessageBus) Get

func (mb *MessageBus) Get(t *Topic) (Message, bool)

Get ...

func (*MessageBus) Len

func (mb *MessageBus) Len() int

Len ...

func (*MessageBus) Metrics

func (mb *MessageBus) Metrics() *Metrics

Metrics ...

func (*MessageBus) NewMessage

func (mb *MessageBus) NewMessage(topic *Topic, payload []byte) Message

NewMessage ...

func (*MessageBus) NewTopic

func (mb *MessageBus) NewTopic(topic string) *Topic

NewTopic ...

func (*MessageBus) Put

func (mb *MessageBus) Put(message Message) error

Put ...

func (*MessageBus) ServeHTTP

func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*MessageBus) Subscribe

func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan Message

Subscribe ...

func (*MessageBus) Unsubscribe

func (mb *MessageBus) Unsubscribe(id, topic string)

Unsubscribe ...

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics ...

func NewMetrics

func NewMetrics(namespace string) *Metrics

NewMetrics ...

func (*Metrics) Counter

func (m *Metrics) Counter(subsystem, name string) prometheus.Counter

Counter ...

func (*Metrics) CounterVec added in v0.1.2

func (m *Metrics) CounterVec(subsystem, name string) *prometheus.CounterVec

CounterVec ...

func (*Metrics) Gauge

func (m *Metrics) Gauge(subsystem, name string) prometheus.Gauge

Gauge ...

func (*Metrics) GaugeVec

func (m *Metrics) GaugeVec(subsystem, name string) *prometheus.GaugeVec

GaugeVec ...

func (*Metrics) Handler

func (m *Metrics) Handler() http.Handler

Handler ...

func (*Metrics) NewCounter

func (m *Metrics) NewCounter(subsystem, name, help string) prometheus.Counter

NewCounter ...

func (*Metrics) NewCounterFunc

func (m *Metrics) NewCounterFunc(subsystem, name, help string, f func() float64) prometheus.CounterFunc

NewCounterFunc ...

func (*Metrics) NewCounterVec added in v0.1.2

func (m *Metrics) NewCounterVec(subsystem, name, help string, labels []string) *prometheus.CounterVec

NewCounterVec ...

func (*Metrics) NewGauge

func (m *Metrics) NewGauge(subsystem, name, help string) prometheus.Gauge

NewGauge ...

func (*Metrics) NewGaugeFunc

func (m *Metrics) NewGaugeFunc(subsystem, name, help string, f func() float64) prometheus.GaugeFunc

NewGaugeFunc ...

func (*Metrics) NewGaugeVec

func (m *Metrics) NewGaugeVec(subsystem, name, help string, labels []string) *prometheus.GaugeVec

NewGaugeVec ...

func (*Metrics) NewSummary

func (m *Metrics) NewSummary(subsystem, name, help string) prometheus.Summary

NewSummary ...

func (*Metrics) NewSummaryVec

func (m *Metrics) NewSummaryVec(subsystem, name, help string, labels []string) *prometheus.SummaryVec

NewSummaryVec ...

func (*Metrics) Run

func (m *Metrics) Run(addr string)

Run ...

func (*Metrics) Summary

func (m *Metrics) Summary(subsystem, name string) prometheus.Summary

Summary ...

func (*Metrics) SummaryVec

func (m *Metrics) SummaryVec(subsystem, name string) *prometheus.SummaryVec

SummaryVec ...

type Option added in v0.1.15

type Option func(opts *Options) error

func WithBufferLength added in v0.1.15

func WithBufferLength(bufferLength int) Option

func WithLogPath added in v0.1.15

func WithLogPath(logPath string) Option

func WithMaxPayloadSize added in v0.1.15

func WithMaxPayloadSize(maxPayloadSize int) Option

func WithMaxQueueSize added in v0.1.15

func WithMaxQueueSize(maxQueueSize int) Option

func WithMetrics added in v0.1.15

func WithMetrics(metrics bool) Option

func WithNoSync added in v0.1.15

func WithNoSync(noSync bool) Option

type Options

type Options struct {
	LogPath string

	BufferLength   int
	MaxQueueSize   int
	MaxPayloadSize int

	Metrics bool
	NoSync  bool
}

Options ...

func NewDefaultOptions added in v0.1.15

func NewDefaultOptions() *Options
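
Option, the With... constructors and NewDefaultOptions together follow Go's functional options pattern. The sketch below shows how such a pattern typically fits together. The lowercase types are local stand-ins that mirror the documented shapes, the defaults are copied from the constants above, and the validation rule is an assumption; it is not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// options and option are local stand-ins that mirror the shape of the
// documented Options and Option types; they are not the package's code.
type options struct {
	BufferLength int
	MaxQueueSize int
}

type option func(opts *options) error

// withBufferLength returns an option that overrides the buffer length.
// Rejecting non-positive values is an assumption made for this sketch.
func withBufferLength(n int) option {
	return func(opts *options) error {
		if n <= 0 {
			return errors.New("buffer length must be positive")
		}
		opts.BufferLength = n
		return nil
	}
}

// newOptions starts from the documented defaults and applies each option
// in order, stopping at the first error.
func newOptions(opts ...option) (*options, error) {
	o := &options{BufferLength: 256, MaxQueueSize: 1024}
	for _, apply := range opts {
		if err := apply(o); err != nil {
			return nil, err
		}
	}
	return o, nil
}

func main() {
	o, err := newOptions(withBufferLength(16))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(o.BufferLength, o.MaxQueueSize)
}
```

Going by the documented signatures, the equivalent call on the real package would presumably look like NewMessageBus(WithBufferLength(16)), with unspecified settings keeping their defaults.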

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue represents a single instance of a bounded queue data structure with access to both sides. If maxlen is non-zero the queue is bounded, otherwise it is unbounded.

func NewQueue added in v0.1.2

func NewQueue(maxlen ...int) *Queue

NewQueue creates a new instance of Queue with the provided maxlen

func (*Queue) Empty added in v0.1.2

func (q *Queue) Empty() bool

Empty returns true if the queue is empty, false otherwise

func (*Queue) ForEach added in v0.1.13

func (q *Queue) ForEach(f func(elem interface{}) error) error

ForEach applies the function `f` to each item in the queue, giving read-only access to the queue's contents in O(n) time.

func (*Queue) Full added in v0.1.2

func (q *Queue) Full() bool

Full returns true if the queue is full, false otherwise

func (*Queue) Len

func (q *Queue) Len() int

Len returns the number of elements currently stored in the queue.

func (*Queue) MaxLen added in v0.1.2

func (q *Queue) MaxLen() int

MaxLen returns the maxlen of the queue

func (*Queue) Peek

func (q *Queue) Peek() interface{}

Peek returns the element at the front of the queue.

func (*Queue) Pop

func (q *Queue) Pop() interface{}

Pop removes and returns the element from the front of the queue.

func (*Queue) Push

func (q *Queue) Push(elem interface{})

Push appends an element to the back of the queue.

func (*Queue) Size added in v0.1.2

func (q *Queue) Size() int

Size returns the current size of the queue
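
To make the Queue semantics above concrete, here is a small slice-backed sketch of a bounded FIFO queue. The boundedQueue type is a stand-in and not the package's Queue; in particular, dropping the oldest element when a full queue is pushed to is an assumption, since the overflow policy is not stated here.

```go
package main

import "fmt"

// boundedQueue is a slice-backed FIFO sketch, not the package's Queue.
// A maxLen of zero means unbounded.
type boundedQueue struct {
	items  []interface{}
	maxLen int
}

func (q *boundedQueue) Len() int    { return len(q.items) }
func (q *boundedQueue) Empty() bool { return len(q.items) == 0 }
func (q *boundedQueue) Full() bool  { return q.maxLen > 0 && len(q.items) >= q.maxLen }

// Push appends elem to the back. When the queue is full the oldest element
// is dropped first; this overflow policy is an assumption of the sketch.
func (q *boundedQueue) Push(elem interface{}) {
	if q.Full() {
		q.items = q.items[1:]
	}
	q.items = append(q.items, elem)
}

// Pop removes and returns the front element, or nil if the queue is empty.
func (q *boundedQueue) Pop() interface{} {
	if q.Empty() {
		return nil
	}
	elem := q.items[0]
	q.items = q.items[1:]
	return elem
}

func main() {
	q := &boundedQueue{maxLen: 2}
	q.Push("a")
	q.Push("b")
	q.Push("c") // queue is full, so "a" is dropped
	fmt.Println(q.Len(), q.Pop(), q.Pop(), q.Empty())
}
```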

type SubscribeOption added in v0.1.13

type SubscribeOption func(*SubscriberOptions)

SubscribeOption ...

func WithIndex added in v0.1.13

func WithIndex(index int64) SubscribeOption

WithIndex sets the index to start subscribing from

type SubscriberConfig added in v0.1.13

type SubscriberConfig struct {
	BufferLength int
}

SubscriberConfig ...

type SubscriberOptions added in v0.1.13

type SubscriberOptions struct {
	Index int64
}

SubscriberOptions ...

type Subscribers added in v0.1.13

type Subscribers struct {
	// contains filtered or unexported fields
}

Subscribers ...

func NewSubscribers added in v0.1.13

func NewSubscribers(config *SubscriberConfig) *Subscribers

NewSubscribers ...

func (*Subscribers) AddSubscriber added in v0.1.13

func (subs *Subscribers) AddSubscriber(id string) chan Message

AddSubscriber ...

func (*Subscribers) GetSubscriber added in v0.1.13

func (subs *Subscribers) GetSubscriber(id string) (chan Message, bool)

GetSubscriber ...

func (*Subscribers) HasSubscriber added in v0.1.13

func (subs *Subscribers) HasSubscriber(id string) bool

HasSubscriber ...

func (*Subscribers) Len added in v0.1.13

func (subs *Subscribers) Len() int

Len ...

func (*Subscribers) NotifyAll added in v0.1.13

func (subs *Subscribers) NotifyAll(message Message) int

NotifyAll ...

func (*Subscribers) RemoveSubscriber added in v0.1.13

func (subs *Subscribers) RemoveSubscriber(id string)

RemoveSubscriber ...
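
The Subscribers methods above, together with SubscriberConfig.BufferLength and ErrBufferFull, suggest a fan-out over buffered channels. The sketch below illustrates one common way to do that with a non-blocking send. Everything in it is an assumption for illustration: the types are local stand-ins, messages are plain strings, there is no locking, and the meaning given to the returned count (subscribers that accepted the message) is a guess, not the documented behaviour of NotifyAll.

```go
package main

import "fmt"

// subscribers maps a subscriber id to its buffered channel. It is a
// simplified stand-in for the documented Subscribers type, without locking.
type subscribers struct {
	bufferLength int
	chans        map[string]chan string
}

func newSubscribers(bufferLength int) *subscribers {
	return &subscribers{bufferLength: bufferLength, chans: make(map[string]chan string)}
}

// add registers id and returns the channel its messages arrive on.
func (s *subscribers) add(id string) chan string {
	ch := make(chan string, s.bufferLength)
	s.chans[id] = ch
	return ch
}

// notifyAll offers msg to every subscriber without blocking and returns how
// many accepted it. A full buffer causes that subscriber to miss the message.
func (s *subscribers) notifyAll(msg string) int {
	delivered := 0
	for _, ch := range s.chans {
		select {
		case ch <- msg:
			delivered++
		default:
			// Buffer full: skip rather than stall the publisher.
		}
	}
	return delivered
}

func main() {
	subs := newSubscribers(1)
	fast := subs.add("fast")
	subs.add("slow") // never drained

	fmt.Println(subs.notifyAll("hi")) // both buffers have room
	<-fast                            // only "fast" reads its message
	fmt.Println(subs.notifyAll("bye")) // "slow" is still full
}
```

The non-blocking select keeps one slow subscriber from stalling delivery to the others, at the cost of that subscriber missing messages once its buffer fills.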

type Topic

type Topic struct {
	Name     string    `json:"name"`
	Sequence int64     `json:"seq"`
	Created  time.Time `json:"created"`
}

Topic ...

func (*Topic) String added in v0.1.2

func (t *Topic) String() string

