nq

package module
v1.0.0
Published: Dec 22, 2020 License: BSD-3-Clause Imports: 14 Imported by: 0

README

nanoQ — high-performance brokerless Pub/Sub for streaming real-time data

nanoQ is a very minimalistic (opinionated/limited) Pub/Sub transport library.

Instant "fire and forget" publish with only a best-effort delivery guarantee.

Do I need it?

For telecommunications, media, IoT, gaming, metrics, clicks, etc.: it's okay to lose data in order to get the most up-to-date messages.

Brokerless means no central broker server; publishers connect directly to subscribers.

No open Internet. nanoQ is for private backend infrastructure networks only. There are no integrity checks, loop detection, or other safety mechanisms.

Bandwidth vs latency is configurable, but ultimately nanoQ prefers bandwidth: it is designed to carry hundreds or thousands of parallel streams through the backend infrastructure.

In a Nutshell

  • tiny: under 1K LOC
  • high-performance: no allocations on critical paths, granular locking, and other optimizations
  • low overhead simple protocol: 1-5 bytes of metadata (varint length prefix) per message, depending on the message size
  • battle-tested: running in production
  • data oblivious: JSON, protobuf, thrift, gzip, or XML - anything goes
  • kubernetes and cloud-native: easy to integrate with existing load-balancers
  • scalable: no coordinator or central server
  • simple URL address scheme: tcp4://localhost:1234 or unix:///var/run/sockety.sock
  • go native: context, modules and error wrapping
  • transparent: with Prometheus metrics and pluggable logging
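
The "1-5 bytes of metadata" figure above follows from how a varint length prefix grows with the message size. The sketch below illustrates this with the standard library's unsigned varint encoding. It assumes that encoding for illustration only; this document does not specify nanoQ's exact wire format, and frame and prefixLen are hypothetical helpers, not part of the nq API.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// prefixLen reports how many bytes an unsigned varint needs to encode n.
func prefixLen(n int) int {
	var hdr [binary.MaxVarintLen64]byte
	return binary.PutUvarint(hdr[:], uint64(n))
}

// frame prepends a varint length prefix to payload.
// Assumption: Go's standard uvarint encoding, not necessarily nanoQ's format.
func frame(payload []byte) []byte {
	var hdr [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(hdr[:], uint64(len(payload)))
	out := make([]byte, 0, n+len(payload))
	out = append(out, hdr[:n]...)
	return append(out, payload...)
}

func main() {
	// Small messages pay one byte of overhead, larger ones a few more.
	for _, size := range []int{1, 127, 128, 16384, 10 * 1024 * 1024} {
		fmt.Printf("payload of %d bytes -> prefix of %d bytes\n", size, prefixLen(size))
	}
	fmt.Printf("framed: %v\n", frame([]byte("hi")))
}
```

With this encoding each prefix byte carries 7 bits of length, so a fifth byte is needed only for payloads of 256 MiB and above.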

Quick start

Subscriber
package main

import (
    "context"
    "log"
    "time"

    "github.com/aigent/nq"
)

func main() {
    opts := nq.SubOpts{
        KeepaliveTimeout: 5 * time.Second,
        Printf:           log.Printf,
    }
    sub := nq.NewSub("tcp4://:1234", opts, nq.NewDefaultMetrics())

    go func() {
        buf := make([]byte, 4096)
        for {
            if msg, stream, err := sub.Receive(context.TODO(), buf); err != nil {
                log.Println("Error while receiving:", err)
                continue
            } else {
                log.Printf("message from stream '%v' is: %s\n", stream, msg)
            }
        }
    }()
    if err := sub.Listen(context.TODO()); err != nil {
        log.Println("Listen error:", err)
    }
}
Publisher
package main

import (
    "context"
    "log"
    "time"

    "github.com/aigent/nq"
)

func main() {
    opts := nq.PubOpts{
        KeepaliveTimeout: 5 * time.Second,
        ConnectTimeout:   3 * time.Second,
        WriteTimeout:     3 * time.Second,
        FlushFrequency:   100 * time.Millisecond,
        NoDelay:          true,
        Printf:           log.Printf,
    }
    pub := nq.NewPub("tcp4://localhost:1234", opts, nq.NewDefaultMetrics())
    for {
        // Publish the message using 100 connections
        for i := 1; i <= 100; i++ {
            if err := pub.Publish(context.TODO(), []byte("Hello nanoQ"), i); err != nil {
                log.Println("Error while publishing:", err)
            }
        }
    }
}

Documentation

Overview

Package nq (nanoQ) is a minimalistic brokerless Pub-Sub message queue for streaming. An application that wishes to receive messages should embed nq.Sub, which binds to a port and reads network traffic. Another application can send messages to the first one using nq.Pub.

nanoQ is designed to lose messages in a scenario when subscribers are inaccessible or not able to keep up with the workload. The aim is to transfer audio, video, or clicks, where it is important to get fresh data with minimal delay.

Constants

This section is empty.

Variables

var (
	// MaxPayload protects the receiving side from a DoS attack caused by allocating too much RAM
	MaxPayload = 10 * 1024 * 1024 // 10 MiB ought to be enough for anybody

	// ErrTooBig indicates that payload exceeds the limit
	ErrTooBig = errors.New("nanoq: payload exceeds the limit")
)
var MaxBuffer = MaxPayload * 10

MaxBuffer limits the size of internal buffers to prevent memory blow-up when there is no connection or the network is too slow.
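
To make the role of the two limits concrete, here is a minimal sketch of a bounded in-memory buffer that rejects oversized payloads and stops growing once it is full. It is not nanoQ's implementation: boundedBuffer, errFull, and the scaled-down constants are hypothetical, and dropping the newest message when the buffer is full is an assumption made for the demo.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical limits that mirror MaxPayload and MaxBuffer, scaled down for the demo.
const (
	maxPayload = 8
	maxBuffer  = maxPayload * 10
)

var (
	errTooBig = errors.New("payload exceeds the limit")
	errFull   = errors.New("buffer is full, message dropped")
)

// boundedBuffer accumulates pending bytes and never grows past maxBuffer.
type boundedBuffer struct {
	pending []byte
}

// enqueue appends payload, or rejects it when a limit would be exceeded.
func (b *boundedBuffer) enqueue(payload []byte) error {
	if len(payload) > maxPayload {
		return errTooBig
	}
	if len(b.pending)+len(payload) > maxBuffer {
		return errFull // assumption: the newest message is the one that is lost
	}
	b.pending = append(b.pending, payload...)
	return nil
}

func main() {
	var b boundedBuffer
	fmt.Println("oversized payload:", b.enqueue(make([]byte, maxPayload+1)))

	dropped := 0
	for i := 0; i < 12; i++ {
		if err := b.enqueue(make([]byte, maxPayload)); err != nil {
			dropped++
		}
	}
	fmt.Println("buffered bytes:", len(b.pending), "dropped messages:", dropped)
}
```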

Functions

func MustParseURL

func MustParseURL(s string) net.Addr

MustParseURL is like ParseURL, but panics if the URL cannot be parsed.

func ParseURL

func ParseURL(s string) (net.Addr, error)

ParseURL parses a net.Addr from a net://addr URL (e.g. "tcp4://localhost:8080" or "unix://:3999").
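
The net://addr scheme maps naturally onto the standard library: the URL scheme names the network and the rest names the address. The sketch below shows one way such a mapping could look. parseAddr is a hypothetical stand-in written for illustration; it is not the source of nq.ParseURL and may differ from it in which networks and address forms it accepts.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// parseAddr is a hypothetical stand-in for nq.ParseURL built on net/url.
// It supports only TCP and Unix stream sockets.
func parseAddr(s string) (net.Addr, error) {
	u, err := url.Parse(s)
	if err != nil {
		return nil, err
	}
	switch u.Scheme {
	case "tcp", "tcp4", "tcp6":
		addr, err := net.ResolveTCPAddr(u.Scheme, u.Host)
		if err != nil {
			return nil, err
		}
		return addr, nil
	case "unix":
		// For unix:///path the socket path lands in u.Path.
		addr, err := net.ResolveUnixAddr(u.Scheme, u.Path)
		if err != nil {
			return nil, err
		}
		return addr, nil
	default:
		return nil, fmt.Errorf("unsupported network %q", u.Scheme)
	}
}

func main() {
	for _, s := range []string{"tcp4://127.0.0.1:1234", "unix:///var/run/sockety.sock"} {
		addr, err := parseAddr(s)
		if err != nil {
			fmt.Println("parse error:", err)
			continue
		}
		fmt.Printf("%s -> network=%s address=%s\n", s, addr.Network(), addr)
	}
}
```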

Types

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics provide insight into the current state of the app

func NewDefaultMetrics

func NewDefaultMetrics() *Metrics

NewDefaultMetrics constructs and registers Metrics with the prometheus.DefaultRegisterer

func NewMetrics

func NewMetrics(reg prometheus.Registerer) *Metrics

NewMetrics constructs and registers Metrics

type Pub

type Pub interface {
	Publish(ctx context.Context, payload []byte, streamKey interface{}) error
}

Pub is an asynchronous message publisher.

Publish stores the payload in a memory buffer and returns immediately. Delivery is best-effort only: a nil error means just that the message was enqueued for sending.

The streamKey ties the payload to a particular stream (connection). Each time Publish receives a new streamKey, it spawns a new connection. Messages published with the same streamKey are transferred over the same stream, in the same order.

Note that streamKey exists only on the publisher's side. The subscriber provides its own key to differentiate streams, but it does not match the one passed to the Pub.
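
The relationship between stream keys and streams can be pictured as a map that lazily creates one stream per key. The following is a minimal sketch of that idea, not nanoQ's code: mux and stream are hypothetical types, and a stream here is just an in-memory list rather than a network connection. Because the sketch uses the key as a map key, it only works with comparable values such as ints or strings.

```go
package main

import (
	"fmt"
	"sync"
)

// stream stands in for one connection; it records payloads in publish order.
type stream struct {
	id   int
	sent [][]byte
}

// mux maps each stream key to its own stream, creating one on first use.
type mux struct {
	mu      sync.Mutex
	streams map[interface{}]*stream
}

func newMux() *mux {
	return &mux{streams: make(map[interface{}]*stream)}
}

// publish appends payload to the stream owned by streamKey and returns that stream.
func (m *mux) publish(payload []byte, streamKey interface{}) *stream {
	m.mu.Lock()
	defer m.mu.Unlock()
	s, ok := m.streams[streamKey]
	if !ok {
		// First message for this key: a new "connection" is created.
		s = &stream{id: len(m.streams) + 1}
		m.streams[streamKey] = s
	}
	s.sent = append(s.sent, payload)
	return s
}

func main() {
	m := newMux()
	m.publish([]byte("a1"), "call-A")
	m.publish([]byte("b1"), "call-B")
	s := m.publish([]byte("a2"), "call-A")
	fmt.Printf("streams: %d, stream %d carries %d messages\n", len(m.streams), s.id, len(s.sent))
}
```

A practical consequence is that the number of distinct keys determines the number of open connections, so keys are best chosen per logical stream (a call, a session, a device) rather than per message.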

Example
package main

import (
	"context"
	"log"
	"time"

	"github.com/aigent/nq"
)

func main() {
	opts := nq.PubOpts{
		KeepaliveTimeout: 5 * time.Second,
		ConnectTimeout:   3 * time.Second,
		WriteTimeout:     3 * time.Second,
		FlushFrequency:   100 * time.Millisecond,
		NoDelay:          true,
		Printf:           log.Printf,
	}
	pub := nq.NewPub("tcp4://localhost:1234", opts, nq.NewDefaultMetrics())
	for {
		// Publish the message using 100 connections
		for i := 1; i <= 100; i++ {
			_ = pub.Publish(context.TODO(), []byte("Hello nanoQ"), i)
		}
	}
}

func NewMultiPub

func NewMultiPub(urls []string, opts PubOpts, metr *Metrics) Pub

NewMultiPub constructs a Pub that publishes simultaneously to several destination Subs
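
Publishing to several destinations at once amounts to a fan-out over single-destination publishers. The sketch below shows that pattern against a local interface with the same Publish signature as Pub. It is an illustration only: fanOut, recorder, and demo are hypothetical names, and the error handling (continue on failure, return the first error) is an assumption rather than documented NewMultiPub behaviour.

```go
package main

import (
	"context"
	"fmt"
)

// publisher mirrors the method set of the Pub interface.
type publisher interface {
	Publish(ctx context.Context, payload []byte, streamKey interface{}) error
}

// fanOut forwards every message to each destination in turn.
type fanOut []publisher

// Publish keeps going when a destination fails and returns the first error seen.
func (f fanOut) Publish(ctx context.Context, payload []byte, streamKey interface{}) error {
	var firstErr error
	for _, p := range f {
		if err := p.Publish(ctx, payload, streamKey); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

// recorder is a fake destination that remembers what it was sent.
type recorder struct {
	got []string
}

func (r *recorder) Publish(_ context.Context, payload []byte, _ interface{}) error {
	r.got = append(r.got, string(payload))
	return nil
}

// demo publishes one message through a two-destination fan-out.
func demo() (a, b *recorder, err error) {
	a, b = &recorder{}, &recorder{}
	var pub publisher = fanOut{a, b}
	err = pub.Publish(context.Background(), []byte("Hello nanoQ"), 1)
	return a, b, err
}

func main() {
	a, b, err := demo()
	fmt.Println(a.got, b.got, err)
}
```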

func NewPub

func NewPub(url string, opts PubOpts, m *Metrics) Pub

NewPub constructs a single destination Pub

type PubOpts

type PubOpts struct {
	KeepaliveTimeout time.Duration // For how long to keep a stream alive if there are no new messages (default timeout is used on zero value)
	ConnectTimeout   time.Duration // Limit (re)-connects time (default timeout is used on zero value)
	WriteTimeout     time.Duration // Limit socket write operation time (zero means no deadline)
	FlushFrequency   time.Duration // Socket write frequency (default frequency is used on zero value)
	NoDelay          bool          // Disable or enable Nagle's algorithm (TCP only)

	// Printf optionally specifies a function used for logging
	Printf func(format string, v ...interface{})
}

PubOpts are publisher's options to tweak
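
FlushFrequency is the knob behind the bandwidth-versus-latency trade-off: flushing less often lets many small messages share one socket write, at the cost of each message waiting longer in the buffer. The sketch below demonstrates the effect with a bufio.Writer in front of a fake socket. It assumes buffered writes as the batching mechanism and flushes by message count instead of by time to stay deterministic; countingWriter and send are hypothetical and do not reflect nanoQ's internals.

```go
package main

import (
	"bufio"
	"fmt"
)

// countingWriter stands in for a socket and counts write calls and bytes.
type countingWriter struct {
	writes int
	bytes  int
}

func (w *countingWriter) Write(p []byte) (int, error) {
	w.writes++
	w.bytes += len(p)
	return len(p), nil
}

// send buffers n small messages and flushes after every `every` messages.
func send(n, every int) *countingWriter {
	sock := &countingWriter{}
	buf := bufio.NewWriterSize(sock, 64*1024)
	for i := 1; i <= n; i++ {
		buf.WriteString("Hello nanoQ")
		if i%every == 0 {
			buf.Flush()
		}
	}
	buf.Flush() // push out whatever is left; a no-op when the buffer is empty
	return sock
}

func main() {
	eager := send(100, 1)
	batched := send(100, 100)
	fmt.Printf("flush per message: %d writes, %d bytes\n", eager.writes, eager.bytes)
	fmt.Printf("flush per batch:   %d writes, %d bytes\n", batched.writes, batched.bytes)
}
```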

type StreamDescriptor

type StreamDescriptor uint32

StreamDescriptor is used to separate messages from the different incoming streams

type Sub

type Sub interface {
	Listen(ctx context.Context) error
	Receive(ctx context.Context, p []byte) (payload []byte, stream StreamDescriptor, err error)
}

Sub is a subscriber.

Listen must be called for the Sub to bind to the port and start listening for messages. It blocks until ctx signals Done, and then returns ctx.Err().
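
The "block until the context is done, then return ctx.Err()" contract is a common Go pattern for accept loops. Here is a minimal standard-library sketch of that pattern, offered as an illustration rather than as the body of Sub.Listen: the listen and demo functions are hypothetical, and the loop simply closes every accepted connection where a real subscriber would start reading messages.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"time"
)

// listen accepts connections until ctx is done, then returns ctx.Err().
func listen(ctx context.Context, network, addr string) error {
	var lc net.ListenConfig
	ln, err := lc.Listen(ctx, network, addr)
	if err != nil {
		return err
	}
	defer ln.Close()

	// Closing the listener is what unblocks Accept once ctx is done.
	stop := make(chan struct{})
	defer close(stop)
	go func() {
		select {
		case <-ctx.Done():
			ln.Close()
		case <-stop:
		}
	}()

	for {
		conn, err := ln.Accept()
		if err != nil {
			if ctx.Err() != nil {
				return ctx.Err()
			}
			return err
		}
		conn.Close() // a real subscriber would start reading messages here
	}
}

// demo runs listen on an ephemeral loopback port for a short time.
func demo() error {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	return listen(ctx, "tcp4", "127.0.0.1:0")
}

func main() {
	fmt.Println("listen returned:", demo())
}
```

In an application the context would typically come from signal handling or a parent service context instead of a timeout, so that shutdown stops the listener cleanly.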

Receive reads the next available message into the byte slice p. The payload returned by Receive aliases the same memory region as p. If the buffer p is too short to hold the message, Receive returns io.ErrShortBuffer and discards the message. Receive blocks until either a new message is available for reading or ctx signals Done. The stream descriptor returned by Receive is not the same as the streamKey in Publish; it is an arbitrary key that distinguishes messages sent through different streams (connections).
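
Because the returned payload aliases p, a message that must outlive the next Receive call has to be copied first. The sketch below reproduces the effect with a stand-in function that follows the documented contract (write into p, return a sub-slice of p, fail with io.ErrShortBuffer when p is too small). fakeReceive is hypothetical and involves no networking.

```go
package main

import (
	"fmt"
	"io"
)

// fakeReceive imitates the documented contract of Receive: it writes the
// message into p and returns a slice that shares memory with p.
func fakeReceive(p []byte, msg string) ([]byte, error) {
	if len(msg) > len(p) {
		return nil, io.ErrShortBuffer
	}
	n := copy(p, msg)
	return p[:n], nil
}

func main() {
	buf := make([]byte, 16)

	first, _ := fakeReceive(buf, "alpha")
	kept := append([]byte(nil), first...) // copy before the buffer is reused

	// The second call overwrites the memory that first still points to.
	if _, err := fakeReceive(buf, "OMEGA"); err != nil {
		fmt.Println("receive error:", err)
	}

	fmt.Printf("aliased: %s, copied: %s\n", first, kept) // aliased: OMEGA, copied: alpha
}
```

Reusing one buffer this way keeps the receive loop allocation-free; the copy is needed only when a message is handed to another goroutine or stored.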

Example
opts := nq.SubOpts{
	KeepaliveTimeout: 5 * time.Second,
	Printf:           log.Printf,
}
sub := nq.NewSub("tcp4://:1234", opts, nq.NewDefaultMetrics())

go func() {
	buf := make([]byte, nq.MaxPayload) // large enough for any message the package accepts
	for {
		if msg, stream, err := sub.Receive(context.TODO(), buf); err != nil {
			log.Println("Error while receiving:", err)
			continue
		} else {
			log.Printf("message from stream '%v' is: %s\n", stream, msg)
		}
	}
}()
if err := sub.Listen(context.TODO()); err != nil {
	log.Println("Listen error:", err)
}

func NewSub

func NewSub(url string, opts SubOpts, metr *Metrics) Sub

NewSub constructs a Sub

type SubOpts

type SubOpts struct {
	KeepaliveTimeout time.Duration                         // how long to keep a stream alive if there are no new messages
	Printf           func(format string, v ...interface{}) // Printf optionally specifies a function used for logging
}

SubOpts are subscriber's options to tweak
