
package module
v1.3.0 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Aug 8, 2017 License: MIT Imports: 22 Imported by: 0


NATS - Go Client

A Go client for the NATS messaging system.

License MIT Go Report Card Build Status GoDoc Coverage Status


# Go client
go get

# Server
go get

Basic Usage

nc, _ := nats.Connect(nats.DefaultURL)

// Simple Publisher
nc.Publish("foo", []byte("Hello World"))

// Simple Async Subscriber
nc.Subscribe("foo", func(m *nats.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))

// Simple Sync Subscriber
sub, err := nc.SubscribeSync("foo")
m, err := sub.NextMsg(timeout)

// Channel Subscriber
ch := make(chan *nats.Msg, 64)
sub, err := nc.ChanSubscribe("foo", ch)
msg := <- ch

// Unsubscribe

// Requests
msg, err := nc.Request("help", []byte("help me"), 10*time.Millisecond)

// Replies
nc.Subscribe("help", func(m *Msg) {
    nc.Publish(m.Reply, []byte("I can help!"))

// Close connection
nc, _ := nats.Connect("nats://localhost:4222")

Encoded Connections

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer c.Close()

// Simple Publisher
c.Publish("foo", "Hello World")

// Simple Async Subscriber
c.Subscribe("foo", func(s string) {
    fmt.Printf("Received a message: %s\n", s)

// EncodedConn can Publish any raw Go type using the registered Encoder
type person struct {
     Name     string
     Address  string
     Age      int

// Go type Subscriber
c.Subscribe("hello", func(p *person) {
    fmt.Printf("Received a person: %+v\n", p)

me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street, San Francisco, CA"}

// Go type Publisher
c.Publish("hello", me)

// Unsubscribe
sub, err := c.Subscribe("foo", nil)

// Requests
var response string
err := c.Request("help", "help me", &response, 10*time.Millisecond)
if err != nil {
    fmt.Printf("Request failed: %v\n", err)

// Replying
c.Subscribe("help", func(subj, reply string, msg string) {
    c.Publish(reply, "I can help!")

// Close connection


// tls as a scheme will enable secure connections by default. This will also verify the server name.
nc, err := nats.Connect("tls://")

// If you are using a self-signed certificate, you need to have a tls.Config with RootCAs setup.
// We provide a helper method to make this case easier.
nc, err = nats.Connect("tls://localhost:4443", nats.RootCAs("./configs/certs/ca.pem"))

// If the server requires a client certificate, there is a helper function for that too:
cert := nats.ClientCert("./configs/certs/client-cert.pem", "./configs/certs/client-key.pem")
nc, err = nats.Connect("tls://localhost:4443", cert)

// You can also supply a complete tls.Config

certFile := "./configs/certs/client-cert.pem"
keyFile := "./configs/certs/client-key.pem"
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
    t.Fatalf("error parsing X509 certificate/key pair: %v", err)

config := &tls.Config{
    ServerName: 	opts.Host,
    Certificates: 	[]tls.Certificate{cert},
    RootCAs:    	pool,
    MinVersion: 	tls.VersionTLS12,

nc, err = nats.Connect("nats://localhost:4443", nats.Secure(config))
if err != nil {
	t.Fatalf("Got an error on Connect with Secure Options: %+v\n", err)

Using Go Channels (netchan)

nc, _ := nats.Connect(nats.DefaultURL)
ec, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer ec.Close()

type person struct {
     Name     string
     Address  string
     Age      int

recvCh := make(chan *person)
ec.BindRecvChan("hello", recvCh)

sendCh := make(chan *person)
ec.BindSendChan("hello", sendCh)

me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street"}

// Send via Go channels
sendCh <- me

// Receive via Go channels
who := <- recvCh

Wildcard Subscriptions

// "*" matches any token, at any level of the subject.
nc.Subscribe("foo.*.baz", func(m *Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));

nc.Subscribe("*", func(m *Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));

// ">" matches any length of the tail of a subject, and can only be the last token
// E.g. 'foo.>' will match 'foo.bar', 'foo.bar.baz', 'foo.foo.bar.bax.22'
nc.Subscribe("foo.>", func(m *Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));

// Matches all of the above
nc.Publish("", []byte("Hello World"))

Queue Groups

// All subscriptions with the same queue name will form a queue group.
// Each message will be delivered to only one subscriber per queue group,
// using queuing semantics. You can have as many queue groups as you wish.
// Normal subscribers will continue to work as expected.

nc.QueueSubscribe("foo", "job_workers", func(_ *Msg) {
  received += 1;

Advanced Usage

// Flush connection to server, returns when all messages have been processed.
fmt.Println("All clear!")

// FlushTimeout specifies a timeout value as well.
err := nc.FlushTimeout(1*time.Second)
if err != nil {
    fmt.Println("All clear!")
} else {
    fmt.Println("Flushed timed out!")

// Auto-unsubscribe after MAX_WANTED messages received
const MAX_WANTED = 10
sub, err := nc.Subscribe("foo")

// Multiple connections
nc1 := nats.Connect("nats://host1:4222")
nc2 := nats.Connect("nats://host2:4222")

nc1.Subscribe("foo", func(m *Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))

nc2.Publish("foo", []byte("Hello World!"));

Clustered Usage

var servers = "nats://localhost:1222, nats://localhost:1223, nats://localhost:1224"

nc, err := nats.Connect(servers)

// Optionally set ReconnectWait and MaxReconnect attempts.
// This example means 10 seconds total per backend.
nc, err = nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2 * time.Second))

// Optionally disable randomization of the server pool
nc, err = nats.Connect(servers, nats.DontRandomize())

// Setup callbacks to be notified on disconnects, reconnects and connection closed.
nc, err = nats.Connect(servers,
	nats.DisconnectHandler(func(nc *nats.Conn) {
		fmt.Printf("Got disconnected!\n")
	nats.ReconnectHandler(func(_ *nats.Conn) {
		fmt.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
	nats.ClosedHandler(func(nc *nats.Conn) {
		fmt.Printf("Connection closed. Reason: %q\n", nc.LastError())

// When connecting to a mesh of servers with auto-discovery capabilities,
// you may need to provide a username/password or token in order to connect
// to any server in that mesh when authentication is required.
// Instead of providing the credentials in the initial URL, you will use
// new option setters:
nc, err = nats.Connect("nats://localhost:4222", nats.UserInfo("foo", "bar"))

// For token based authentication:
nc, err = nats.Connect("nats://localhost:4222", nats.Token("S3cretT0ken"))

// You can even pass the two at the same time in case one of the servers
// in the mesh requires a token instead of a user name and password.
nc, err = nats.Connect("nats://localhost:4222",
    nats.UserInfo("foo", "bar"),

// Note that if credentials are specified in the initial URLs, they take
// precedence on the credentials specified through the options.
// For instance, in the connect call below, the client library will use
// the user "my" and password "pwd" to connect to localhost:4222, however,
// it will use username "foo" and password "bar" when (re)connecting to
// a different server URL that it got as part of the auto-discovery.
nc, err = nats.Connect("nats://my:pwd@localhost:4222", nats.UserInfo("foo", "bar"))

Context support (+Go 1.7)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

nc, err := nats.Connect(nats.DefaultURL)

// Request with context
msg, err := nc.RequestWithContext(ctx, "foo", []byte("bar"))

// Synchronous subscriber with context
sub, err := nc.SubscribeSync("foo")
msg, err := sub.NextMsgWithContext(ctx)

// Encoded Request with context
c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
type request struct {
	Message string `json:"message"`
type response struct {
	Code int `json:"code"`
req := &request{Message: "Hello"}
resp := &response{}
err := c.RequestWithContext(ctx, "foo", req, resp)


(The MIT License)

Copyright (c) 2012-2017 Apcera Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.




A Go client for the NATS messaging system (https://nats.io).

A Go client for the NATS messaging system (https://nats.io).




View Source
const (
	JSON_ENCODER    = "json"
	GOB_ENCODER     = "gob"
	DEFAULT_ENCODER = "default"

Indexed names into the Registered Encoders.

View Source
const (
	Version                 = "1.3.0"
	DefaultURL              = "nats://localhost:4222"
	DefaultPort             = 4222
	DefaultMaxReconnect     = 60
	DefaultReconnectWait    = 2 * time.Second
	DefaultTimeout          = 2 * time.Second
	DefaultPingInterval     = 2 * time.Minute
	DefaultMaxPingOut       = 2
	DefaultMaxChanLen       = 8192            // 8k
	DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
	RequestChanLen          = 8
	LangString              = "go"

Default Constants

View Source
const (
	DISCONNECTED = Status(iota)
View Source
const (
	AsyncSubscription = SubscriptionType(iota)

The different types of subscriptions.

View Source
const (
	DefaultSubPendingMsgsLimit  = 65536
	DefaultSubPendingBytesLimit = 65536 * 1024

Pending Limits

View Source
const (
	OP_START = iota
View Source
const AUTHORIZATION_ERR = "authorization violation"

AUTHORIZATION_ERR is for when nats server user authorization has failed.

View Source
const InboxPrefix = "_INBOX."

InboxPrefix is the prefix for all inbox subjects.

View Source
View Source
const PERMISSIONS_ERR = "permissions violation"

PERMISSIONS_ERR is for when nats server subject authorization has failed.

View Source
const STALE_CONNECTION = "stale connection"

STALE_CONNECTION is for detection and proper handling of stale connections.


View Source
var (
	ErrConnectionClosed     = errors.New("nats: connection closed")
	ErrSecureConnRequired   = errors.New("nats: secure connection required")
	ErrSecureConnWanted     = errors.New("nats: secure connection not available")
	ErrBadSubscription      = errors.New("nats: invalid subscription")
	ErrTypeSubscription     = errors.New("nats: invalid subscription type")
	ErrBadSubject           = errors.New("nats: invalid subject")
	ErrSlowConsumer         = errors.New("nats: slow consumer, messages dropped")
	ErrTimeout              = errors.New("nats: timeout")
	ErrBadTimeout           = errors.New("nats: timeout invalid")
	ErrAuthorization        = errors.New("nats: authorization violation")
	ErrNoServers            = errors.New("nats: no servers available for connection")
	ErrJsonParse            = errors.New("nats: connect message, json parse error")
	ErrChanArg              = errors.New("nats: argument needs to be a channel type")
	ErrMaxPayload           = errors.New("nats: maximum payload exceeded")
	ErrMaxMessages          = errors.New("nats: maximum messages delivered")
	ErrSyncSubRequired      = errors.New("nats: illegal call on an async subscription")
	ErrMultipleTLSConfigs   = errors.New("nats: multiple tls.Configs not allowed")
	ErrNoInfoReceived       = errors.New("nats: protocol exception, INFO not received")
	ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded")
	ErrInvalidConnection    = errors.New("nats: invalid connection")
	ErrInvalidMsg           = errors.New("nats: invalid message or message nil")
	ErrInvalidArg           = errors.New("nats: invalid argument")
	ErrInvalidContext       = errors.New("nats: invalid context")
	ErrStaleConnection      = errors.New("nats: " + STALE_CONNECTION)


View Source
var DefaultOptions = GetDefaultOptions()

DEPRECATED: Use GetDefaultOptions() instead. DefaultOptions is not safe for use by multiple clients. For details see #308.


func NewInbox

func NewInbox() string

NewInbox will return an inbox string which can be used for directed replies from subscribers. These are guaranteed to be unique, but can be shared and subscribed to by others.

func RegisterEncoder

func RegisterEncoder(encType string, enc Encoder)

RegisterEncoder will register the encType with the given Encoder. Useful for customization.


type Conn

type Conn struct {
	// Keep all members for which we use atomic at the beginning of the
	// struct and make sure they are all 64bits (or use padding if necessary).
	// atomic.* functions crash on 32bit machines if operand is not aligned
	// at 64bit. See

	Opts Options
	// contains filtered or unexported fields

A Conn represents a bare connection to a nats-server. It can send and receive []byte payloads.

func Connect

func Connect(url string, options ...Option) (*Conn, error)

Connect will attempt to connect to the NATS system. The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222 Comma separated arrays are also supported, e.g. urlA, urlB. Options start with the defaults but can be overridden.


Shows different ways to create a Conn

nc, _ := nats.Connect(nats.DefaultURL)

nc, _ = nats.Connect("nats://")

nc, _ = nats.Connect("tls://")

opts := nats.Options{
	AllowReconnect: true,
	MaxReconnect:   10,
	ReconnectWait:  5 * time.Second,
	Timeout:        1 * time.Second,

nc, _ = opts.Connect()

func (*Conn) AuthRequired added in v1.2.0

func (nc *Conn) AuthRequired() bool

AuthRequired will return if the connected server requires authorization.

func (*Conn) Buffered added in v1.1.2

func (nc *Conn) Buffered() (int, error)

Buffered will return the number of bytes buffered to be sent to the server. FIXME(dlc) take into account disconnected state.

func (*Conn) ChanQueueSubscribe added in v1.2.0

func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error)

ChanQueueSubscribe will place all messages received on the channel. You should not close the channel until sub.Unsubscribe() has been called.

func (*Conn) ChanSubscribe added in v1.2.0

func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error)

ChanSubscribe will place all messages received on the channel. You should not close the channel until sub.Unsubscribe() has been called.

func (*Conn) Close

func (nc *Conn) Close()

Close will close the connection to the server. This call will release all blocking calls, such as Flush() and NextMsg()

nc, _ := nats.Connect(nats.DefaultURL)

func (*Conn) ConnectedServerId added in v1.0.5

func (nc *Conn) ConnectedServerId() string

Report the connected server's Id

func (*Conn) ConnectedUrl

func (nc *Conn) ConnectedUrl() string

Report the connected server's Url

func (*Conn) DiscoveredServers added in v1.2.2

func (nc *Conn) DiscoveredServers() []string

DiscoveredServers returns only the server urls that have been discovered after a connection has been established. If authentication is enabled, use UserInfo or Token when connecting with these urls.

func (*Conn) Flush

func (nc *Conn) Flush() error

Flush will perform a round trip to the server and return when it receives the internal reply.

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")}
for i := 0; i < 1000; i++ {
err := nc.Flush()
if err == nil {
	// Everything has been processed by the server for nc *Conn.

func (*Conn) FlushTimeout

func (nc *Conn) FlushTimeout(timeout time.Duration) (err error)

FlushTimeout allows a Flush operation to have an associated timeout.

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")}
for i := 0; i < 1000; i++ {
// Only wait for up to 1 second for Flush
err := nc.FlushTimeout(1 * time.Second)
if err == nil {
	// Everything has been processed by the server for nc *Conn.

func (*Conn) IsClosed

func (nc *Conn) IsClosed() bool

IsClosed tests if a Conn has been closed.

func (*Conn) IsConnected added in v1.2.2

func (nc *Conn) IsConnected() bool

IsConnected tests if a Conn is connected.

func (*Conn) IsReconnecting

func (nc *Conn) IsReconnecting() bool

IsReconnecting tests if a Conn is reconnecting.

func (*Conn) LastError

func (nc *Conn) LastError() error

LastError reports the last error encountered via the connection. It can be used reliably within ClosedCB in order to find out the reason why the connection was closed, for example.

func (*Conn) MaxPayload added in v1.1.2

func (nc *Conn) MaxPayload() int64

MaxPayload returns the size limit that a message payload can have. This is set by the server configuration and delivered to the client upon connect.

func (*Conn) Publish

func (nc *Conn) Publish(subj string, data []byte) error

Publish publishes the data argument to the given subject. The data argument is left untouched and needs to be correctly interpreted on the receiver.

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

nc.Publish("foo", []byte("Hello World!"))

func (*Conn) PublishMsg

func (nc *Conn) PublishMsg(m *Msg) error

PublishMsg publishes the Msg structure, which includes the Subject, an optional Reply and an optional Data field.

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")}

func (*Conn) PublishRequest

func (nc *Conn) PublishRequest(subj, reply string, data []byte) error

PublishRequest will perform a Publish() expecting a response on the reply subject. Use Request() for automatically waiting for a response inline.

func (*Conn) QueueSubscribe

func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error)

QueueSubscribe creates an asynchronous queue subscriber on the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message asynchronously.

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

received := 0

nc.QueueSubscribe("foo", "worker_group", func(_ *nats.Msg) {

func (*Conn) QueueSubscribeSync

func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error)

QueueSubscribeSync creates a synchronous queue subscriber on the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message synchronously.

func (*Conn) QueueSubscribeSyncWithChan added in v1.2.0

func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error)

QueueSubscribeSyncWithChan is syntactic sugar for ChanQueueSubscribe(subject, group, ch).

func (*Conn) Request

func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error)

Request will send a request payload and deliver the response message, or an error, including a timeout if no message was received properly.

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

nc.Subscribe("foo", func(m *nats.Msg) {
	nc.Publish(m.Reply, []byte("I will help you"))
nc.Request("foo", []byte("help"), 50*time.Millisecond)

func (*Conn) RequestWithContext added in v1.3.0

func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error)

RequestWithContext takes a context, a subject and a payload in bytes, and sends a request expecting a single response.

func (*Conn) Servers added in v1.2.2

func (nc *Conn) Servers() []string

Servers returns the list of known server urls, including additional servers discovered after a connection has been established. If authentication is enabled, use UserInfo or Token when connecting with these urls.

func (*Conn) SetClosedHandler added in v1.2.0

func (nc *Conn) SetClosedHandler(cb ConnHandler)

SetClosedHandler will set the closed event handler.

func (*Conn) SetDisconnectHandler added in v1.2.0

func (nc *Conn) SetDisconnectHandler(dcb ConnHandler)

SetDisconnectHandler will set the disconnect event handler.

func (*Conn) SetDiscoveredServersHandler added in v1.3.0

func (nc *Conn) SetDiscoveredServersHandler(dscb ConnHandler)

SetDiscoveredServersHandler will set the discovered servers handler.

func (*Conn) SetErrorHandler added in v1.2.0

func (nc *Conn) SetErrorHandler(cb ErrHandler)

SetErrorHandler will set the async error handler.

func (*Conn) SetReconnectHandler added in v1.2.0

func (nc *Conn) SetReconnectHandler(rcb ConnHandler)

SetReconnectHandler will set the reconnect event handler.

func (*Conn) Stats

func (nc *Conn) Stats() Statistics

Stats will return a race safe copy of the Statistics section for the connection.

func (*Conn) Status

func (nc *Conn) Status() Status

Status returns the current state of the connection.

func (*Conn) Subscribe

func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error)

Subscribe will express interest in the given subject. The subject can have wildcards (partial:*, full:>). Messages will be delivered to the associated MsgHandler. If no MsgHandler is given, the subscription is a synchronous subscription and can be polled via Subscription.NextMsg().


This Example shows an asynchronous subscriber.

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

nc.Subscribe("foo", func(m *nats.Msg) {
	fmt.Printf("Received a message: %s\n", string(m.Data))

func (*Conn) SubscribeSync

func (nc *Conn) SubscribeSync(subj string) (*Subscription, error)

SubscribeSync is syntactic sugar for Subscribe(subject, nil).


This Example shows a synchronous subscriber.

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

sub, _ := nc.SubscribeSync("foo")
m, err := sub.NextMsg(1 * time.Second)
if err == nil {
	fmt.Printf("Received a message: %s\n", string(m.Data))
} else {
	fmt.Println("NextMsg timed out.")

func (*Conn) TLSRequired added in v1.2.0

func (nc *Conn) TLSRequired() bool

TLSRequired will return if the connected server requires TLS connections.

type ConnHandler

type ConnHandler func(*Conn)

ConnHandler is used for asynchronous events such as disconnected and closed connections.

type EncodedConn

type EncodedConn struct {
	Conn *Conn
	Enc  Encoder

EncodedConn are the preferred way to interface with NATS. They wrap a bare connection to a nats server and have an extendable encoder system that will encode and decode messages from raw Go types.

func NewEncodedConn

func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error)

NewEncodedConn will wrap an existing Connection and utilize the appropriate registered encoder.


Shows how to wrap a Conn into an EncodedConn

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, "json")

func (*EncodedConn) BindRecvChan

func (c *EncodedConn) BindRecvChan(subject string, channel interface{}) (*Subscription, error)

BindRecvChan binds a channel for receive operations from NATS.


BindRecvChan() allows binding of a Go channel to a nats subject for subscribe operations. The Encoder attached to the EncodedConn will be used for un-marshaling.

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, "json")
defer c.Close()

type person struct {
	Name    string
	Address string
	Age     int

ch := make(chan *person)
c.BindRecvChan("hello", ch)

me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
c.Publish("hello", me)

// Receive the publish directly on a channel
who := <-ch

fmt.Printf("%v says hello!\n", who)

func (*EncodedConn) BindRecvQueueChan

func (c *EncodedConn) BindRecvQueueChan(subject, queue string, channel interface{}) (*Subscription, error)

BindRecvQueueChan binds a channel for queue-based receive operations from NATS.

func (*EncodedConn) BindSendChan

func (c *EncodedConn) BindSendChan(subject string, channel interface{}) error

BindSendChan binds a channel for send operations to NATS.


BindSendChan() allows binding of a Go channel to a nats subject for publish operations. The Encoder attached to the EncodedConn will be used for marshaling.

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, "json")
defer c.Close()

type person struct {
	Name    string
	Address string
	Age     int

ch := make(chan *person)
c.BindSendChan("hello", ch)

me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
ch <- me

func (*EncodedConn) Close

func (c *EncodedConn) Close()

Close will close the connection to the server. This call will release all blocking calls, such as Flush(), etc.

func (*EncodedConn) Flush

func (c *EncodedConn) Flush() error

Flush will perform a round trip to the server and return when it receives the internal reply.

func (*EncodedConn) FlushTimeout

func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error)

FlushTimeout allows a Flush operation to have an associated timeout.

func (*EncodedConn) LastError

func (c *EncodedConn) LastError() error

LastError reports the last error encountered via the Connection.

func (*EncodedConn) Publish

func (c *EncodedConn) Publish(subject string, v interface{}) error

Publish publishes the data argument to the given subject. The data argument will be encoded using the associated encoder.


EncodedConn can publish virtually anything just by passing it in. The encoder will be used to properly encode the raw Go type

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, "json")
defer c.Close()

type person struct {
	Name    string
	Address string
	Age     int

me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
c.Publish("hello", me)

func (*EncodedConn) PublishRequest

func (c *EncodedConn) PublishRequest(subject, reply string, v interface{}) error

PublishRequest will perform a Publish() expecting a response on the reply subject. Use Request() for automatically waiting for a response inline.

func (*EncodedConn) QueueSubscribe

func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error)

QueueSubscribe will create a queue subscription on the given subject and process incoming messages using the specified Handler. The Handler should be a func that matches a signature from the description of Handler from above.

func (*EncodedConn) Request

func (c *EncodedConn) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error

Request will create an Inbox and perform a Request() call with the Inbox reply for the data v. A response will be decoded into the vPtrResponse.

func (*EncodedConn) RequestWithContext added in v1.3.0

func (c *EncodedConn) RequestWithContext(ctx context.Context, subject string, v interface{}, vPtr interface{}) error

RequestWithContext will create an Inbox and perform a Request using the provided cancellation context with the Inbox reply for the data v. A response will be decoded into the vPtrResponse.

func (*EncodedConn) Subscribe

func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error)

Subscribe will create a subscription on the given subject and process incoming messages using the specified Handler. The Handler should be a func that matches a signature from the description of Handler from above.


EncodedConn's subscribers will automatically decode the wire data into the requested Go type using the Decode() method of the registered Encoder. The callback signature can also vary to include additional data, such as subject and reply subjects.

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, "json")
defer c.Close()

type person struct {
	Name    string
	Address string
	Age     int

c.Subscribe("hello", func(p *person) {
	fmt.Printf("Received a person! %+v\n", p)

c.Subscribe("hello", func(subj, reply string, p *person) {
	fmt.Printf("Received a person on subject %s! %+v\n", subj, p)

me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
c.Publish("hello", me)

type Encoder

type Encoder interface {
	Encode(subject string, v interface{}) ([]byte, error)
	Decode(subject string, data []byte, vPtr interface{}) error

Encoder interface is for all registered encoders.

func EncoderForType

func EncoderForType(encType string) Encoder

EncoderForType will return the registered Encoder for the encType.

type ErrHandler

type ErrHandler func(*Conn, *Subscription, error)

ErrHandler is used to process asynchronous errors encountered while processing inbound messages.

type Handler

type Handler interface{}

Handler is a specific callback used for Subscribe. It is generalized to an interface{}, but we will discover its format and arguments at runtime and perform the correct callback, including de-marshaling JSON strings back into the appropriate struct based on the signature of the Handler.

Handlers are expected to have one of four signatures.

type person struct {
	Name string `json:"name,omitempty"`
	Age  uint   `json:"age,omitempty"`

handler := func(m *Msg)
handler := func(p *person)
handler := func(subject string, o *obj)
handler := func(subject, reply string, o *obj)

These forms allow a callback to request a raw Msg ptr, where the processing of the message from the wire is untouched. Process a JSON representation and demarshal it into the given struct, e.g. person. There are also variants where the callback wants either the subject, or the subject and the reply subject.

type Msg

type Msg struct {
	Subject string
	Reply   string
	Data    []byte
	Sub     *Subscription
	// contains filtered or unexported fields

Msg is a structure used by Subscribers and PublishMsg().

type MsgHandler

type MsgHandler func(msg *Msg)

MsgHandler is a callback function that processes messages delivered to asynchronous subscribers.

type Option added in v1.2.0

type Option func(*Options) error

Option is a function on the options for a connection.

func ClientCert added in v1.2.0

func ClientCert(certFile, keyFile string) Option

ClientCert is a helper option to provide the client certificate from a file. If Secure is not already set this will set it as well.

func ClosedHandler added in v1.2.0

func ClosedHandler(cb ConnHandler) Option

ClosedHandler is an Option to set the closed handler.

func Dialer added in v1.2.2

func Dialer(dialer *net.Dialer) Option

Dialer is an Option to set the dialer which will be used when attempting to establish a connection.

func DisconnectHandler added in v1.2.0

func DisconnectHandler(cb ConnHandler) Option

DisconnectHandler is an Option to set the disconnected handler.

func DiscoveredServersHandler added in v1.3.0

func DiscoveredServersHandler(cb ConnHandler) Option

DiscoveredServersHandler is an Option to set the new servers handler.

func DontRandomize added in v1.2.0

func DontRandomize() Option

DontRandomize is an Option to turn off randomizing the server pool.

func ErrorHandler added in v1.2.0

func ErrorHandler(cb ErrHandler) Option

ErrorHandler is an Option to set the async error handler.

func MaxReconnects added in v1.2.0

func MaxReconnects(max int) Option

MaxReconnects is an Option to set the maximum number of reconnect attempts.

func Name added in v1.2.0

func Name(name string) Option

Name is an Option to set the client name.

func NoReconnect added in v1.2.0

func NoReconnect() Option

NoReconnect is an Option to turn off reconnect behavior.

func ReconnectHandler added in v1.2.0

func ReconnectHandler(cb ConnHandler) Option

ReconnectHandler is an Option to set the reconnected handler.

func ReconnectWait added in v1.2.0

func ReconnectWait(t time.Duration) Option

ReconnectWait is an Option to set the wait time between reconnect attempts.

func RootCAs added in v1.2.0

func RootCAs(file ...string) Option

RootCAs is a helper option to provide the RootCAs pool from a list of filenames. If Secure is not already set this will set it as well.

func Secure added in v1.2.0

func Secure(tls ...*tls.Config) Option

Secure is an Option to enable TLS secure connections that skip server verification by default. Pass a TLS Configuration for proper TLS.

func Timeout added in v1.2.0

func Timeout(t time.Duration) Option

Timeout is an Option to set the timeout for Dial on a connection.

func Token added in v1.2.2

func Token(token string) Option

Token is an Option to set the token to use when not included directly in the URLs.

func UseOldRequestStyle added in v1.3.0

func UseOldRequestStyle() Option

UseOldRequestStyle is an Option to force usage of the old Request style.

func UserInfo added in v1.2.2

func UserInfo(user, password string) Option

UserInfo is an Option to set the username and password to use when not included directly in the URLs.

type Options

type Options struct {

	// Url represents a single NATS server url to which the client
	// will be connecting. If the Servers option is also set, it
	// then becomes the first server in the Servers array.
	Url string

	// Servers is a configured set of servers which this client
	// will use when attempting to connect.
	Servers []string

	// NoRandomize configures whether we will randomize the
	// server pool.
	NoRandomize bool

	// Name is an optional name label which will be sent to the server
	// on CONNECT to identify the client.
	Name string

	// Verbose signals the server to send an OK ack for commands
	// successfully processed by the server.
	Verbose bool

	// Pedantic signals the server whether it should be doing further
	// validation of subjects.
	Pedantic bool

	// Secure enables TLS secure connections that skip server
	// verification by default. NOT RECOMMENDED.
	Secure bool

	// TLSConfig is a custom TLS configuration to use for secure
	// transports.
	TLSConfig *tls.Config

	// AllowReconnect enables reconnection logic to be used when we
	// encounter a disconnect from the current server.
	AllowReconnect bool

	// MaxReconnect sets the number of reconnect attempts that will be
	// tried before giving up. If negative, then it will never give up
	// trying to reconnect.
	MaxReconnect int

	// ReconnectWait sets the time to backoff after attempting a reconnect
	// to a server that we were already connected to previously.
	ReconnectWait time.Duration

	// Timeout sets the timeout for a Dial operation on a connection.
	Timeout time.Duration

	// FlusherTimeout is the maximum time to wait for the flusher loop
	// to be able to finish writing to the underlying connection.
	FlusherTimeout time.Duration

	// PingInterval is the period at which the client will be sending ping
	// commands to the server, disabled if 0 or negative.
	PingInterval time.Duration

	// MaxPingsOut is the maximum number of pending ping commands that can
	// be awaiting a response before raising an ErrStaleConnection error.
	MaxPingsOut int

	// ClosedCB sets the closed handler that is called when a client will
	// no longer be connected.
	ClosedCB ConnHandler

	// DisconnectedCB sets the disconnected handler that is called
	// whenever the connection is disconnected.
	DisconnectedCB ConnHandler

	// ReconnectedCB sets the reconnected handler called whenever
	// the connection is successfully reconnected.
	ReconnectedCB ConnHandler

	// DiscoveredServersCB sets the callback that is invoked whenever a new
	// server has joined the cluster.
	DiscoveredServersCB ConnHandler

	// AsyncErrorCB sets the async error handler (e.g. slow consumer errors)
	AsyncErrorCB ErrHandler

	// ReconnectBufSize is the size of the backing bufio during reconnect.
	// Once this has been exhausted publish operations will return an error.
	ReconnectBufSize int

	// SubChanLen is the size of the buffered channel used between the socket
	// goroutine and the message delivery for SyncSubscriptions.
	// NOTE: This does not affect AsyncSubscriptions which are
	// dictated by PendingLimits()
	SubChanLen int

	// User sets the username to be used when connecting to the server.
	User string

	// Password sets the password to be used when connecting to a server.
	Password string

	// Token sets the token to be used when connecting to a server.
	Token string

	// Dialer allows a custom Dialer when forming connections.
	Dialer *net.Dialer

	// UseOldRequestStyle forces the old method of Requests that utilize
	// a new Inbox and a new Subscription for each request.
	UseOldRequestStyle bool
}

Options can be used to create a customized connection.

func GetDefaultOptions added in v1.3.0

func GetDefaultOptions() Options

GetDefaultOptions returns default configuration options for the client.

func (Options) Connect

func (o Options) Connect() (*Conn, error)

Connect will attempt to connect to a NATS server with multiple options.

type Statistics

type Statistics struct {
	InMsgs     uint64
	OutMsgs    uint64
	InBytes    uint64
	OutBytes   uint64
	Reconnects uint64
}

Statistics tracks various stats received and sent on this connection, including counts for messages and bytes.

type Status

type Status int

Status represents the state of the connection.

type Subscription

type Subscription struct {

	// Subject that represents this subscription. This can be different
	// than the received subject inside a Msg if this is a wildcard.
	Subject string

	// Optional queue group name. If present, all subscriptions with the
	// same name will form a distributed queue, and each message will
	// only be processed by one member of the group.
	Queue string
	// contains filtered or unexported fields
}

A Subscription represents interest in a given subject.

func (*Subscription) AutoUnsubscribe

func (s *Subscription) AutoUnsubscribe(max int) error

AutoUnsubscribe will issue an automatic Unsubscribe that is processed by the server when max messages have been received. This can be useful when sending a request to an unknown number of subscribers. Request() uses this functionality.

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

received, wanted, total := 0, 10, 100

sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) {
	received++
})
sub.AutoUnsubscribe(wanted)

for i := 0; i < total; i++ {
	nc.Publish("foo", []byte("Hello"))
}

fmt.Printf("Received = %d", received)

func (*Subscription) ClearMaxPending added in v1.2.0

func (s *Subscription) ClearMaxPending() error

ClearMaxPending resets the maximums seen so far.

func (*Subscription) Delivered added in v1.2.0

func (s *Subscription) Delivered() (int64, error)

Delivered returns the number of delivered messages for this subscription.

func (*Subscription) Dropped added in v1.2.0

func (s *Subscription) Dropped() (int, error)

Dropped returns the number of known dropped messages for this subscription. This will correspond to messages dropped by violations of PendingLimits. If the server declares the connection a SlowConsumer, this number may not be valid.

func (*Subscription) IsValid

func (s *Subscription) IsValid() bool

IsValid returns a boolean indicating whether the subscription is still active. This will return false if the subscription has already been closed.

func (*Subscription) MaxPending added in v1.2.0

func (s *Subscription) MaxPending() (int, int, error)

MaxPending returns the maximum number of queued messages and queued bytes seen so far.

func (*Subscription) NextMsg

func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error)

NextMsg will return the next message available to a synchronous subscriber or block until one is available. A timeout can be used to return when no message has been delivered.

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

sub, _ := nc.SubscribeSync("foo")
m, err := sub.NextMsg(1 * time.Second)
if err == nil {
	fmt.Printf("Received a message: %s\n", string(m.Data))
} else {
	fmt.Println("NextMsg timed out.")
}

func (*Subscription) NextMsgWithContext added in v1.3.0

func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error)

NextMsgWithContext takes a context and returns the next message available to a synchronous subscriber, blocking until it is delivered or the context is canceled.

func (*Subscription) Pending added in v1.2.0

func (s *Subscription) Pending() (int, int, error)

Pending returns the number of queued messages and queued bytes in the client for this subscription.

func (*Subscription) PendingLimits added in v1.2.0

func (s *Subscription) PendingLimits() (int, int, error)

PendingLimits returns the current limits for this subscription. If no error is returned, a negative value indicates that the given metric is not limited.

func (*Subscription) QueuedMsgs added in v1.1.2

func (s *Subscription) QueuedMsgs() (int, error)

QueuedMsgs returns the number of queued messages in the client for this subscription. DEPRECATED: Use Pending()

func (*Subscription) SetPendingLimits added in v1.2.0

func (s *Subscription) SetPendingLimits(msgLimit, bytesLimit int) error

SetPendingLimits sets the limits for pending msgs and bytes for this subscription. Zero is not allowed. Any negative value means that the given metric is not limited.

func (*Subscription) Type added in v1.2.0

func (s *Subscription) Type() SubscriptionType

Type returns the type of Subscription.

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe() error

Unsubscribe will remove interest in the given subject.

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

sub, _ := nc.SubscribeSync("foo")
// ...
sub.Unsubscribe()

type SubscriptionType added in v1.2.0

type SubscriptionType int

SubscriptionType is the type of the Subscription.


Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL