Documentation ¶
Overview ¶
Package nq (nanoQ) is a minimalistic brokerless Pub-Sub message queue for streaming. An application that wishes to receive messages should embed nq.Sub, and it will bind to a port and read network traffic. Another application can send messages to the first one using nq.Pub
nanoQ is designed to lose messages in a scenario when subscribers are inaccessible or not able to keep up with the workload. The aim is to transfer audio, video, or clicks, where it is important to get fresh data with minimal delay.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // MaxPayload prevents receiving side from DoS atack due to allocating too much RAM MaxPayload = 10 * 1024 * 1024 // 10Mb ought to be enough for anybody // ErrTooBig indicates that payload exceeds the limit ErrTooBig = errors.New("nanoq: payload exceeds the limit") )
var MaxBuffer = MaxPayload * 10
MaxBuffer value limits internal buffers size to prevent memory blow up when there is no connection or network is too slow
Functions ¶
Types ¶
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
Metrics provide insights to the current state of the app
func NewDefaultMetrics ¶
func NewDefaultMetrics() *Metrics
NewDefaultMetrics constructs and registers Metrics with the prometheus.DefaultRegisterer
func NewMetrics ¶
func NewMetrics(reg prometheus.Registerer) *Metrics
NewMetrics constructs and registers Metrics
type Pub ¶
Pub is an asynchronous message publisher.
Publish stores the payload into memory buffer and returns immediately. There is only best effort delivery guarantee, nil error only means that message was enqueued for sending. The streamKey ties the payload with a certain stream (connection). Each time Publish receives new streamKey it spawns new connection. Messages published using the same streamKey will be transferred over the same stream in the same order. Note that streamKey exists only on the publisher's side. Subscriber provides a key to differentiate streams but it doesn't match with the one passed to the Pub.
Example ¶
package main import ( "context" "log" "time" "github.com/aigent/nq" ) func main() { opts := nq.PubOpts{ KeepaliveTimeout: 5 * time.Second, ConnectTimeout: 3 * time.Second, WriteTimeout: 3 * time.Second, FlushFrequency: 100 * time.Millisecond, NoDelay: true, Printf: log.Printf, } pub := nq.NewPub("tcp4://localhost:1234", opts, nq.NewDefaultMetrics()) for { // Publish the message using 100 connections for i := 1; i <= 100; i++ { _ = pub.Publish(context.TODO(), []byte("Hello nanoQ"), i) } } }
Output:
func NewMultiPub ¶
NewMultiPub constructs a Pub that publishes simultaneously to several destination Subs
type PubOpts ¶
type PubOpts struct { KeepaliveTimeout time.Duration // For how long to keep a stream alive if there are no new messages (default timeout is used on zero value) ConnectTimeout time.Duration // Limit (re)-connects time (default timeout is used on zero value) WriteTimeout time.Duration // Limit socket write operation time (zero means no deadline) FlushFrequency time.Duration // Socket write frequency (default frequency is used on zero value) NoDelay bool // Disable or enable Nagle's algorithm (TCP only) // Printf optionally specifies a function used for logging Printf func(format string, v ...interface{}) }
PubOpts are publisher's options to tweak
type StreamDescriptor ¶
type StreamDescriptor uint32
StreamDescriptor is used to separate messages from the different incoming streams
type Sub ¶
type Sub interface { Listen(ctx context.Context) error Receive(ctx context.Context, p []byte) (payload []byte, stream StreamDescriptor, err error) }
Sub is a subscriber.
Listen should be called for Sub to bind port and to start listen for messages. It will block until the ctx will signal Done, and will return ctx.Err().
Receive reads the next available message into the byte slice p. The payload returned by Receive alias the same memory region as p. If the buffer p is too short to hold the message Receive returns io.ErrShortBuffer and discards the message. Receive will block until either new message is available for readind or ctx signals Done. The stream descriptor returned by Receive is not the same as streamKey in Publish, it is an arbitrary key to distinguish messages sent through different streams (connections).
Example ¶
opts := nq.SubOpts{ KeepaliveTimeout: 5 * time.Second, Printf: log.Printf, } sub := nq.NewSub("tcp4://:1234", opts, nq.NewDefaultMetrics()) go func() { buf := make([]byte, maxPayload) for { if msg, stream, err := sub.Receive(context.TODO(), buf); err != nil { log.Println("Error while receiving:", err) continue } else { log.Printf("message from stream '%v' is: %s\n", stream, msg) } } }() if err := sub.Listen(context.TODO()); err != nil { log.Println("Listen error:", err) }
Output: