nats

Version: v1.13.0

This package is not in the latest version of its module.

Published: Oct 7, 2021 License: Apache-2.0 Imports: 36 Imported by: 1,367

README

NATS - Go Client

A Go client for the NATS messaging system.

Installation

# Go client
go get github.com/nats-io/nats.go/

# Server
go get github.com/nats-io/nats-server

When using or transitioning to Go modules support:

# Go client latest or explicit version
go get github.com/nats-io/nats.go/@latest
go get github.com/nats-io/nats.go/@v1.13.0

# For latest NATS Server, add /v2 at the end
go get github.com/nats-io/nats-server/v2

# NATS Server v1 is installed otherwise
# go get github.com/nats-io/nats-server

Basic Usage

import "github.com/nats-io/nats.go"

// Connect to a server
nc, _ := nats.Connect(nats.DefaultURL)

// Simple Publisher
nc.Publish("foo", []byte("Hello World"))

// Simple Async Subscriber
nc.Subscribe("foo", func(m *nats.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
})

// Responding to a request message
nc.Subscribe("request", func(m *nats.Msg) {
    m.Respond([]byte("answer is 42"))
})

// Simple Sync Subscriber
sub, err := nc.SubscribeSync("foo")
m, err := sub.NextMsg(timeout)

// Channel Subscriber
ch := make(chan *nats.Msg, 64)
sub, err := nc.ChanSubscribe("foo", ch)
msg := <- ch

// Unsubscribe
sub.Unsubscribe()

// Drain
sub.Drain()

// Requests
msg, err := nc.Request("help", []byte("help me"), 10*time.Millisecond)

// Replies
nc.Subscribe("help", func(m *nats.Msg) {
    nc.Publish(m.Reply, []byte("I can help!"))
})

// Drain connection (Preferred for responders)
// Close() not needed if this is called.
nc.Drain()

// Close connection
nc.Close()

JetStream Basic Usage

import "github.com/nats-io/nats.go"

// Connect to NATS
nc, _ := nats.Connect(nats.DefaultURL)

// Create JetStream Context
js, _ := nc.JetStream(nats.PublishAsyncMaxPending(256))

// Simple Stream Publisher
js.Publish("ORDERS.scratch", []byte("hello"))

// Simple Async Stream Publisher
for i := 0; i < 500; i++ {
	js.PublishAsync("ORDERS.scratch", []byte("hello"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
	fmt.Println("Did not resolve in time")
}

// Simple Async Ephemeral Consumer
js.Subscribe("ORDERS.*", func(m *nats.Msg) {
	fmt.Printf("Received a JetStream message: %s\n", string(m.Data))
})

// Simple Sync Durable Consumer (optional SubOpts at the end)
sub, err := js.SubscribeSync("ORDERS.*", nats.Durable("MONITOR"), nats.MaxDeliver(3))
m, err := sub.NextMsg(timeout)

// Simple Pull Consumer
sub, err := js.PullSubscribe("ORDERS.*", "MONITOR")
msgs, err := sub.Fetch(10)

// Unsubscribe
sub.Unsubscribe()

// Drain
sub.Drain()

JetStream Basic Management

import "github.com/nats-io/nats.go"

// Connect to NATS
nc, _ := nats.Connect(nats.DefaultURL)

// Create JetStream Context
js, _ := nc.JetStream()

// Create a Stream
js.AddStream(&nats.StreamConfig{
	Name:     "ORDERS",
	Subjects: []string{"ORDERS.*"},
})

// Update a Stream
js.UpdateStream(&nats.StreamConfig{
	Name:     "ORDERS",
	MaxBytes: 8,
})

// Create a Consumer
js.AddConsumer("ORDERS", &nats.ConsumerConfig{
	Durable: "MONITOR",
})

// Delete Consumer
js.DeleteConsumer("ORDERS", "MONITOR")

// Delete Stream
js.DeleteStream("ORDERS")

Encoded Connections


nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer c.Close()

// Simple Publisher
c.Publish("foo", "Hello World")

// Simple Async Subscriber
c.Subscribe("foo", func(s string) {
    fmt.Printf("Received a message: %s\n", s)
})

// EncodedConn can Publish any raw Go type using the registered Encoder
type person struct {
     Name     string
     Address  string
     Age      int
}

// Go type Subscriber
c.Subscribe("hello", func(p *person) {
    fmt.Printf("Received a person: %+v\n", p)
})

me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street, San Francisco, CA"}

// Go type Publisher
c.Publish("hello", me)

// Unsubscribe
sub, err := c.Subscribe("foo", nil)
// ...
sub.Unsubscribe()

// Requests
var response string
err = c.Request("help", "help me", &response, 10*time.Millisecond)
if err != nil {
    fmt.Printf("Request failed: %v\n", err)
}

// Replying
c.Subscribe("help", func(subj, reply string, msg string) {
    c.Publish(reply, "I can help!")
})

// Close connection
c.Close()

New Authentication (Nkeys and User Credentials)

This requires server with version >= 2.0.0

NATS servers have a new security and authentication mechanism to authenticate with user credentials and Nkeys. The simplest form is to use the helper method UserCredentials(credsFilepath).

nc, err := nats.Connect(url, nats.UserCredentials("user.creds"))

The helper method creates two callback handlers to present the user JWT and sign the nonce challenge from the server. The core client library never has direct access to your private key; it simply invokes the callback to sign the server challenge. The helper loads the credentials and then wipes the memory it used on each connect or reconnect.

The helper can also take two arguments, one for the JWT file and one for the NKey seed file.

nc, err := nats.Connect(url, nats.UserCredentials("user.jwt", "user.nk"))

You can also set the callback handlers directly and manage challenge signing directly.

nc, err := nats.Connect(url, nats.UserJWT(jwtCB, sigCB))
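
To make the challenge-signing idea concrete, here is a minimal sketch that uses only the Go standard library. It assumes a raw Ed25519 key pair stands in for the user's NKey (real NKeys are encoded seeds handled by a separate library), and the names newSigner and signAndVerify are hypothetical. The returned closure has the same shape as the signature callback: it takes the server nonce and returns a signature, so the private key never leaves the closure.

```go
package main

import (
	"crypto/ed25519"
	"crypto/rand"
	"fmt"
)

// newSigner returns a callback that signs a server nonce.
// The private key is captured by the closure and never handed to the caller.
func newSigner(priv ed25519.PrivateKey) func(nonce []byte) ([]byte, error) {
	return func(nonce []byte) ([]byte, error) {
		return ed25519.Sign(priv, nonce), nil
	}
}

// signAndVerify generates a throwaway key pair (an assumption made for this
// sketch), signs the nonce through the callback, and checks the signature
// the way a server holding only the public key could.
func signAndVerify(nonce []byte) (bool, error) {
	pub, priv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		return false, err
	}
	sigCB := newSigner(priv)
	sig, err := sigCB(nonce)
	if err != nil {
		return false, err
	}
	return ed25519.Verify(pub, nonce, sig), nil
}

func main() {
	ok, err := signAndVerify([]byte("server-nonce")) // hypothetical challenge value
	if err != nil {
		panic(err)
	}
	fmt.Println("signature valid:", ok)
}
```

The design point is that only the callback crosses the API boundary; whoever calls it sees a signature, not the key.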

Bare Nkeys are also supported. The nkey seed should be in a read-only file, e.g. seed.txt.

> cat seed.txt
# This is my seed nkey!
SUAGMJH5XLGZKQQWAWKRZJIGMOU4HPFUYLXJMXOO5NLFEO2OOQJ5LPRDPM

NkeyOptionFromSeed is a helper function that loads and decodes the seed and performs the proper signing of the server nonce. It clears memory between invocations. Alternatively, you can use the low-level option and provide the public key and a signature callback yourself.

opt, err := nats.NkeyOptionFromSeed("seed.txt")
nc, err := nats.Connect(serverUrl, opt)

// Direct
nc, err := nats.Connect(serverUrl, nats.Nkey(pubNkey, sigCB))

TLS

// tls as a scheme will enable secure connections by default. This will also verify the server name.
nc, err := nats.Connect("tls://nats.demo.io:4443")

// If you are using a self-signed certificate, you need to have a tls.Config with RootCAs setup.
// We provide a helper method to make this case easier.
nc, err = nats.Connect("tls://localhost:4443", nats.RootCAs("./configs/certs/ca.pem"))

// If the server requires a client certificate, there is a helper function for that too:
cert := nats.ClientCert("./configs/certs/client-cert.pem", "./configs/certs/client-key.pem")
nc, err = nats.Connect("tls://localhost:4443", cert)

// You can also supply a complete tls.Config

certFile := "./configs/certs/client-cert.pem"
keyFile := "./configs/certs/client-key.pem"
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
    log.Fatalf("error parsing X509 certificate/key pair: %v", err)
}

config := &tls.Config{
    ServerName: 	opts.Host,
    Certificates: 	[]tls.Certificate{cert},
    RootCAs:    	pool,
    MinVersion: 	tls.VersionTLS12,
}

nc, err = nats.Connect("nats://localhost:4443", nats.Secure(config))
if err != nil {
	log.Fatalf("Got an error on Connect with Secure Options: %+v\n", err)
}

Using Go Channels (netchan)

nc, _ := nats.Connect(nats.DefaultURL)
ec, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer ec.Close()

type person struct {
     Name     string
     Address  string
     Age      int
}

recvCh := make(chan *person)
ec.BindRecvChan("hello", recvCh)

sendCh := make(chan *person)
ec.BindSendChan("hello", sendCh)

me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street"}

// Send via Go channels
sendCh <- me

// Receive via Go channels
who := <- recvCh

Wildcard Subscriptions


// "*" matches any token, at any level of the subject.
nc.Subscribe("foo.*.baz", func(m *nats.Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})

nc.Subscribe("foo.bar.*", func(m *nats.Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})

// ">" matches any length of the tail of a subject, and can only be the last token
// E.g. 'foo.>' will match 'foo.bar', 'foo.bar.baz', 'foo.foo.bar.bax.22'
nc.Subscribe("foo.>", func(m *nats.Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})

// Matches all of the above
nc.Publish("foo.bar.baz", []byte("Hello World"))
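
The matching rules in the comments above can be sketched in plain Go. This is an illustration only, not the matcher used by the server or the client: subjectMatches is a hypothetical helper, it does no validation of empty tokens, and it assumes that ">" must cover at least one trailing token (so "foo.>" does not match "foo").

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches pattern.
// "*" matches exactly one token; ">" matches one or more trailing
// tokens and is only honored as the last token of the pattern.
func subjectMatches(pattern, subject string) bool {
	pt := strings.Split(pattern, ".")
	st := strings.Split(subject, ".")
	for i, p := range pt {
		if p == ">" {
			// ">" must be last and must cover at least one token.
			return i == len(pt)-1 && len(st) > i
		}
		if i >= len(st) {
			return false // subject ran out of tokens
		}
		if p != "*" && p != st[i] {
			return false // literal token mismatch
		}
	}
	// Without a trailing ">", token counts must be equal.
	return len(pt) == len(st)
}

func main() {
	for _, p := range []string{"foo.*.baz", "foo.bar.*", "foo.>", "foo.*"} {
		fmt.Printf("%-10s matches foo.bar.baz: %v\n", p, subjectMatches(p, "foo.bar.baz"))
	}
}
```

The first three patterns are the ones subscribed to above; "foo.*" is included to show that "*" does not span more than one token.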

Queue Groups

// All subscriptions with the same queue name will form a queue group.
// Each message will be delivered to only one subscriber per queue group,
// using queuing semantics. You can have as many queue groups as you wish.
// Normal subscribers will continue to work as expected.

nc.QueueSubscribe("foo", "job_workers", func(_ *nats.Msg) {
    received++
})
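
The delivery rule described in the comments above (every plain subscriber gets each message, while each queue group gets it exactly once) can be illustrated with a small in-memory model. The broker type below is hypothetical and handles a single implicit subject; it is not how nats-server is implemented, and the random member selection is an assumption made for the sketch.

```go
package main

import (
	"fmt"
	"math/rand"
)

// broker is a hypothetical in-memory stand-in for a server.
type broker struct {
	plain  []func(string)            // ordinary subscribers
	groups map[string][]func(string) // queue name -> group members
}

func newBroker() *broker {
	return &broker{groups: make(map[string][]func(string))}
}

func (b *broker) subscribe(h func(string)) { b.plain = append(b.plain, h) }

func (b *broker) queueSubscribe(queue string, h func(string)) {
	b.groups[queue] = append(b.groups[queue], h)
}

// publish delivers msg to every plain subscriber and to exactly one
// randomly chosen member of each queue group.
func (b *broker) publish(msg string) {
	for _, h := range b.plain {
		h(msg)
	}
	for _, members := range b.groups {
		members[rand.Intn(len(members))](msg)
	}
}

func main() {
	b := newBroker()
	var plain, workers, auditors int
	b.subscribe(func(string) { plain++ })
	for i := 0; i < 3; i++ {
		b.queueSubscribe("job_workers", func(string) { workers++ })
	}
	b.queueSubscribe("auditors", func(string) { auditors++ })

	for i := 0; i < 10; i++ {
		b.publish("job")
	}
	// Each counter ends at 10: the three workers share their group's messages.
	fmt.Println(plain, workers, auditors)
}
```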

Advanced Usage


// Normally, the library will return an error when trying to connect and
// there is no server running. The RetryOnFailedConnect option will set
// the connection in reconnecting state if it failed to connect right away.
nc, err := nats.Connect(nats.DefaultURL,
    nats.RetryOnFailedConnect(true),
    nats.MaxReconnects(10),
    nats.ReconnectWait(time.Second),
    nats.ReconnectHandler(func(_ *nats.Conn) {
        // Note that this will be invoked for the first asynchronous connect.
    }))
if err != nil {
    // Should not return an error even if it can't connect, but you still
    // need to check in case there are some configuration errors.
}

// Flush connection to server, returns when all messages have been processed.
nc.Flush()
fmt.Println("All clear!")

// FlushTimeout specifies a timeout value as well.
err := nc.FlushTimeout(1*time.Second)
if err != nil {
    fmt.Println("Flush timed out!")
} else {
    fmt.Println("All clear!")
}

// Auto-unsubscribe after MAX_WANTED messages received
const MAX_WANTED = 10
sub, err := nc.SubscribeSync("foo")
sub.AutoUnsubscribe(MAX_WANTED)

// Multiple connections
nc1, _ := nats.Connect("nats://host1:4222")
nc2, _ := nats.Connect("nats://host2:4222")

nc1.Subscribe("foo", func(m *nats.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
})

nc2.Publish("foo", []byte("Hello World!"))

Clustered Usage


var servers = "nats://localhost:1222, nats://localhost:1223, nats://localhost:1224"

nc, err := nats.Connect(servers)

// Optionally set ReconnectWait and MaxReconnect attempts.
// This example means 10 seconds total per backend.
nc, err = nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2 * time.Second))

// You can also add some jitter for the reconnection.
// This call will add up to 500 milliseconds for non TLS connections and 2 seconds for TLS connections.
// If not specified, the library defaults to 100 milliseconds and 1 second, respectively.
nc, err = nats.Connect(servers, nats.ReconnectJitter(500*time.Millisecond, 2*time.Second))

// You can also specify a custom reconnect delay handler. If set, the library will invoke it when it has tried
// all URLs in its list. The value returned will be used as the total sleep time, so add your own jitter.
// The library will pass the number of times it went through the whole list.
nc, err = nats.Connect(servers, nats.CustomReconnectDelay(func(attempts int) time.Duration {
    return someBackoffFunction(attempts)
}))

// Optionally disable randomization of the server pool
nc, err = nats.Connect(servers, nats.DontRandomize())

// Setup callbacks to be notified on disconnects, reconnects and connection closed.
nc, err = nats.Connect(servers,
	nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
		fmt.Printf("Got disconnected! Reason: %q\n", err)
	}),
	nats.ReconnectHandler(func(nc *nats.Conn) {
		fmt.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
	}),
	nats.ClosedHandler(func(nc *nats.Conn) {
		fmt.Printf("Connection closed. Reason: %q\n", nc.LastError())
	}),
)

// When connecting to a mesh of servers with auto-discovery capabilities,
// you may need to provide a username/password or token in order to connect
// to any server in that mesh when authentication is required.
// Instead of providing the credentials in the initial URL, you will use
// new option setters:
nc, err = nats.Connect("nats://localhost:4222", nats.UserInfo("foo", "bar"))

// For token based authentication:
nc, err = nats.Connect("nats://localhost:4222", nats.Token("S3cretT0ken"))

// You can even pass the two at the same time in case one of the servers
// in the mesh requires a token instead of user name and password.
nc, err = nats.Connect("nats://localhost:4222",
    nats.UserInfo("foo", "bar"),
    nats.Token("S3cretT0ken"))

// Note that if credentials are specified in the initial URLs, they take
// precedence over the credentials specified through the options.
// For instance, in the connect call below, the client library will use
// the user "my" and password "pwd" to connect to localhost:4222, however,
// it will use username "foo" and password "bar" when (re)connecting to
// a different server URL that it got as part of the auto-discovery.
nc, err = nats.Connect("nats://my:pwd@localhost:4222", nats.UserInfo("foo", "bar"))
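
The CustomReconnectDelay example above leaves someBackoffFunction undefined. One possible shape, offered as a sketch rather than a recommendation from the library, is exponential backoff with a cap and random jitter. The constants baseDelay and maxDelay and the 25% jitter ratio are assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	baseDelay = 250 * time.Millisecond // hypothetical delay after the first full pass
	maxDelay  = 8 * time.Second        // hypothetical upper bound before jitter
)

// someBackoffFunction doubles the delay for each completed pass over the
// server list, caps it at maxDelay, and adds up to 25% random jitter.
// attempts is the number of times the whole list has been tried.
func someBackoffFunction(attempts int) time.Duration {
	d := baseDelay
	for i := 1; i < attempts && d < maxDelay; i++ {
		d *= 2
	}
	if d > maxDelay {
		d = maxDelay
	}
	// The returned value is used as the total sleep time, so jitter is added here.
	jitter := time.Duration(rand.Int63n(int64(d)/4 + 1))
	return d + jitter
}

func main() {
	for attempts := 1; attempts <= 8; attempts++ {
		fmt.Printf("pass %d: sleep %v\n", attempts, someBackoffFunction(attempts))
	}
}
```

Because the jitter is random, the printed durations differ between runs; only the bounds (the base delay for that pass, plus at most a quarter of it) are fixed.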

Context support (Go 1.7+)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

nc, err := nats.Connect(nats.DefaultURL)

// Request with context
msg, err := nc.RequestWithContext(ctx, "foo", []byte("bar"))

// Synchronous subscriber with context
sub, err := nc.SubscribeSync("foo")
msg, err := sub.NextMsgWithContext(ctx)

// Encoded Request with context
c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
type request struct {
	Message string `json:"message"`
}
type response struct {
	Code int `json:"code"`
}
req := &request{Message: "Hello"}
resp := &response{}
err := c.RequestWithContext(ctx, "foo", req, resp)

License

Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.

Documentation

Overview

A Go client for the NATS messaging system (https://nats.io).

Constants

const (
	JSON_ENCODER    = "json"
	GOB_ENCODER     = "gob"
	DEFAULT_ENCODER = "default"
)

Indexed names into the Registered Encoders.

const (
	MsgIdHdr               = "Nats-Msg-Id"
	ExpectedStreamHdr      = "Nats-Expected-Stream"
	ExpectedLastSeqHdr     = "Nats-Expected-Last-Sequence"
	ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence"
	ExpectedLastMsgIdHdr   = "Nats-Expected-Last-Msg-Id"
	MsgRollup              = "Nats-Rollup"
)

Headers for published messages.

const (
	MsgRollupSubject = "sub"
	MsgRollupAll     = "all"
)

Rollups, can be subject only or all messages.

const (
	KeyValueMaxHistory = 64
	AllKeys            = ">"
)

Used to watch all keys.

const (
	Version                   = "1.13.0"
	DefaultURL                = "nats://127.0.0.1:4222"
	DefaultPort               = 4222
	DefaultMaxReconnect       = 60
	DefaultReconnectWait      = 2 * time.Second
	DefaultReconnectJitter    = 100 * time.Millisecond
	DefaultReconnectJitterTLS = time.Second
	DefaultTimeout            = 2 * time.Second
	DefaultPingInterval       = 2 * time.Minute
	DefaultMaxPingOut         = 2
	DefaultMaxChanLen         = 64 * 1024       // 64k
	DefaultReconnectBufSize   = 8 * 1024 * 1024 // 8MB
	RequestChanLen            = 8
	DefaultDrainTimeout       = 30 * time.Second
	LangString                = "go"
)

Default Constants

const (
	// STALE_CONNECTION is for detection and proper handling of stale connections.
	STALE_CONNECTION = "stale connection"

	// PERMISSIONS_ERR is for when nats server subject authorization has failed.
	PERMISSIONS_ERR = "permissions violation"

	// AUTHORIZATION_ERR is for when nats server user authorization has failed.
	AUTHORIZATION_ERR = "authorization violation"

	// AUTHENTICATION_EXPIRED_ERR is for when nats server user authorization has expired.
	AUTHENTICATION_EXPIRED_ERR = "user authentication expired"

	// AUTHENTICATION_REVOKED_ERR is for when user authorization has been revoked.
	AUTHENTICATION_REVOKED_ERR = "user authentication revoked"

	// ACCOUNT_AUTHENTICATION_EXPIRED_ERR is for when nats server account authorization has expired.
	ACCOUNT_AUTHENTICATION_EXPIRED_ERR = "account authentication expired"
)

const (
	DISCONNECTED = Status(iota)
	CONNECTED
	CLOSED
	RECONNECTING
	CONNECTING
	DRAINING_SUBS
	DRAINING_PUBS
)

const (
	AsyncSubscription = SubscriptionType(iota)
	SyncSubscription
	ChanSubscription
	NilSubscription
	PullSubscription
)

The different types of subscriptions.

const (
	// DefaultSubPendingMsgsLimit will be 512k msgs.
	DefaultSubPendingMsgsLimit = 512 * 1024
	// DefaultSubPendingBytesLimit is 64MB
	DefaultSubPendingBytesLimit = 64 * 1024 * 1024
)

Pending Limits

const (
	OP_START = iota
	OP_PLUS
	OP_PLUS_O
	OP_PLUS_OK
	OP_MINUS
	OP_MINUS_E
	OP_MINUS_ER
	OP_MINUS_ERR
	OP_MINUS_ERR_SPC
	MINUS_ERR_ARG
	OP_M
	OP_MS
	OP_MSG
	OP_MSG_SPC
	MSG_ARG
	MSG_PAYLOAD
	MSG_END
	OP_H
	OP_P
	OP_PI
	OP_PIN
	OP_PING
	OP_PO
	OP_PON
	OP_PONG
	OP_I
	OP_IN
	OP_INF
	OP_INFO
	OP_INFO_SPC
	INFO_ARG
)

const (
	InboxPrefix = "_INBOX."
)

InboxPrefix is the prefix for all inbox subjects.

const MAX_CONTROL_LINE_SIZE = 4096

const MsgSize = "Nats-Msg-Size"

MsgSize is a header that will be part of a consumer's delivered message if HeadersOnly requested.

Variables

var (
	ErrKeyValueConfigRequired = errors.New("nats: config required")
	ErrInvalidBucketName      = errors.New("nats: invalid bucket name")
	ErrInvalidKey             = errors.New("nats: invalid key")
	ErrBucketNotFound         = errors.New("nats: bucket not found")
	ErrBadBucket              = errors.New("nats: bucket not valid key-value store")
	ErrKeyNotFound            = errors.New("nats: key not found")
	ErrKeyDeleted             = errors.New("nats: key was deleted")
	ErrHistoryToLarge         = errors.New("nats: history limited to a max of 64")
	ErrNoKeysFound            = errors.New("nats: no keys found")
)

Errors

var (
	ErrConnectionClosed             = errors.New("nats: connection closed")
	ErrConnectionDraining           = errors.New("nats: connection draining")
	ErrDrainTimeout                 = errors.New("nats: draining connection timed out")
	ErrConnectionReconnecting       = errors.New("nats: connection reconnecting")
	ErrSecureConnRequired           = errors.New("nats: secure connection required")
	ErrSecureConnWanted             = errors.New("nats: secure connection not available")
	ErrBadSubscription              = errors.New("nats: invalid subscription")
	ErrTypeSubscription             = errors.New("nats: invalid subscription type")
	ErrBadSubject                   = errors.New("nats: invalid subject")
	ErrBadQueueName                 = errors.New("nats: invalid queue name")
	ErrSlowConsumer                 = errors.New("nats: slow consumer, messages dropped")
	ErrTimeout                      = errors.New("nats: timeout")
	ErrBadTimeout                   = errors.New("nats: timeout invalid")
	ErrAuthorization                = errors.New("nats: authorization violation")
	ErrAuthExpired                  = errors.New("nats: authentication expired")
	ErrAuthRevoked                  = errors.New("nats: authentication revoked")
	ErrAccountAuthExpired           = errors.New("nats: account authentication expired")
	ErrNoServers                    = errors.New("nats: no servers available for connection")
	ErrJsonParse                    = errors.New("nats: connect message, json parse error")
	ErrChanArg                      = errors.New("nats: argument needs to be a channel type")
	ErrMaxPayload                   = errors.New("nats: maximum payload exceeded")
	ErrMaxMessages                  = errors.New("nats: maximum messages delivered")
	ErrSyncSubRequired              = errors.New("nats: illegal call on an async subscription")
	ErrMultipleTLSConfigs           = errors.New("nats: multiple tls.Configs not allowed")
	ErrNoInfoReceived               = errors.New("nats: protocol exception, INFO not received")
	ErrReconnectBufExceeded         = errors.New("nats: outbound buffer limit exceeded")
	ErrInvalidConnection            = errors.New("nats: invalid connection")
	ErrInvalidMsg                   = errors.New("nats: invalid message or message nil")
	ErrInvalidArg                   = errors.New("nats: invalid argument")
	ErrInvalidContext               = errors.New("nats: invalid context")
	ErrNoDeadlineContext            = errors.New("nats: context requires a deadline")
	ErrNoEchoNotSupported           = errors.New("nats: no echo option not supported by this server")
	ErrClientIDNotSupported         = errors.New("nats: client ID not supported by this server")
	ErrUserButNoSigCB               = errors.New("nats: user callback defined without a signature handler")
	ErrNkeyButNoSigCB               = errors.New("nats: nkey defined without a signature handler")
	ErrNoUserCB                     = errors.New("nats: user callback not defined")
	ErrNkeyAndUser                  = errors.New("nats: user callback and nkey defined")
	ErrNkeysNotSupported            = errors.New("nats: nkeys not supported by the server")
	ErrStaleConnection              = errors.New("nats: " + STALE_CONNECTION)
	ErrTokenAlreadySet              = errors.New("nats: token and token handler both set")
	ErrMsgNotBound                  = errors.New("nats: message is not bound to subscription/connection")
	ErrMsgNoReply                   = errors.New("nats: message does not have a reply")
	ErrClientIPNotSupported         = errors.New("nats: client IP not supported by this server")
	ErrDisconnected                 = errors.New("nats: server is disconnected")
	ErrHeadersNotSupported          = errors.New("nats: headers not supported by this server")
	ErrBadHeaderMsg                 = errors.New("nats: message could not decode headers")
	ErrNoResponders                 = errors.New("nats: no responders available for request")
	ErrNoContextOrTimeout           = errors.New("nats: no context or timeout given")
	ErrPullModeNotAllowed           = errors.New("nats: pull based not supported")
	ErrJetStreamNotEnabled          = errors.New("nats: jetstream not enabled")
	ErrJetStreamBadPre              = errors.New("nats: jetstream api prefix not valid")
	ErrNoStreamResponse             = errors.New("nats: no response from stream")
	ErrNotJSMessage                 = errors.New("nats: not a jetstream message")
	ErrInvalidStreamName            = errors.New("nats: invalid stream name")
	ErrInvalidDurableName           = errors.New("nats: invalid durable name")
	ErrNoMatchingStream             = errors.New("nats: no stream matches subject")
	ErrSubjectMismatch              = errors.New("nats: subject does not match consumer")
	ErrContextAndTimeout            = errors.New("nats: context and timeout can not both be set")
	ErrInvalidJSAck                 = errors.New("nats: invalid jetstream publish response")
	ErrMultiStreamUnsupported       = errors.New("nats: multiple streams are not supported")
	ErrStreamNameRequired           = errors.New("nats: stream name is required")
	ErrStreamNotFound               = errors.New("nats: stream not found")
	ErrConsumerNotFound             = errors.New("nats: consumer not found")
	ErrConsumerNameRequired         = errors.New("nats: consumer name is required")
	ErrConsumerConfigRequired       = errors.New("nats: consumer configuration is required")
	ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required")
	ErrDeliverSubjectRequired       = errors.New("nats: deliver subject is required")
	ErrPullSubscribeToPushConsumer  = errors.New("nats: cannot pull subscribe to push based consumer")
	ErrPullSubscribeRequired        = errors.New("nats: must use pull subscribe to bind to pull based consumer")
	ErrConsumerNotActive            = errors.New("nats: consumer not active")
	ErrMsgNotFound                  = errors.New("nats: message not found")
)

Errors

var (
	ErrObjectConfigRequired = errors.New("nats: object-store config required")
	ErrBadObjectMeta        = errors.New("nats: object-store meta information invalid")
	ErrObjectNotFound       = errors.New("nats: object not found")
	ErrInvalidStoreName     = errors.New("nats: invalid object-store name")
	ErrInvalidObjectName    = errors.New("nats: invalid object name")
	ErrDigestMismatch       = errors.New("nats: received a corrupt object, digests do not match")
	ErrNoObjectsFound       = errors.New("nats: no objects found")
)

var DefaultOptions = GetDefaultOptions()

DEPRECATED: Use GetDefaultOptions() instead. DefaultOptions is not safe for use by multiple clients. For details see #308.

Functions

func NewInbox

func NewInbox() string

NewInbox will return an inbox string which can be used for directed replies from subscribers. These are guaranteed to be unique, but can be shared and subscribed to by others.
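
As an illustration of what a unique inbox subject looks like, the sketch below appends a random token to the "_INBOX." prefix listed in the constants. It only imitates the shape of the result: newInboxLike is a hypothetical helper, and the use of 16 random bytes in hex is an assumption, not the generator the library uses.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"strings"
)

// inboxPrefix has the same value as the InboxPrefix constant.
const inboxPrefix = "_INBOX."

// newInboxLike returns inboxPrefix followed by 16 random bytes in hex.
// Assumption: this mimics the form of an inbox subject only.
func newInboxLike() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	return inboxPrefix + hex.EncodeToString(b[:]), nil
}

func main() {
	a, err := newInboxLike()
	if err != nil {
		panic(err)
	}
	b, err := newInboxLike()
	if err != nil {
		panic(err)
	}
	fmt.Println(a)
	fmt.Println("has prefix:", strings.HasPrefix(a, inboxPrefix), "distinct:", a != b)
}
```

Uniqueness does not imply secrecy: any client that learns the subject and has permission can subscribe to it.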

func RegisterEncoder

func RegisterEncoder(encType string, enc Encoder)

RegisterEncoder will register the encType with the given Encoder. Useful for customization.
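
A custom encoder might look like the following sketch. To keep it runnable with the standard library alone, it declares a local Encoder interface that mirrors what is understood to be the package's two-method interface (treat the exact signatures as an assumption), and a local registry map standing in for the table that RegisterEncoder fills. The upperEncoder type and registerEncoder function are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Encoder mirrors the assumed shape of the package's Encoder interface.
type Encoder interface {
	Encode(subject string, v interface{}) ([]byte, error)
	Decode(subject string, data []byte, vPtr interface{}) error
}

// upperEncoder is a toy encoder: it accepts only strings and
// upper-cases them before they go on the wire.
type upperEncoder struct{}

func (upperEncoder) Encode(_ string, v interface{}) ([]byte, error) {
	s, ok := v.(string)
	if !ok {
		return nil, errors.New("upperEncoder: only string values are supported")
	}
	return []byte(strings.ToUpper(s)), nil
}

func (upperEncoder) Decode(_ string, data []byte, vPtr interface{}) error {
	p, ok := vPtr.(*string)
	if !ok {
		return errors.New("upperEncoder: destination must be *string")
	}
	*p = string(data)
	return nil
}

// registry stands in for the package-level table of named encoders.
var registry = map[string]Encoder{}

func registerEncoder(encType string, enc Encoder) { registry[encType] = enc }

func main() {
	registerEncoder("upper", upperEncoder{})

	enc := registry["upper"]
	data, err := enc.Encode("foo", "hello")
	if err != nil {
		panic(err)
	}
	var out string
	if err := enc.Decode("foo", data, &out); err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

With the real package, the equivalent step would be a RegisterEncoder call followed by NewEncodedConn with the same name.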

Types

type APIStats added in v1.11.0

type APIStats struct {
	Total  uint64 `json:"total"`
	Errors uint64 `json:"errors"`
}

APIStats reports on API calls to JetStream for this account.

type AccountInfo added in v1.11.0

type AccountInfo struct {
	Memory    uint64        `json:"memory"`
	Store     uint64        `json:"storage"`
	Streams   int           `json:"streams"`
	Consumers int           `json:"consumers"`
	Domain    string        `json:"domain"`
	API       APIStats      `json:"api"`
	Limits    AccountLimits `json:"limits"`
}

AccountInfo contains info about the JetStream usage from the current account.

type AccountLimits added in v1.11.0

type AccountLimits struct {
	MaxMemory    int64 `json:"max_memory"`
	MaxStore     int64 `json:"max_storage"`
	MaxStreams   int   `json:"max_streams"`
	MaxConsumers int   `json:"max_consumers"`
}

AccountLimits includes the JetStream limits of the current account.

type AckOpt added in v1.11.0

type AckOpt interface {
	// contains filtered or unexported methods
}

AckOpt are the options that can be passed when acknowledging a message.

Example

nc, err := nats.Connect("localhost")
if err != nil {
	log.Fatal(err)
}

// Create JetStream context to produce/consume messages that will be persisted.
js, err := nc.JetStream()
if err != nil {
	log.Fatal(err)
}

// Create stream to persist messages published on 'foo'.
js.AddStream(&nats.StreamConfig{
	Name:     "FOO",
	Subjects: []string{"foo"},
})

// Publish is synchronous by default, and waits for a PubAck response.
js.Publish("foo", []byte("Hello JS!"))

sub, _ := js.SubscribeSync("foo")
msg, _ := sub.NextMsg(2 * time.Second)

// Ack and wait for 2 seconds
msg.InProgress(nats.AckWait(2 * time.Second))

// Using a context.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
msg.Ack(nats.Context(ctx))

type AckPolicy added in v1.11.0

type AckPolicy int

AckPolicy determines how the consumer should acknowledge delivered messages.

const (
	// AckNonePolicy requires no acks for delivered messages.
	AckNonePolicy AckPolicy = iota

	// AckAllPolicy when acking a sequence number, this implicitly acks all
	// sequences below this one as well.
	AckAllPolicy

	// AckExplicitPolicy requires ack or nack for all messages.
	AckExplicitPolicy
)

func (AckPolicy) MarshalJSON added in v1.11.0

func (p AckPolicy) MarshalJSON() ([]byte, error)

func (AckPolicy) String added in v1.11.0

func (p AckPolicy) String() string

func (*AckPolicy) UnmarshalJSON added in v1.11.0

func (p *AckPolicy) UnmarshalJSON(data []byte) error
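
The three methods above follow a common Go pattern: an iota-based enum that travels as a string in JSON. The sketch below shows that pattern on a local ackPolicy type. The wire strings "none", "all", and "explicit" are an assumption about the encoding, and roundTrip is a hypothetical helper added for the demonstration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type ackPolicy int

const (
	ackNone ackPolicy = iota
	ackAll
	ackExplicit
)

// ackNames holds the assumed JSON strings, indexed by policy value.
var ackNames = [...]string{"none", "all", "explicit"}

func (p ackPolicy) String() string {
	if p < 0 || int(p) >= len(ackNames) {
		return "unknown"
	}
	return ackNames[p]
}

// MarshalJSON writes the policy as a quoted string.
func (p ackPolicy) MarshalJSON() ([]byte, error) {
	if p < 0 || int(p) >= len(ackNames) {
		return nil, fmt.Errorf("cannot marshal ack policy %d", int(p))
	}
	return json.Marshal(ackNames[p])
}

// UnmarshalJSON accepts one of the known strings and rejects anything else.
func (p *ackPolicy) UnmarshalJSON(data []byte) error {
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		return err
	}
	for i, name := range ackNames {
		if name == s {
			*p = ackPolicy(i)
			return nil
		}
	}
	return fmt.Errorf("cannot unmarshal ack policy %q", s)
}

// roundTrip encodes p to JSON and decodes the result back.
func roundTrip(p ackPolicy) (string, ackPolicy, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return "", 0, err
	}
	var back ackPolicy
	err = json.Unmarshal(b, &back)
	return string(b), back, err
}

func main() {
	encoded, back, err := roundTrip(ackExplicit)
	fmt.Println(encoded, back, err)
}
```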

type AckWait added in v1.11.0

type AckWait time.Duration

AckWait sets the maximum amount of time we will wait for an ack.

Example
nc, _ := nats.Connect("localhost")
js, _ := nc.JetStream()

// Set custom timeout for a JetStream API request.
js.AddStream(&nats.StreamConfig{
	Name:     "FOO",
	Subjects: []string{"foo"},
})

// Wait for an ack response for 2 seconds.
js.Publish("foo", []byte("Hello JS!"), nats.AckWait(2*time.Second))

// Create consumer on 'foo' subject that waits for an ack for 10s,
// after which the message will be redelivered.
sub, _ := js.SubscribeSync("foo", nats.AckWait(10*time.Second))
msg, _ := sub.NextMsg(2 * time.Second)

// Wait for ack of ack for 2s.
msg.AckSync(nats.AckWait(2 * time.Second))

type AuthTokenHandler added in v1.7.0

type AuthTokenHandler func() string

AuthTokenHandler is used to generate a new token.

type ClusterInfo added in v1.11.0

type ClusterInfo struct {
	Name     string      `json:"name,omitempty"`
	Leader   string      `json:"leader,omitempty"`
	Replicas []*PeerInfo `json:"replicas,omitempty"`
}

ClusterInfo shows information about the underlying set of servers that make up the stream or consumer.

type Conn

type Conn struct {
	// Keep all members for which we use atomic at the beginning of the
	// struct and make sure they are all 64bits (or use padding if necessary).
	// atomic.* functions crash on 32bit machines if operand is not aligned
	// at 64bit. See https://github.com/golang/go/issues/599
	Statistics

	// Opts holds the configuration of the Conn.
	// Modifying the configuration of a running Conn is a race.
	Opts Options
	// contains filtered or unexported fields
}

A Conn represents a bare connection to a nats-server. It can send and receive []byte payloads. The connection is safe to use in multiple goroutines concurrently.

func Connect

func Connect(url string, options ...Option) (*Conn, error)

Connect will attempt to connect to the NATS system. The url can contain username/password semantics, e.g. nats://derek:pass@localhost:4222. Comma-separated lists of URLs are also supported, e.g. urlA, urlB. Options start with the defaults but can be overridden. To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
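
The URL rules described for Connect can be sketched with the standard library. The helper splitServers below is hypothetical and is not the parser the client uses. It assumes that entries without a scheme default to nats://, and it rejects lists that mix websocket schemes with any other scheme.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strings"
)

// splitServers parses a comma-separated server list into URLs.
func splitServers(list string) ([]*url.URL, error) {
	var urls []*url.URL
	var ws, other bool
	for _, s := range strings.Split(list, ",") {
		s = strings.TrimSpace(s)
		if s == "" {
			continue
		}
		if !strings.Contains(s, "://") {
			s = "nats://" + s // assumed default scheme
		}
		u, err := url.Parse(s)
		if err != nil {
			return nil, err
		}
		if u.Scheme == "ws" || u.Scheme == "wss" {
			ws = true
		} else {
			other = true
		}
		urls = append(urls, u)
	}
	if ws && other {
		return nil, errors.New("websocket urls cannot be mixed with other schemes")
	}
	return urls, nil
}

func main() {
	urls, err := splitServers("nats://derek:pass@localhost:4222, localhost:4223")
	if err != nil {
		panic(err)
	}
	for _, u := range urls {
		fmt.Println(u.Scheme, u.Host, u.User.Username())
	}

	_, err = splitServers("ws://localhost:8080, nats://localhost:4222")
	fmt.Println("mixed list error:", err)
}
```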

Example

Shows different ways to create a Conn.

nc, _ := nats.Connect("demo.nats.io")
nc.Close()

nc, _ = nats.Connect("nats://derek:secretpassword@demo.nats.io:4222")
nc.Close()

nc, _ = nats.Connect("tls://derek:secretpassword@demo.nats.io:4443")
nc.Close()

opts := nats.Options{
	AllowReconnect: true,
	MaxReconnect:   10,
	ReconnectWait:  5 * time.Second,
	Timeout:        1 * time.Second,
}

nc, _ = opts.Connect()
nc.Close()

func (*Conn) AuthRequired added in v1.2.0

func (nc *Conn) AuthRequired() bool

AuthRequired reports whether the connected server requires authorization.

func (*Conn) Barrier added in v1.5.0

func (nc *Conn) Barrier(f func()) error

Barrier schedules the given function `f` to all registered asynchronous subscriptions. Only the last subscription to see this barrier will invoke the function. If no subscription is registered at the time of this call, `f()` is invoked right away. ErrConnectionClosed is returned if the connection is closed prior to the call.
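
The "last subscription to see the barrier invokes the function" behavior can be modeled with a shared countdown. The sketch below is a simplified model, not the client's implementation: each simulated subscription is a goroutine that reaches the barrier once, and the barrier type and runBarrier helper are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// barrier is a countdown shared by all subscriptions that must see it.
type barrier struct {
	refs int64  // subscriptions that have not reached the barrier yet
	f    func() // invoked by whichever subscription arrives last
}

// reach is called by a subscription when the barrier comes up in its queue.
func (b *barrier) reach() {
	if atomic.AddInt64(&b.refs, -1) == 0 {
		b.f()
	}
}

// runBarrier simulates n asynchronous subscriptions and returns how many
// times the barrier function ran.
func runBarrier(n int) int64 {
	var calls int64
	f := func() { atomic.AddInt64(&calls, 1) }
	if n == 0 {
		f() // no subscriptions registered: invoke right away
		return atomic.LoadInt64(&calls)
	}
	b := &barrier{refs: int64(n), f: f}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			b.reach()
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&calls)
}

func main() {
	fmt.Println(runBarrier(5), runBarrier(0))
}
```

Whatever order the goroutines run in, only the decrement that brings the counter to zero triggers the call, so the function runs once per barrier.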

func (*Conn) Buffered added in v1.1.2

func (nc *Conn) Buffered() (int, error)

Buffered will return the number of bytes buffered to be sent to the server. FIXME(dlc) take into account disconnected state.

func (*Conn) ChanQueueSubscribe added in v1.2.0

func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error)

ChanQueueSubscribe will express interest in the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message, which will be placed on the channel. You should not close the channel until sub.Unsubscribe() has been called. Note: This is the same as QueueSubscribeSyncWithChan.

func (*Conn) ChanSubscribe added in v1.2.0

func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error)

ChanSubscribe will express interest in the given subject and place all messages received on the channel. You should not close the channel until sub.Unsubscribe() has been called.

func (*Conn) Close

func (nc *Conn) Close()

Close will close the connection to the server. This call will release all blocking calls, such as Flush() and NextMsg()

Example
nc, _ := nats.Connect(nats.DefaultURL)
nc.Close()
Output:

func (*Conn) ConnectedAddr added in v1.7.2

func (nc *Conn) ConnectedAddr() string

ConnectedAddr returns the connected server's IP

func (*Conn) ConnectedClusterName added in v1.11.0

func (nc *Conn) ConnectedClusterName() string

ConnectedClusterName reports the connected server's cluster name if any

func (*Conn) ConnectedServerId added in v1.0.5

func (nc *Conn) ConnectedServerId() string

ConnectedServerId reports the connected server's Id

func (*Conn) ConnectedServerName added in v1.11.0

func (nc *Conn) ConnectedServerName() string

ConnectedServerName reports the connected server's name

func (*Conn) ConnectedServerVersion added in v1.13.0

func (nc *Conn) ConnectedServerVersion() string

ConnectedServerVersion reports the connected server's version as a string

func (*Conn) ConnectedUrl

func (nc *Conn) ConnectedUrl() string

ConnectedUrl reports the connected server's URL

func (*Conn) DiscoveredServers added in v1.2.2

func (nc *Conn) DiscoveredServers() []string

DiscoveredServers returns only the server urls that have been discovered after a connection has been established. If authentication is enabled, use UserInfo or Token when connecting with these urls.

func (*Conn) Drain added in v1.6.0

func (nc *Conn) Drain() error

Drain will put a connection into a drain state. All subscriptions will immediately be put into a drain state. Upon completion, the publishers will be drained and can not publish any additional messages. Upon draining of the publishers, the connection will be closed. Use the ClosedCB() option to know when the connection has moved from draining to closed.

See note in Subscription.Drain for JetStream subscriptions.

func (*Conn) Flush

func (nc *Conn) Flush() error

Flush will perform a round trip to the server and return when it receives the internal reply.

Example
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")}
for i := 0; i < 1000; i++ {
	nc.PublishMsg(msg)
}
err := nc.Flush()
if err == nil {
	// Everything has been processed by the server for nc *Conn.
}
Output:

func (*Conn) FlushTimeout

func (nc *Conn) FlushTimeout(timeout time.Duration) (err error)

FlushTimeout allows a Flush operation to have an associated timeout.

Example
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")}
for i := 0; i < 1000; i++ {
	nc.PublishMsg(msg)
}
// Only wait for up to 1 second for Flush
err := nc.FlushTimeout(1 * time.Second)
if err == nil {
	// Everything has been processed by the server for nc *Conn.
}
Output:

func (*Conn) FlushWithContext added in v1.7.2

func (nc *Conn) FlushWithContext(ctx context.Context) error

FlushWithContext will allow a context to control the duration of a Flush() call. This context should be non-nil and should have a deadline set. We will return an error if none is present.

func (*Conn) GetClientID added in v1.7.0

func (nc *Conn) GetClientID() (uint64, error)

GetClientID returns the client ID assigned by the server to which the client is currently connected. Note that the value may change if the client reconnects. This function returns ErrClientIDNotSupported if the server is of a version prior to 1.2.0.

func (*Conn) GetClientIP added in v1.10.0

func (nc *Conn) GetClientIP() (net.IP, error)

GetClientIP returns the client IP as known by the server. Supported as of server version 2.1.6.

func (*Conn) HeadersSupported added in v1.11.0

func (nc *Conn) HeadersSupported() bool

HeadersSupported reports whether the server supports headers.

func (*Conn) IsClosed

func (nc *Conn) IsClosed() bool

IsClosed tests if a Conn has been closed.

func (*Conn) IsConnected added in v1.2.2

func (nc *Conn) IsConnected() bool

IsConnected tests if a Conn is connected.

func (*Conn) IsDraining added in v1.6.0

func (nc *Conn) IsDraining() bool

IsDraining tests if a Conn is in the draining state.

func (*Conn) IsReconnecting

func (nc *Conn) IsReconnecting() bool

IsReconnecting tests if a Conn is reconnecting.

func (*Conn) JetStream added in v1.11.0

func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error)

JetStream returns a JetStreamContext for messaging and stream management. Errors are only returned if inconsistent options are provided.

func (*Conn) LastError

func (nc *Conn) LastError() error

LastError reports the last error encountered via the connection. It can be used reliably within ClosedCB, for example to find out why the connection was closed.

func (*Conn) MaxPayload added in v1.1.2

func (nc *Conn) MaxPayload() int64

MaxPayload returns the size limit that a message payload can have. This is set by the server configuration and delivered to the client upon connect.

func (*Conn) NewRespInbox added in v1.7.0

func (nc *Conn) NewRespInbox() string

NewRespInbox is the new format used for _INBOX.

func (*Conn) NumSubscriptions added in v1.6.0

func (nc *Conn) NumSubscriptions() int

NumSubscriptions returns the number of active subscriptions.

func (*Conn) Publish

func (nc *Conn) Publish(subj string, data []byte) error

Publish publishes the data argument to the given subject. The data argument is left untouched and needs to be correctly interpreted on the receiver.

Example
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

nc.Publish("foo", []byte("Hello World!"))
Output:

func (*Conn) PublishMsg

func (nc *Conn) PublishMsg(m *Msg) error

PublishMsg publishes the Msg structure, which includes the Subject, an optional Reply and an optional Data field.

Example
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")}
nc.PublishMsg(msg)
Output:

func (*Conn) PublishRequest

func (nc *Conn) PublishRequest(subj, reply string, data []byte) error

PublishRequest will perform a Publish() expecting a response on the reply subject. Use Request() for automatically waiting for a response inline.

func (*Conn) QueueSubscribe

func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error)

QueueSubscribe creates an asynchronous queue subscriber on the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message asynchronously.

Example
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

received := 0

nc.QueueSubscribe("foo", "worker_group", func(_ *nats.Msg) {
	received++
})
Output:

func (*Conn) QueueSubscribeSync

func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error)

QueueSubscribeSync creates a synchronous queue subscriber on the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message synchronously using Subscription.NextMsg().

func (*Conn) QueueSubscribeSyncWithChan added in v1.2.0

func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error)

QueueSubscribeSyncWithChan will express interest in the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message, which will be placed on the channel. You should not close the channel until sub.Unsubscribe() has been called. Note: This is the same as ChanQueueSubscribe.

func (*Conn) RTT added in v1.10.0

func (nc *Conn) RTT() (time.Duration, error)

RTT calculates the round trip time between this client and the server.

func (*Conn) Request

func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error)

Request will send a request payload and deliver the response message, or an error, including a timeout if no message was received properly.

Example
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

nc.Subscribe("foo", func(m *nats.Msg) {
	nc.Publish(m.Reply, []byte("I will help you"))
})
nc.Request("foo", []byte("help"), 50*time.Millisecond)
Output:

func (*Conn) RequestMsg added in v1.11.0

func (nc *Conn) RequestMsg(msg *Msg, timeout time.Duration) (*Msg, error)

RequestMsg will send a request payload including optional headers and deliver the response message, or an error, including a timeout if no message was received properly.

func (*Conn) RequestMsgWithContext added in v1.11.0

func (nc *Conn) RequestMsgWithContext(ctx context.Context, msg *Msg) (*Msg, error)

RequestMsgWithContext takes a context and a Msg, which carries the subject, payload and optional headers, and sends a request expecting a single response.

func (*Conn) RequestWithContext added in v1.3.0

func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error)

RequestWithContext takes a context, a subject and a payload in bytes, and sends a request expecting a single response.

func (*Conn) Servers added in v1.2.2

func (nc *Conn) Servers() []string

Servers returns the list of known server urls, including additional servers discovered after a connection has been established. If authentication is enabled, use UserInfo or Token when connecting with these urls.

func (*Conn) SetClosedHandler added in v1.2.0

func (nc *Conn) SetClosedHandler(cb ConnHandler)

SetClosedHandler will set the closed event handler.

func (*Conn) SetDisconnectErrHandler added in v1.8.0

func (nc *Conn) SetDisconnectErrHandler(dcb ConnErrHandler)

SetDisconnectErrHandler will set the disconnect event handler.

func (*Conn) SetDisconnectHandler added in v1.2.0

func (nc *Conn) SetDisconnectHandler(dcb ConnHandler)

SetDisconnectHandler will set the disconnect event handler. DEPRECATED: Use SetDisconnectErrHandler

func (*Conn) SetDiscoveredServersHandler added in v1.3.0

func (nc *Conn) SetDiscoveredServersHandler(dscb ConnHandler)

SetDiscoveredServersHandler will set the discovered servers handler.

func (*Conn) SetErrorHandler added in v1.2.0

func (nc *Conn) SetErrorHandler(cb ErrHandler)

SetErrorHandler will set the async error handler.

func (*Conn) SetReconnectHandler added in v1.2.0

func (nc *Conn) SetReconnectHandler(rcb ConnHandler)

SetReconnectHandler will set the reconnect event handler.

func (*Conn) Stats

func (nc *Conn) Stats() Statistics

Stats will return a race safe copy of the Statistics section for the connection.

func (*Conn) Status

func (nc *Conn) Status() Status

Status returns the current state of the connection.

func (*Conn) Subscribe

func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error)

Subscribe will express interest in the given subject. The subject can have wildcards (partial:*, full:>). Messages will be delivered to the associated MsgHandler.

Example

This Example shows an asynchronous subscriber.

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

nc.Subscribe("foo", func(m *nats.Msg) {
	fmt.Printf("Received a message: %s\n", string(m.Data))
})
Output:
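The two subject wildcards mentioned for Subscribe work on dot-separated tokens: `*` stands for exactly one token and `>` for one or more trailing tokens. The matcher below is a standalone sketch of those rules for experimentation; `matches` is a hypothetical helper, and the real matching is performed by the server.

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether subject satisfies pattern, where "*" matches one
// token and ">" (only valid as the last token) matches one or more tokens.
func matches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be last and needs at least one token left to match.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	for _, subject := range []string{"foo.bar", "foo.bar.baz", "foo"} {
		fmt.Printf("%-12s foo.*=%v foo.>=%v\n",
			subject, matches("foo.*", subject), matches("foo.>", subject))
	}
}
```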

func (*Conn) SubscribeSync

func (nc *Conn) SubscribeSync(subj string) (*Subscription, error)

SubscribeSync will express interest on the given subject. Messages will be received synchronously using Subscription.NextMsg().

Example

This Example shows a synchronous subscriber.

nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

sub, _ := nc.SubscribeSync("foo")
m, err := sub.NextMsg(1 * time.Second)
if err == nil {
	fmt.Printf("Received a message: %s\n", string(m.Data))
} else {
	fmt.Println("NextMsg timed out.")
}
Output:

func (*Conn) TLSRequired added in v1.2.0

func (nc *Conn) TLSRequired() bool

TLSRequired reports whether the connected server requires TLS connections.

type ConnErrHandler added in v1.8.0

type ConnErrHandler func(*Conn, error)

ConnErrHandler is used to process asynchronous events like disconnected connection with the error (if any).

type ConnHandler

type ConnHandler func(*Conn)

ConnHandler is used for asynchronous events such as disconnected and closed connections.

type ConsumerConfig added in v1.11.0

type ConsumerConfig struct {
	Durable         string        `json:"durable_name,omitempty"`
	Description     string        `json:"description,omitempty"`
	DeliverSubject  string        `json:"deliver_subject,omitempty"`
	DeliverGroup    string        `json:"deliver_group,omitempty"`
	DeliverPolicy   DeliverPolicy `json:"deliver_policy"`
	OptStartSeq     uint64        `json:"opt_start_seq,omitempty"`
	OptStartTime    *time.Time    `json:"opt_start_time,omitempty"`
	AckPolicy       AckPolicy     `json:"ack_policy"`
	AckWait         time.Duration `json:"ack_wait,omitempty"`
	MaxDeliver      int           `json:"max_deliver,omitempty"`
	FilterSubject   string        `json:"filter_subject,omitempty"`
	ReplayPolicy    ReplayPolicy  `json:"replay_policy"`
	RateLimit       uint64        `json:"rate_limit_bps,omitempty"` // Bits per sec
	SampleFrequency string        `json:"sample_freq,omitempty"`
	MaxWaiting      int           `json:"max_waiting,omitempty"`
	MaxAckPending   int           `json:"max_ack_pending,omitempty"`
	FlowControl     bool          `json:"flow_control,omitempty"`
	Heartbeat       time.Duration `json:"idle_heartbeat,omitempty"`
	HeadersOnly     bool          `json:"headers_only,omitempty"`
}

ConsumerConfig is the configuration of a JetStream consumer.

type ConsumerInfo added in v1.11.0

type ConsumerInfo struct {
	Stream         string         `json:"stream_name"`
	Name           string         `json:"name"`
	Created        time.Time      `json:"created"`
	Config         ConsumerConfig `json:"config"`
	Delivered      SequenceInfo   `json:"delivered"`
	AckFloor       SequenceInfo   `json:"ack_floor"`
	NumAckPending  int            `json:"num_ack_pending"`
	NumRedelivered int            `json:"num_redelivered"`
	NumWaiting     int            `json:"num_waiting"`
	NumPending     uint64         `json:"num_pending"`
	Cluster        *ClusterInfo   `json:"cluster,omitempty"`
	PushBound      bool           `json:"push_bound,omitempty"`
}

ConsumerInfo is the info from a JetStream consumer.

type ContextOpt added in v1.11.0

type ContextOpt struct {
	context.Context
}

ContextOpt is an option used to set a context.Context.

func Context added in v1.11.0

func Context(ctx context.Context) ContextOpt

Context returns an option that can be used to configure a context for APIs that are context aware such as those part of the JetStream interface.

Example
nc, err := nats.Connect("localhost")
if err != nil {
	log.Fatal(err)
}

js, _ := nc.JetStream()

// Base context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// nats.Context option implements context.Context interface, so can be used
// to create a new context from top level one.
nctx := nats.Context(ctx)

// JetStreamManager functions all can use context.
js.AddStream(&nats.StreamConfig{
	Name:     "FOO",
	Subjects: []string{"foo"},
}, nctx)

// Custom context with timeout
tctx, tcancel := context.WithTimeout(nctx, 2*time.Second)
defer tcancel()

// Set a timeout for publishing using context.
deadlineCtx := nats.Context(tctx)

js.Publish("foo", []byte("Hello JS!"), deadlineCtx)
sub, _ := js.SubscribeSync("foo")
msg, _ := sub.NextMsgWithContext(deadlineCtx)

// Acks can also use a context to wait for a response.
msg.Ack(deadlineCtx)
Output:

type CustomDialer added in v1.4.0

type CustomDialer interface {
	Dial(network, address string) (net.Conn, error)
}

CustomDialer can be used to specify any dialer, not necessarily a *net.Dialer.

type DeliverPolicy added in v1.11.0

type DeliverPolicy int

DeliverPolicy determines how the consumer should select the first message to deliver.

const (
	// DeliverAllPolicy starts delivering messages from the very beginning of a
	// stream. This is the default.
	DeliverAllPolicy DeliverPolicy = iota

	// DeliverLastPolicy will start the consumer with the last sequence
	// received.
	DeliverLastPolicy

	// DeliverNewPolicy will only deliver new messages that are sent after the
	// consumer is created.
	DeliverNewPolicy

	// DeliverByStartSequencePolicy will deliver messages starting from a given
	// sequence.
	DeliverByStartSequencePolicy

	// DeliverByStartTimePolicy will deliver messages starting from a given
	// time.
	DeliverByStartTimePolicy

	// DeliverLastPerSubjectPolicy will start the consumer with the last message
	// for all subjects received.
	DeliverLastPerSubjectPolicy
)

func (DeliverPolicy) MarshalJSON added in v1.11.0

func (p DeliverPolicy) MarshalJSON() ([]byte, error)

func (*DeliverPolicy) UnmarshalJSON added in v1.11.0

func (p *DeliverPolicy) UnmarshalJSON(data []byte) error

type DiscardPolicy added in v1.11.0

type DiscardPolicy int

DiscardPolicy determines how to proceed when limits of messages or bytes are reached.

const (
	// DiscardOld will remove older messages to return to the limits. This is
	// the default.
	DiscardOld DiscardPolicy = iota
	// DiscardNew will fail to store new messages.
	DiscardNew
)

func (DiscardPolicy) MarshalJSON added in v1.11.0

func (dp DiscardPolicy) MarshalJSON() ([]byte, error)

func (DiscardPolicy) String added in v1.11.0

func (dp DiscardPolicy) String() string

func (*DiscardPolicy) UnmarshalJSON added in v1.11.0

func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error

type EncodedConn

type EncodedConn struct {
	Conn *Conn
	Enc  Encoder
}

EncodedConn is the preferred way to interface with NATS. It wraps a bare connection to a NATS server and has an extendable encoder system that encodes and decodes messages to and from raw Go types.

func NewEncodedConn

func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error)

NewEncodedConn will wrap an existing Connection and utilize the appropriate registered encoder.

Example

Shows how to wrap a Conn into an EncodedConn

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, "json")
c.Close()
Output:

func (*EncodedConn) BindRecvChan

func (c *EncodedConn) BindRecvChan(subject string, channel interface{}) (*Subscription, error)

BindRecvChan binds a channel for receive operations from NATS.

Example

BindRecvChan() allows binding of a Go channel to a nats subject for subscribe operations. The Encoder attached to the EncodedConn will be used for un-marshaling.

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, "json")
defer c.Close()

type person struct {
	Name    string
	Address string
	Age     int
}

ch := make(chan *person)
c.BindRecvChan("hello", ch)

me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
c.Publish("hello", me)

// Receive the publish directly on a channel
who := <-ch

fmt.Printf("%v says hello!\n", who)
Output:

func (*EncodedConn) BindRecvQueueChan

func (c *EncodedConn) BindRecvQueueChan(subject, queue string, channel interface{}) (*Subscription, error)

BindRecvQueueChan binds a channel for queue-based receive operations from NATS.

func (*EncodedConn) BindSendChan

func (c *EncodedConn) BindSendChan(subject string, channel interface{}) error

BindSendChan binds a channel for send operations to NATS.

Example

BindSendChan() allows binding of a Go channel to a nats subject for publish operations. The Encoder attached to the EncodedConn will be used for marshaling.

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, "json")
defer c.Close()

type person struct {
	Name    string
	Address string
	Age     int
}

ch := make(chan *person)
c.BindSendChan("hello", ch)

me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
ch <- me
Output:

func (*EncodedConn) Close

func (c *EncodedConn) Close()

Close will close the connection to the server. This call will release all blocking calls, such as Flush(), etc.

func (*EncodedConn) Drain added in v1.6.0

func (c *EncodedConn) Drain() error

Drain will put a connection into a drain state. All subscriptions will immediately be put into a drain state. Upon completion, the publishers will be drained and can not publish any additional messages. Upon draining of the publishers, the connection will be closed. Use the ClosedCB() option to know when the connection has moved from draining to closed.

func (*EncodedConn) Flush

func (c *EncodedConn) Flush() error

Flush will perform a round trip to the server and return when it receives the internal reply.

func (*EncodedConn) FlushTimeout

func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error)

FlushTimeout allows a Flush operation to have an associated timeout.

func (*EncodedConn) LastError

func (c *EncodedConn) LastError() error

LastError reports the last error encountered via the Connection.

func (*EncodedConn) Publish

func (c *EncodedConn) Publish(subject string, v interface{}) error

Publish publishes the data argument to the given subject. The data argument will be encoded using the associated encoder.

Example

EncodedConn can publish virtually anything just by passing it in. The encoder will be used to properly encode the raw Go type

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, "json")
defer c.Close()

type person struct {
	Name    string
	Address string
	Age     int
}

me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
c.Publish("hello", me)
Output:

func (*EncodedConn) PublishRequest

func (c *EncodedConn) PublishRequest(subject, reply string, v interface{}) error

PublishRequest will perform a Publish() expecting a response on the reply subject. Use Request() for automatically waiting for a response inline.

func (*EncodedConn) QueueSubscribe

func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error)

QueueSubscribe will create a queue subscription on the given subject and process incoming messages using the specified Handler. The Handler should be a func that matches one of the signatures listed in the description of Handler.

func (*EncodedConn) Request

func (c *EncodedConn) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error

Request will create an Inbox and perform a Request() call with the Inbox reply for the data v. A response will be decoded into vPtr.

func (*EncodedConn) RequestWithContext added in v1.3.0

func (c *EncodedConn) RequestWithContext(ctx context.Context, subject string, v interface{}, vPtr interface{}) error

RequestWithContext will create an Inbox and perform a Request using the provided cancellation context with the Inbox reply for the data v. A response will be decoded into vPtr.

func (*EncodedConn) Subscribe

func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error)

Subscribe will create a subscription on the given subject and process incoming messages using the specified Handler. The Handler should be a func that matches one of the signatures listed in the description of Handler.

Example

EncodedConn's subscribers will automatically decode the wire data into the requested Go type using the Decode() method of the registered Encoder. The callback signature can also vary to include additional data, such as subject and reply subjects.

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, "json")
defer c.Close()

type person struct {
	Name    string
	Address string
	Age     int
}

c.Subscribe("hello", func(p *person) {
	fmt.Printf("Received a person! %+v\n", p)
})

c.Subscribe("hello", func(subj, reply string, p *person) {
	fmt.Printf("Received a person on subject %s! %+v\n", subj, p)
})

me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
c.Publish("hello", me)
Output:

type Encoder

type Encoder interface {
	Encode(subject string, v interface{}) ([]byte, error)
	Decode(subject string, data []byte, vPtr interface{}) error
}

Encoder is the interface that all registered encoders implement.
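Any type with matching Encode and Decode methods satisfies this interface. The following sketch implements it with encoding/gob against a local copy of the interface (`encoder`), so it runs without the nats module; `gobEncoder`, `person` and `roundTrip` are hypothetical names, and registering the encoder with the library is not shown.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// encoder is a local copy of the Encoder interface.
type encoder interface {
	Encode(subject string, v interface{}) ([]byte, error)
	Decode(subject string, data []byte, vPtr interface{}) error
}

// gobEncoder is a hypothetical encoder backed by encoding/gob.
// The subject argument is ignored in this sketch.
type gobEncoder struct{}

func (gobEncoder) Encode(subject string, v interface{}) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func (gobEncoder) Decode(subject string, data []byte, vPtr interface{}) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(vPtr)
}

type person struct {
	Name string
	Age  int
}

// roundTrip encodes in and decodes the bytes into a fresh value.
func roundTrip(enc encoder, in person) (person, error) {
	data, err := enc.Encode("hello", &in)
	if err != nil {
		return person{}, err
	}
	var out person
	err = enc.Decode("hello", data, &out)
	return out, err
}

func main() {
	out, err := roundTrip(gobEncoder{}, person{Name: "derek", Age: 22})
	fmt.Printf("%+v %v\n", out, err)
}
```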

func EncoderForType

func EncoderForType(encType string) Encoder

EncoderForType will return the registered Encoder for the encType.

type ErrConsumerSequenceMismatch added in v1.11.0

type ErrConsumerSequenceMismatch struct {
	// StreamResumeSequence is the stream sequence from where the consumer
	// should resume consuming from the stream.
	StreamResumeSequence uint64

	// ConsumerSequence is the sequence of the consumer that is behind.
	ConsumerSequence uint64

	// LastConsumerSequence is the sequence of the consumer when the heartbeat
	// was received.
	LastConsumerSequence uint64
}

ErrConsumerSequenceMismatch represents an error from a consumer that received a heartbeat whose sequence differs from the one the client expected.

func (*ErrConsumerSequenceMismatch) Error added in v1.11.0

func (ecs *ErrConsumerSequenceMismatch) Error() string

type ErrHandler

type ErrHandler func(*Conn, *Subscription, error)

ErrHandler is used to process asynchronous errors encountered while processing inbound messages.

type ExternalStream added in v1.11.0

type ExternalStream struct {
	APIPrefix     string `json:"api"`
	DeliverPrefix string `json:"deliver"`
}

ExternalStream allows you to qualify access to a stream source in another account.

type Handler

type Handler interface{}

Handler is a specific callback used for Subscribe. It is generalized to an interface{}, but we will discover its format and arguments at runtime and perform the correct callback, including de-marshaling encoded data back into the appropriate struct based on the signature of the Handler.

Handlers are expected to have one of four signatures.

type person struct {
	Name string `json:"name,omitempty"`
	Age  uint   `json:"age,omitempty"`
}

handler := func(m *Msg)
handler := func(p *person)
handler := func(subject string, p *person)
handler := func(subject, reply string, p *person)

These forms allow a callback to request a raw Msg pointer, in which case the message from the wire is passed through untouched, or to have the payload decoded (for example from its JSON representation) and unmarshaled into the given struct, e.g. person. There are also variants where the callback receives either the subject, or the subject and the reply subject.
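Discovering a handler's shape at runtime is a job for the reflect package. The sketch below shows one way such a dispatcher could look for the three decoded forms, using JSON for decoding; `dispatch` is a hypothetical helper written for this illustration, it skips the raw Msg form, and it is not the library's implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

type person struct {
	Name string `json:"name,omitempty"`
	Age  uint   `json:"age,omitempty"`
}

// dispatch inspects cb with reflection, decodes data as JSON into the type
// of its last argument, and calls it with the leading subject/reply strings
// that its signature declares.
func dispatch(cb interface{}, subject, reply string, data []byte) error {
	fn := reflect.ValueOf(cb)
	if fn.Kind() != reflect.Func {
		return fmt.Errorf("handler must be a func, got %T", cb)
	}
	t := fn.Type()
	n := t.NumIn()
	if n < 1 || n > 3 {
		return fmt.Errorf("handler has %d arguments, want 1 to 3", n)
	}
	// Leading arguments, if any, must be plain strings (subject, then reply).
	for i := 0; i < n-1; i++ {
		if t.In(i) != reflect.TypeOf("") {
			return fmt.Errorf("argument %d must be a string", i)
		}
	}
	last := t.In(n - 1)
	if last.Kind() != reflect.Ptr {
		return fmt.Errorf("last argument must be a pointer, got %s", last)
	}
	// Allocate a new value of the pointed-to type and decode into it.
	value := reflect.New(last.Elem())
	if err := json.Unmarshal(data, value.Interface()); err != nil {
		return err
	}
	lead := []reflect.Value{reflect.ValueOf(subject), reflect.ValueOf(reply)}
	fn.Call(append(lead[:n-1], value))
	return nil
}

func main() {
	payload := []byte(`{"name":"derek","age":22}`)
	dispatch(func(p *person) { fmt.Println("one arg:", p.Name) }, "hello", "", payload)
	dispatch(func(subj string, p *person) { fmt.Println(subj, p.Age) }, "hello", "", payload)
	err := dispatch("not a func", "hello", "", payload)
	fmt.Println(err)
}
```

Validating argument types before calling matters here, because reflect's Call panics on a mismatched argument rather than returning an error.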

type Header map[string][]string

Header represents the optional Header for a NATS message, based on the implementation of http.Header.

func (Header) Add added in v1.11.0

func (h Header) Add(key, value string)

Add adds the key, value pair to the header. It is case-sensitive and appends to any existing values associated with key.

func (Header) Del added in v1.11.0

func (h Header) Del(key string)

Del deletes the values associated with a key. It is case-sensitive.

func (Header) Get added in v1.11.0

func (h Header) Get(key string) string

Get gets the first value associated with the given key. It is case-sensitive.

func (Header) Set added in v1.11.0

func (h Header) Set(key, value string)

Set sets the header entries associated with key to the single element value. It is case-sensitive and replaces any existing values associated with key.

func (Header) Values added in v1.11.0

func (h Header) Values(key string) []string

Values returns all values associated with the given key. It is case-sensitive.

type JSOpt added in v1.11.0

type JSOpt interface {
	// contains filtered or unexported methods
}

JSOpt configures a JetStreamContext.

Example

A JetStream context can be configured with a default timeout using nats.MaxWait or with a custom API prefix in case of using an imported JetStream from another account.

nc, err := nats.Connect("localhost")
if err != nil {
	log.Fatal(err)
}

// Use the JetStream context to manage streams and consumers (with nats.APIPrefix JSOpt)
js, err := nc.JetStream(nats.APIPrefix("dlc"), nats.MaxWait(5*time.Second))
if err != nil {
	log.Fatal(err)
}
sub, _ := js.SubscribeSync("foo")
js.Publish("foo", []byte("Hello JS!"))
sub.NextMsg(2 * time.Second)
Output:

func APIPrefix added in v1.11.0

func APIPrefix(pre string) JSOpt

APIPrefix changes the default prefix used for the JetStream API.

func Domain added in v1.12.0

func Domain(domain string) JSOpt

Domain changes the domain part of the JetStream API prefix.

func PublishAsyncErrHandler added in v1.11.0

func PublishAsyncErrHandler(cb MsgErrHandler) JSOpt

PublishAsyncErrHandler sets the error handler for async publishes in JetStream.

func PublishAsyncMaxPending added in v1.11.0

func PublishAsyncMaxPending(max int) JSOpt

PublishAsyncMaxPending sets the maximum outstanding async publishes that can be inflight at one time.

type JetStream added in v1.11.0

type JetStream interface {
	// Publish publishes a message to JetStream.
	Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error)

	// PublishMsg publishes a Msg to JetStream.
	PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error)

	// PublishAsync publishes a message to JetStream and returns a PubAckFuture.
	// The data should not be changed until the PubAckFuture has been processed.
	PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error)

	// PublishMsgAsync publishes a Msg to JetStream and returns a PubAckFuture.
	// The message should not be changed until the PubAckFuture has been processed.
	PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error)

	// PublishAsyncPending returns the number of async publishes outstanding for this context.
	PublishAsyncPending() int

	// PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd.
	PublishAsyncComplete() <-chan struct{}

	// Subscribe creates an async Subscription for JetStream.
	// The stream and consumer names can be provided with the nats.Bind() option.
	// For creating an ephemeral (where the consumer name is picked by the server),
	// you can provide the stream name with nats.BindStream().
	// If no stream name is specified, the library will attempt to figure out which
	// stream the subscription is for. See important notes below for more details.
	//
	// IMPORTANT NOTES:
	// * If none of the options Bind() nor Durable() are specified, the library will
	// send a request to the server to create an ephemeral JetStream consumer,
	// which will be deleted after an Unsubscribe() or Drain(), or automatically
	// by the server after a short period of time after the NATS subscription is
	// gone.
	// * If Durable() option is specified, the library will attempt to lookup a JetStream
	// consumer with this name, and if found, will bind to it and not attempt to
	// delete it. However, if not found, the library will send a request to create
	// such durable JetStream consumer. The library will delete the JetStream consumer
	// after an Unsubscribe() or Drain().
	// * If Bind() option is provided, the library will attempt to lookup the
	// consumer with the given name, and if successful, bind to it. If the lookup fails,
	// then the Subscribe() call will return an error.
	Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)

	// SubscribeSync creates a Subscription that can be used to process messages synchronously.
	// See important note in Subscribe()
	SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)

	// ChanSubscribe creates channel based Subscription.
	// See important note in Subscribe()
	ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)

	// ChanQueueSubscribe creates channel based Subscription with a queue group.
	// See important note in QueueSubscribe()
	ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)

	// QueueSubscribe creates a Subscription with a queue group.
	// If no optional durable name nor binding options are specified, the queue name will be used as a durable name.
	// See important note in Subscribe()
	QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)

	// QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
	// See important note in QueueSubscribe()
	QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error)

	// PullSubscribe creates a Subscription that can fetch messages.
	// See important note in Subscribe()
	PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error)
}

JetStream allows persistent messaging through JetStream.

Example
nc, err := nats.Connect("localhost")
if err != nil {
	log.Fatal(err)
}

// Use the JetStream context to produce and consume messages
// that have been persisted.
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
if err != nil {
	log.Fatal(err)
}

js.AddStream(&nats.StreamConfig{
	Name:     "FOO",
	Subjects: []string{"foo"},
})

js.Publish("foo", []byte("Hello JS!"))

// Publish messages asynchronously.
for i := 0; i < 500; i++ {
	js.PublishAsync("foo", []byte("Hello JS Async!"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
	fmt.Println("Did not resolve in time")
}

// Create async consumer on subject 'foo'. Async subscribers
// ack a message once exiting the callback.
js.Subscribe("foo", func(msg *nats.Msg) {
	meta, _ := msg.Metadata()
	fmt.Printf("Stream Sequence  : %v\n", meta.Sequence.Stream)
	fmt.Printf("Consumer Sequence: %v\n", meta.Sequence.Consumer)
})

// Async subscriber with manual acks.
js.Subscribe("foo", func(msg *nats.Msg) {
	msg.Ack()
}, nats.ManualAck())

// Async queue subscription where members load balance the
// received messages together.
// If no consumer name is specified, either with nats.Bind()
// or nats.Durable() options, the queue name is used as the
// durable name (that is, as if you were passing the
// nats.Durable(<queue group name>) option).
// It is recommended to use nats.Bind() or nats.Durable()
// and preferably create the JetStream consumer beforehand
// (using js.AddConsumer) so that the JS consumer is not
// deleted on an Unsubscribe() or Drain() when the member
// that created the consumer goes away first.
// Check Godoc for the QueueSubscribe() API for more details.
js.QueueSubscribe("foo", "group", func(msg *nats.Msg) {
	msg.Ack()
}, nats.ManualAck())

// Subscriber to consume messages synchronously.
sub, _ := js.SubscribeSync("foo")
msg, _ := sub.NextMsg(2 * time.Second)
msg.Ack()

// We can add a member to the group, with this member using
// the synchronous version of the QueueSubscribe.
sub, _ = js.QueueSubscribeSync("foo", "group")
msg, _ = sub.NextMsg(2 * time.Second)
msg.Ack()

// ChanSubscribe
msgCh := make(chan *nats.Msg, 8192)
sub, _ = js.ChanSubscribe("foo", msgCh)

select {
case msg := <-msgCh:
	fmt.Println("[Received]", msg)
case <-time.After(1 * time.Second):
}

// Create Pull based consumer with maximum 128 inflight.
sub, _ = js.PullSubscribe("foo", "wq", nats.PullMaxWaiting(128))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

for {
	select {
	case <-ctx.Done():
		return
	default:
	}

	msgs, _ := sub.Fetch(10, nats.Context(ctx))
	for _, msg := range msgs {
		msg.Ack()
	}
}

type JetStreamContext added in v1.11.0

type JetStreamContext interface {
	JetStream
	JetStreamManager
	KeyValueManager
	ObjectStoreManager
}

JetStreamContext allows JetStream messaging and stream management.

Example

A JetStreamContext is the composition of the JetStream and JetStreamManager interfaces. If you only need to publish and consume messages, you can create a context that uses only the JetStream interface.

nc, _ := nats.Connect("localhost")

var js nats.JetStream
var jsm nats.JetStreamManager
var jsctx nats.JetStreamContext

// JetStream that can publish/subscribe but cannot manage streams.
js, _ = nc.JetStream()
js.Publish("foo", []byte("hello"))

// JetStream context that can manage streams/consumers but cannot produce messages.
jsm, _ = nc.JetStream()
jsm.AddStream(&nats.StreamConfig{Name: "FOO"})

// JetStream context that can both manage streams/consumers
// as well as publish/subscribe.
jsctx, _ = nc.JetStream()
jsctx.AddStream(&nats.StreamConfig{Name: "BAR"})
jsctx.Publish("bar", []byte("hello world"))

type JetStreamManager added in v1.11.0

type JetStreamManager interface {
	// AddStream creates a stream.
	AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)

	// UpdateStream updates a stream.
	UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)

	// DeleteStream deletes a stream.
	DeleteStream(name string, opts ...JSOpt) error

	// StreamInfo retrieves information from a stream.
	StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error)

	// PurgeStream purges a stream's messages.
	PurgeStream(name string, opts ...JSOpt) error

	// StreamsInfo can be used to retrieve a list of StreamInfo objects.
	StreamsInfo(opts ...JSOpt) <-chan *StreamInfo

	// StreamNames is used to retrieve a list of Stream names.
	StreamNames(opts ...JSOpt) <-chan string

	// GetMsg retrieves a raw stream message stored in JetStream by sequence number.
	GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error)

	// DeleteMsg erases a message from a stream.
	DeleteMsg(name string, seq uint64, opts ...JSOpt) error

	// AddConsumer adds a consumer to a stream.
	AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)

	// DeleteConsumer deletes a consumer.
	DeleteConsumer(stream, consumer string, opts ...JSOpt) error

	// ConsumerInfo retrieves information of a consumer from a stream.
	ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error)

	// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
	ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo

	// ConsumerNames is used to retrieve a list of Consumer names.
	ConsumerNames(stream string, opts ...JSOpt) <-chan string

	// AccountInfo retrieves info about the JetStream usage from an account.
	AccountInfo(opts ...JSOpt) (*AccountInfo, error)
}

JetStreamManager manages JetStream Streams and Consumers.

Example
nc, _ := nats.Connect("localhost")

js, _ := nc.JetStream()

// Create a stream
js.AddStream(&nats.StreamConfig{
	Name:     "FOO",
	Subjects: []string{"foo"},
	MaxBytes: 1024,
})

// Update a stream
js.UpdateStream(&nats.StreamConfig{
	Name:     "FOO",
	MaxBytes: 2048,
})

// Create a durable consumer
js.AddConsumer("FOO", &nats.ConsumerConfig{
	Durable: "BAR",
})

// Get information about all streams (with Context JSOpt)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for info := range js.StreamsInfo(nats.Context(ctx)) {
	fmt.Println("stream name:", info.Config.Name)
}

// Get information about all consumers (with MaxWait JSOpt)
for info := range js.ConsumersInfo("FOO", nats.MaxWait(10*time.Second)) {
	fmt.Println("consumer name:", info.Name)
}

// Delete a consumer
js.DeleteConsumer("FOO", "BAR")

// Delete a stream
js.DeleteStream("FOO")

type KeyValue added in v1.13.0

type KeyValue interface {
	// Get returns the latest value for the key.
	Get(key string) (entry KeyValueEntry, err error)
	// Put will place the new value for the key into the store.
	Put(key string, value []byte) (revision uint64, err error)
	// PutString will place the string for the key into the store.
	PutString(key string, value string) (revision uint64, err error)
	// Create will add the key/value pair iff it does not exist.
	Create(key string, value []byte) (revision uint64, err error)
	// Update will update the value iff the latest revision matches.
	Update(key string, value []byte, last uint64) (revision uint64, err error)
	// Delete will place a delete marker and leave all revisions.
	Delete(key string) error
	// Purge will place a delete marker and remove all previous revisions.
	Purge(key string) error
	// Watch for any updates to keys that match the keys argument which could include wildcards.
	// Watch will send a nil entry when it has received all initial values.
	Watch(keys string, opts ...WatchOpt) (KeyWatcher, error)
	// WatchAll will watch for updates to all keys.
	WatchAll(opts ...WatchOpt) (KeyWatcher, error)
	// Keys will return all keys.
	Keys(opts ...WatchOpt) ([]string, error)
	// History will return all historical values for the key.
	History(key string, opts ...WatchOpt) ([]KeyValueEntry, error)
	// Bucket returns the current bucket name.
	Bucket() string
	// PurgeDeletes will remove all current delete markers.
	PurgeDeletes(opts ...WatchOpt) error
}

Notice: Experimental Preview

This functionality is EXPERIMENTAL and may be changed in later releases.
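
The revision rules that the comments on Create and Update describe can be sketched with an in-memory model. This is only an illustration under stated assumptions: memKV, errKeyExists and errWrongRevision are hypothetical names, delete markers are ignored, and nothing here reflects how the library or the server actually stores data.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical errors for this sketch; they are not the errors
// returned by the nats package.
var (
	errKeyExists     = errors.New("key exists")
	errWrongRevision = errors.New("wrong last revision")
)

// memKV is an in-memory stand-in that mimics the revision rules
// described for Create and Update.
type memKV struct {
	seq  uint64            // bucket-wide revision counter
	vals map[string][]byte // latest value per key
	revs map[string]uint64 // latest revision per key
}

func newMemKV() *memKV {
	return &memKV{vals: map[string][]byte{}, revs: map[string]uint64{}}
}

// put stores the value and assigns it the next revision.
func (kv *memKV) put(key string, value []byte) uint64 {
	kv.seq++
	kv.vals[key] = value
	kv.revs[key] = kv.seq
	return kv.seq
}

// Create adds the pair only if the key is absent.
func (kv *memKV) Create(key string, value []byte) (uint64, error) {
	if _, ok := kv.revs[key]; ok {
		return 0, errKeyExists
	}
	return kv.put(key, value), nil
}

// Update writes only if last equals the key's latest revision.
func (kv *memKV) Update(key string, value []byte, last uint64) (uint64, error) {
	if kv.revs[key] != last {
		return 0, errWrongRevision
	}
	return kv.put(key, value), nil
}

func main() {
	kv := newMemKV()
	rev, _ := kv.Create("color", []byte("blue"))
	_, err := kv.Create("color", []byte("red")) // rejected: key already exists
	fmt.Println(rev, err)

	rev2, _ := kv.Update("color", []byte("green"), rev)
	_, err = kv.Update("color", []byte("pink"), rev) // rejected: rev is stale
	fmt.Println(rev2, err, string(kv.vals["color"]))
}
```

A writer that loses the race in Update would typically call Get again, reapply its change to the fresh value, and retry with the new revision.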

type KeyValueConfig added in v1.13.0

type KeyValueConfig struct {
	Bucket       string
	Description  string
	MaxValueSize int32
	History      uint8
	TTL          time.Duration
	MaxBytes     int64
	Storage      StorageType
	Replicas     int
}

KeyValueConfig is for configuring a KeyValue store.

type KeyValueEntry added in v1.13.0

type KeyValueEntry interface {
	// Bucket is the bucket the data was loaded from.
	Bucket() string
	// Key is the key that was retrieved.
	Key() string
	// Value is the retrieved value.
	Value() []byte
	// Revision is a unique sequence for this value.
	Revision() uint64
	// Created is the time the data was put in the bucket.
	Created() time.Time
	// Delta is distance from the latest value.
	Delta() uint64
	// Operation returns Put or Delete or Purge.
	Operation() KeyValueOp
}

KeyValueEntry is a retrieved entry for Get or List or Watch.

type KeyValueManager added in v1.13.0

type KeyValueManager interface {
	// KeyValue will lookup and bind to an existing KeyValue store.
	KeyValue(bucket string) (KeyValue, error)
	// CreateKeyValue will create a KeyValue store with the following configuration.
	CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error)
	// DeleteKeyValue will delete this KeyValue store (JetStream stream).
	DeleteKeyValue(bucket string) error
}

Notice: Experimental Preview

This functionality is EXPERIMENTAL and may be changed in later releases.

type KeyValueOp added in v1.13.0

type KeyValueOp uint8
const (
	KeyValuePut KeyValueOp = iota
	KeyValueDelete
	KeyValuePurge
)

func (KeyValueOp) String added in v1.13.0

func (op KeyValueOp) String() string

type KeyWatcher added in v1.13.0

type KeyWatcher interface {
	// Updates returns a channel to read any updates to entries.
	Updates() <-chan KeyValueEntry
	// Stop() will stop this watcher.
	Stop() error
}

KeyWatcher is what is returned when doing a watch.
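
The Watch comment in the KeyValue interface says a nil entry marks the end of the initial values. A minimal sketch of consuming an update channel with that convention follows. The entry interface and the kv type are hypothetical stand-ins for KeyValueEntry so that the snippet compiles without a server; with the real API the channel would come from Updates().

```go
package main

import "fmt"

// entry is a hypothetical, reduced stand-in for KeyValueEntry.
type entry interface {
	Key() string
	Value() []byte
}

// kv is a trivial entry implementation used only for this sketch.
type kv struct {
	k string
	v []byte
}

func (e kv) Key() string   { return e.k }
func (e kv) Value() []byte { return e.v }

// initialValues reads from updates until the nil marker (or until the
// channel is closed) and returns the entries seen up to that point.
// Entries sent after the marker stay in the channel.
func initialValues(updates <-chan entry) []entry {
	var out []entry
	for e := range updates {
		if e == nil {
			break
		}
		out = append(out, e)
	}
	return out
}

func main() {
	updates := make(chan entry, 4)
	updates <- kv{"a", []byte("1")}
	updates <- kv{"b", []byte("2")}
	updates <- nil // marker: all initial values have been delivered
	updates <- kv{"a", []byte("3")}

	for _, e := range initialValues(updates) {
		fmt.Printf("initial: %s=%s\n", e.Key(), e.Value())
	}
	live := <-updates
	fmt.Printf("live: %s=%s\n", live.Key(), live.Value())
}
```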

type MaxWait added in v1.11.0

type MaxWait time.Duration

MaxWait sets the maximum amount of time we will wait for a response.

Example
nc, _ := nats.Connect("localhost")

// Set default timeout for JetStream API requests,
// following requests will inherit this timeout.
js, _ := nc.JetStream(nats.MaxWait(3 * time.Second))

// Set custom timeout for a JetStream API request.
js.AddStream(&nats.StreamConfig{
	Name:     "FOO",
	Subjects: []string{"foo"},
}, nats.MaxWait(2*time.Second))

sub, _ := js.PullSubscribe("foo", "my-durable-name")

// Fetch using the default timeout of 3 seconds.
msgs, _ := sub.Fetch(1)

// Set custom timeout for a pull batch request.
msgs, _ = sub.Fetch(1, nats.MaxWait(2*time.Second))

for _, msg := range msgs {
	msg.Ack()
}

type Msg

type Msg struct {
	Subject string
	Reply   string
	Header  Header
	Data    []byte
	Sub     *Subscription
	// contains filtered or unexported fields
}

Msg represents a message delivered by NATS. This structure is used by Subscribers and PublishMsg().

Types of Acknowledgements

When using JetStream, there are multiple ways to ack a Msg:

// Acknowledgement that a message has been processed.
msg.Ack()

// Negatively acknowledges a message.
msg.Nak()

// Terminate a message so that it is not redelivered further.
msg.Term()

// Signal the server that the message is being worked on and reset redelivery timer.
msg.InProgress()
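
One way to choose among these four calls is to classify the result of processing first and then map it to a call. The sketch below shows only that mapping. The outcome type and its values are hypothetical application-level names, and the function returns the name of the method to call instead of calling it, so it runs without a server.

```go
package main

import "fmt"

// outcome is a hypothetical classification of what happened to a message.
type outcome int

const (
	done         outcome = iota // processed successfully
	retryLater                  // transient failure, redelivery may help
	poison                      // can never be processed
	stillWorking                // needs more time before the ack wait expires
)

// ackFor returns the name of the Msg method that matches the outcome.
func ackFor(o outcome) string {
	switch o {
	case done:
		return "Ack"
	case poison:
		return "Term"
	case stillWorking:
		return "InProgress"
	default:
		// Anything else is treated as retryable.
		return "Nak"
	}
}

func main() {
	for _, o := range []outcome{done, retryLater, poison, stillWorking} {
		fmt.Println(ackFor(o))
	}
}
```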

func NewMsg added in v1.11.0

func NewMsg(subject string) *Msg

NewMsg creates a message for publishing that will use headers.

func (*Msg) Ack added in v1.11.0

func (m *Msg) Ack(opts ...AckOpt) error

Ack acknowledges a message. This tells the server that the message was successfully processed and it can move on to the next message.

func (*Msg) AckSync added in v1.11.0

func (m *Msg) AckSync(opts ...AckOpt) error

AckSync is the synchronous version of Ack. This indicates successful message processing.

Example
nc, _ := nats.Connect("localhost")
js, _ := nc.JetStream()

// Create a stream.
js.AddStream(&nats.StreamConfig{
	Name:     "FOO",
	Subjects: []string{"foo"},
})

sub, _ := js.SubscribeSync("foo")
msg, _ := sub.NextMsg(2 * time.Second)

// Wait for ack of an ack.
msg.AckSync()

func (*Msg) InProgress added in v1.11.0

func (m *Msg) InProgress(opts ...AckOpt) error

InProgress tells the server that this message is being worked on. It resets the redelivery timer on the server.

func (*Msg) Metadata added in v1.11.0

func (m *Msg) Metadata() (*MsgMetadata, error)

Metadata retrieves the metadata from a JetStream message. This method will return an error for non-JetStream Msgs.

Example

When a message has been delivered by JetStream, it will be possible to access some of its metadata such as sequence numbers.

nc, _ := nats.Connect("localhost")
js, _ := nc.JetStream()

// Create a stream.
js.AddStream(&nats.StreamConfig{
	Name:     "FOO",
	Subjects: []string{"foo"},
})

js.Publish("foo", []byte("hello"))

sub, _ := js.SubscribeSync("foo")
msg, _ := sub.NextMsg(2 * time.Second)

// Retrieve the JetStream metadata of the message.
meta, _ := msg.Metadata()

// Stream and Consumer sequences.
fmt.Printf("Stream seq: %s:%d, Consumer seq: %s:%d\n", meta.Stream, meta.Sequence.Stream, meta.Consumer, meta.Sequence.Consumer)
fmt.Printf("Pending: %d\n", meta.NumPending)
fmt.Printf("Delivered: %d\n", meta.NumDelivered)

func (*Msg) Nak added in v1.11.0

func (m *Msg) Nak(opts ...AckOpt) error

Nak negatively acknowledges a message. This tells the server to redeliver the message. You can configure the number of redeliveries by passing nats.MaxDeliver when you Subscribe. The default is infinite redeliveries.

func (*Msg) Respond added in v1.8.0

func (m *Msg) Respond(data []byte) error

Respond allows a convenient way to respond to requests in service based subscriptions.

func (*Msg) RespondMsg added in v1.11.0

func (m *Msg) RespondMsg(msg *Msg) error

RespondMsg allows a convenient way to respond to requests in service based subscriptions that might include headers.

func (*Msg) Term added in v1.11.0

func (m *Msg) Term(opts ...AckOpt) error

Term tells the server to not redeliver this message, regardless of the value of nats.MaxDeliver.

type MsgErrHandler added in v1.11.0

type MsgErrHandler func(JetStream, *Msg, error)

MsgErrHandler is used to process asynchronous errors from JetStream PublishAsync and PublishMsgAsync. It will return the original message sent to the server for possible retransmitting and the error encountered.

type MsgHandler

type MsgHandler func(msg *Msg)

MsgHandler is a callback function that processes messages delivered to asynchronous subscribers.

type MsgMetadata added in v1.11.0

type MsgMetadata struct {
	Sequence     SequencePair
	NumDelivered uint64
	NumPending   uint64
	Timestamp    time.Time
	Stream       string
	Consumer     string
	Domain       string
}

MsgMetadata is the JetStream metadata associated with received messages.

type ObjectInfo added in v1.13.0

type ObjectInfo struct {
	ObjectMeta
	Bucket  string    `json:"bucket"`
	NUID    string    `json:"nuid"`
	Size    uint64    `json:"size"`
	ModTime time.Time `json:"mtime"`
	Chunks  uint32    `json:"chunks"`
	Digest  string    `json:"digest,omitempty"`
	Deleted bool      `json:"deleted,omitempty"`
}

ObjectInfo is meta plus instance information.

type ObjectLink added in v1.13.0

type ObjectLink struct {
	// Bucket is the name of the other object store.
	Bucket string `json:"bucket"`
	// Name can be used to link to a single object.
	// If empty means this is a link to the whole store, like a directory.
	Name string `json:"name,omitempty"`
}

ObjectLink is used to embed links to other buckets and objects.

type ObjectMeta added in v1.13.0

type ObjectMeta struct {
	Name        string `json:"name"`
	Description string `json:"description,omitempty"`
	Headers     Header `json:"headers,omitempty"`

	// Optional options.
	Opts *ObjectMetaOptions `json:"options,omitempty"`
}

ObjectMeta is high level information about an object.

type ObjectMetaOptions added in v1.13.0

type ObjectMetaOptions struct {
	Link      *ObjectLink `json:"link,omitempty"`
	ChunkSize uint32      `json:"max_chunk_size,omitempty"`
}

ObjectMetaOptions

type ObjectOpt added in v1.13.0

type ObjectOpt interface {
	// contains filtered or unexported methods
}

type ObjectResult added in v1.13.0

type ObjectResult interface {
	io.ReadCloser
	Info() (*ObjectInfo, error)
	Error() error
}

ObjectResult will return the underlying stream info and also be an io.ReadCloser.

type ObjectStore added in v1.13.0

type ObjectStore interface {
	// Put will place the contents from the reader into a new object.
	Put(obj *ObjectMeta, reader io.Reader, opts ...ObjectOpt) (*ObjectInfo, error)
	// Get will pull the named object from the object store.
	Get(name string, opts ...ObjectOpt) (ObjectResult, error)

	// PutBytes is a convenience function to put a byte slice into this object store.
	PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error)
	// GetBytes is a convenience function to pull an object from this object store and return it as a byte slice.
	GetBytes(name string, opts ...ObjectOpt) ([]byte, error)

	// PutString is a convenience function to put a string into this object store.
	PutString(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error)
	// GetString is a convenience function to pull an object from this object store and return it as a string.
	GetString(name string, opts ...ObjectOpt) (string, error)

	// PutFile is a convenience function to put a file into this object store.
	PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error)
	// GetFile is a convenience function to pull an object from this object store and place it in a file.
	GetFile(name, file string, opts ...ObjectOpt) error

	// GetInfo will retrieve the current information for the object.
	GetInfo(name string) (*ObjectInfo, error)
	// UpdateMeta will update the meta data for the object.
	UpdateMeta(name string, meta *ObjectMeta) error

	// Delete will delete the named object.
	Delete(name string) error

	// AddLink will add a link to another object into this object store.
	AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error)

	// AddBucketLink will add a link to another object store.
	AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, error)

	// Seal will seal the object store, no further modifications will be allowed.
	Seal() error

	// Watch for changes in the underlying store and receive meta information updates.
	Watch(opts ...WatchOpt) (ObjectWatcher, error)

	// List will list all the objects in this store.
	List(opts ...WatchOpt) ([]*ObjectInfo, error)
}

Notice: Experimental Preview

This functionality is EXPERIMENTAL and may be changed in later releases.

type ObjectStoreConfig added in v1.13.0

type ObjectStoreConfig struct {
	Bucket      string
	Description string
	TTL         time.Duration
	Storage     StorageType
	Replicas    int
}

ObjectStoreConfig is the config for the object store.

type ObjectStoreManager added in v1.13.0

type ObjectStoreManager interface {
	// ObjectStore will lookup and bind to an existing object store instance.
	ObjectStore(bucket string) (ObjectStore, error)
	// CreateObjectStore will create an object store.
	CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error)
	// DeleteObjectStore will delete the underlying stream for the named object store.
	DeleteObjectStore(bucket string) error
}

Notice: Experimental Preview

This functionality is EXPERIMENTAL and may be changed in later releases.

type ObjectWatcher added in v1.13.0

type ObjectWatcher interface {
	// Updates returns a channel to read any updates to entries.
	Updates() <-chan *ObjectInfo
	// Stop() will stop this watcher.
	Stop() error
}

ObjectWatcher is what is returned when doing a watch.

type Option added in v1.2.0

type Option func(*Options) error

Option is a function on the options for a connection.
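
Option follows the functional options pattern: each option is a function that mutates an options struct and may return an error. The sketch below reproduces the pattern with local types so it runs on its own. The options struct, withName, withMaxReconnects, failing, apply and the default of 60 are all hypothetical stand-ins, not the library's code.

```go
package main

import (
	"errors"
	"fmt"
)

// options is a hypothetical, trimmed-down stand-in for nats.Options.
type options struct {
	Name         string
	MaxReconnect int
}

// option has the same shape as the Option type above.
type option func(*options) error

func withName(name string) option {
	return func(o *options) error {
		o.Name = name
		return nil
	}
}

func withMaxReconnects(n int) option {
	return func(o *options) error {
		o.MaxReconnect = n
		return nil
	}
}

// failing shows how an option can reject a setting.
func failing() option {
	return func(*options) error { return errors.New("bad option") }
}

// apply starts from illustrative defaults and runs each option in order,
// stopping at the first error.
func apply(opts ...option) (options, error) {
	o := options{MaxReconnect: 60}
	for _, opt := range opts {
		if err := opt(&o); err != nil {
			return options{}, err
		}
	}
	return o, nil
}

func main() {
	o, err := apply(withName("billing"), withMaxReconnects(5))
	fmt.Printf("%+v %v\n", o, err)
}
```

Later options overwrite earlier ones, so the order in which they are passed matters when two options touch the same field.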

func ClientCert added in v1.2.0

func ClientCert(certFile, keyFile string) Option

ClientCert is a helper option to provide the client certificate from a file. If Secure is not already set this will set it as well.

func ClosedHandler added in v1.2.0

func ClosedHandler(cb ConnHandler) Option

ClosedHandler is an Option to set the closed handler.

func Compression added in v1.11.0

func Compression(enabled bool) Option

Compression is an Option to indicate if this connection supports compression. Currently only supported for Websocket connections.

func CustomInboxPrefix added in v1.12.0

func CustomInboxPrefix(p string) Option

CustomInboxPrefix configures the request + reply inbox prefix.

func CustomReconnectDelay added in v1.10.0

func CustomReconnectDelay(cb ReconnectDelayHandler) Option

CustomReconnectDelay is an Option to set the CustomReconnectDelayCB option. See CustomReconnectDelayCB Option for more details.

func Dialer added in v1.2.2

func Dialer(dialer *net.Dialer) Option

Dialer is an Option to set the dialer which will be used when attempting to establish a connection. DEPRECATED: Should use CustomDialer instead.

func DisconnectErrHandler added in v1.8.0

func DisconnectErrHandler(cb ConnErrHandler) Option

DisconnectErrHandler is an Option to set the disconnected error handler.

func DisconnectHandler added in v1.2.0

func DisconnectHandler(cb ConnHandler) Option

DisconnectHandler is an Option to set the disconnected handler. DEPRECATED: Use DisconnectErrHandler.

func DiscoveredServersHandler added in v1.3.0

func DiscoveredServersHandler(cb ConnHandler) Option

DiscoveredServersHandler is an Option to set the new servers handler.

func DontRandomize added in v1.2.0

func DontRandomize() Option

DontRandomize is an Option to turn off randomizing the server pool.

func DrainTimeout added in v1.6.0

func DrainTimeout(t time.Duration) Option

DrainTimeout is an Option to set the timeout for draining a connection.

func ErrorHandler added in v1.2.0

func ErrorHandler(cb ErrHandler) Option

ErrorHandler is an Option to set the async error handler.

func FlusherTimeout added in v1.7.0

func FlusherTimeout(t time.Duration) Option

FlusherTimeout is an Option to set the write (and flush) timeout on a connection.

func LameDuckModeHandler added in v1.11.0

func LameDuckModeHandler(cb ConnHandler) Option

LameDuckModeHandler sets the callback to invoke when the server notifies the connection that it entered lame duck mode, that is, going to gradually disconnect all its connections before shutting down. This is often used in deployments when upgrading NATS Servers.

func MaxPingsOutstanding added in v1.7.2

func MaxPingsOutstanding(max int) Option

MaxPingsOutstanding is an Option to set the maximum number of ping requests that can go un-answered by the server before closing the connection.

func MaxReconnects added in v1.2.0

func MaxReconnects(max int) Option

MaxReconnects is an Option to set the maximum number of reconnect attempts.

func Name added in v1.2.0

func Name(name string) Option

Name is an Option to set the client name.

func Nkey added in v1.7.0

func Nkey(pubKey string, sigCB SignatureHandler) Option

Nkey will set the public Nkey and the signature callback to sign the server nonce.

func NkeyOptionFromSeed added in v1.7.0

func NkeyOptionFromSeed(seedFile string) (Option, error)

NkeyOptionFromSeed will load an nkey pair from a seed file. It will return the NKey Option and will handle signing of nonce challenges from the server. It will take care to not hold keys in memory and to wipe memory.

func NoCallbacksAfterClientClose added in v1.9.0

func NoCallbacksAfterClientClose() Option

NoCallbacksAfterClientClose is an Option to disable callbacks when user code calls Close(). If close is initiated by any other condition, callbacks if any will be invoked.

func NoEcho added in v1.6.0

func NoEcho() Option

NoEcho is an Option to turn off messages echoing back from a server. Note this is supported on servers >= version 1.2. Proto 1 or greater.

func NoReconnect added in v1.2.0

func NoReconnect() Option

NoReconnect is an Option to turn off reconnect behavior.

func PingInterval added in v1.6.0

func PingInterval(t time.Duration) Option

PingInterval is an Option to set the period for client ping commands.

func ReconnectBufSize added in v1.5.0

func ReconnectBufSize(size int) Option

ReconnectBufSize sets the buffer size of messages kept while busy reconnecting.

func ReconnectHandler added in v1.2.0

func ReconnectHandler(cb ConnHandler) Option

ReconnectHandler is an Option to set the reconnected handler.

func ReconnectJitter added in v1.10.0

func ReconnectJitter(jitter, jitterForTLS time.Duration) Option

ReconnectJitter is an Option to set the upper bound of a random delay added to ReconnectWait.

func ReconnectWait added in v1.2.0

func ReconnectWait(t time.Duration) Option

ReconnectWait is an Option to set the wait time between reconnect attempts.

func RetryOnFailedConnect added in v1.11.0

func RetryOnFailedConnect(retry bool) Option

RetryOnFailedConnect sets the connection in reconnecting state right away if it can't connect to a server in the initial set. See RetryOnFailedConnect option for more details.

func RootCAs added in v1.2.0

func RootCAs(file ...string) Option

RootCAs is a helper option to provide the RootCAs pool from a list of filenames. If Secure is not already set this will set it as well.

func Secure added in v1.2.0

func Secure(tls ...*tls.Config) Option

Secure is an Option to enable TLS secure connections that skip server verification by default. Pass a TLS Configuration for proper TLS. NOTE: This should NOT be used in a production setting.

func SetCustomDialer added in v1.4.0

func SetCustomDialer(dialer CustomDialer) Option

SetCustomDialer is an Option to set a custom dialer which will be used when attempting to establish a connection. If both Dialer and CustomDialer are specified, CustomDialer takes precedence.

func SyncQueueLen added in v1.7.2

func SyncQueueLen(max int) Option

SyncQueueLen will set the maximum queue len for the internal channel used for SubscribeSync().

func Timeout added in v1.2.0

func Timeout(t time.Duration) Option

Timeout is an Option to set the timeout for Dial on a connection.

func Token added in v1.2.2

func Token(token string) Option

Token is an Option to set the token to use when a token is not included directly in the URLs and when a token handler is not provided.

func TokenHandler added in v1.7.0

func TokenHandler(cb AuthTokenHandler) Option

TokenHandler is an Option to set the token handler to use when a token is not included directly in the URLs and when a token is not set.

func UseOldRequestStyle added in v1.3.0

func UseOldRequestStyle() Option

UseOldRequestStyle is an Option to force usage of the old Request style.

func UserCredentials added in v1.7.0

func UserCredentials(userOrChainedFile string, seedFiles ...string) Option

UserCredentials is a convenience function that takes a filename for a user's JWT and a filename for the user's private Nkey seed.

func UserInfo added in v1.2.2

func UserInfo(user, password string) Option

UserInfo is an Option to set the username and password to use when not included directly in the URLs.

func UserJWT added in v1.7.0

func UserJWT(userCB UserJWTHandler, sigCB SignatureHandler) Option

UserJWT will set the callbacks to retrieve the user's JWT and the signature callback to sign the server nonce. This and the Nkey option are mutually exclusive.

type Options

type Options struct {

	// Url represents a single NATS server url to which the client
	// will be connecting. If the Servers option is also set, it
	// then becomes the first server in the Servers array.
	Url string

	// Servers is a configured set of servers which this client
	// will use when attempting to connect.
	Servers []string

	// NoRandomize configures whether we will randomize the
	// server pool.
	NoRandomize bool

	// NoEcho configures whether the server will echo back messages
	// that are sent on this connection if we also have matching subscriptions.
	// Note this is supported on servers >= version 1.2. Proto 1 or greater.
	NoEcho bool

	// Name is an optional name label which will be sent to the server
	// on CONNECT to identify the client.
	Name string

	// Verbose signals the server to send an OK ack for commands
	// successfully processed by the server.
	Verbose bool

	// Pedantic signals the server whether it should be doing further
	// validation of subjects.
	Pedantic bool

	// Secure enables TLS secure connections that skip server
	// verification by default. NOT RECOMMENDED.
	Secure bool

	// TLSConfig is a custom TLS configuration to use for secure
	// transports.
	TLSConfig *tls.Config

	// AllowReconnect enables reconnection logic to be used when we
	// encounter a disconnect from the current server.
	AllowReconnect bool

	// MaxReconnect sets the number of reconnect attempts that will be
	// tried before giving up. If negative, then it will never give up
	// trying to reconnect.
	MaxReconnect int

	// ReconnectWait sets the time to backoff after attempting a reconnect
	// to a server that we were already connected to previously.
	ReconnectWait time.Duration

	// CustomReconnectDelayCB is invoked after the library tried every
	// URL in the server list and failed to reconnect. It passes to the
	// user the current number of attempts. This function returns the
	// amount of time the library will sleep before attempting to reconnect
	// again. It is strongly recommended that this value contains some
	// jitter to prevent all connections from attempting to reconnect at the same time.
	CustomReconnectDelayCB ReconnectDelayHandler

	// ReconnectJitter sets the upper bound for a random delay added to
	// ReconnectWait during a reconnect when no TLS is used.
	// Note that any jitter is capped with ReconnectJitterMax.
	ReconnectJitter time.Duration

	// ReconnectJitterTLS sets the upper bound for a random delay added to
	// ReconnectWait during a reconnect when TLS is used.
	// Note that any jitter is capped with ReconnectJitterMax.
	ReconnectJitterTLS time.Duration

	// Timeout sets the timeout for a Dial operation on a connection.
	Timeout time.Duration

	// DrainTimeout sets the timeout for a Drain Operation to complete.
	DrainTimeout time.Duration

	// FlusherTimeout is the maximum time to wait for write operations
	// to the underlying connection to complete (including the flusher loop).
	FlusherTimeout time.Duration

	// PingInterval is the period at which the client will be sending ping
	// commands to the server, disabled if 0 or negative.
	PingInterval time.Duration

	// MaxPingsOut is the maximum number of pending ping commands that can
	// be awaiting a response before raising an ErrStaleConnection error.
	MaxPingsOut int

	// ClosedCB sets the closed handler that is called when a client will
	// no longer be connected.
	ClosedCB ConnHandler

	// DisconnectedCB sets the disconnected handler that is called
	// whenever the connection is disconnected.
	// Will not be called if DisconnectedErrCB is set
	// DEPRECATED. Use DisconnectedErrCB which passes error that caused
	// the disconnect event.
	DisconnectedCB ConnHandler

	// DisconnectedErrCB sets the disconnected error handler that is called
	// whenever the connection is disconnected.
	// Disconnected error could be nil, for instance when user explicitly closes the connection.
	// DisconnectedCB will not be called if DisconnectedErrCB is set
	DisconnectedErrCB ConnErrHandler

	// ReconnectedCB sets the reconnected handler called whenever
	// the connection is successfully reconnected.
	ReconnectedCB ConnHandler

	// DiscoveredServersCB sets the callback that is invoked whenever a new
	// server has joined the cluster.
	DiscoveredServersCB ConnHandler

	// AsyncErrorCB sets the async error handler (e.g. slow consumer errors)
	AsyncErrorCB ErrHandler

	// ReconnectBufSize is the size of the backing bufio during reconnect.
	// Once this has been exhausted publish operations will return an error.
	ReconnectBufSize int

	// SubChanLen is the size of the buffered channel used between the socket
	// goroutine and the message delivery for SyncSubscriptions.
	// NOTE: This does not affect AsyncSubscriptions which are
	// dictated by PendingLimits()
	SubChanLen int

	// UserJWT sets the callback handler that will fetch a user's JWT.
	UserJWT UserJWTHandler

	// Nkey sets the public nkey that will be used to authenticate
	// when connecting to the server. UserJWT and Nkey are mutually exclusive
	// and if defined, UserJWT will take precedence.
	Nkey string

	// SignatureCB designates the function used to sign the nonce
	// presented from the server.
	SignatureCB SignatureHandler

	// User sets the username to be used when connecting to the server.
	User string

	// Password sets the password to be used when connecting to a server.
	Password string

	// Token sets the token to be used when connecting to a server.
	Token string

	// TokenHandler designates the function used to generate the token to be used when connecting to a server.
	TokenHandler AuthTokenHandler

	// Dialer allows a custom net.Dialer when forming connections.
	// DEPRECATED: should use CustomDialer instead.
	Dialer *net.Dialer

	// CustomDialer allows specifying a custom dialer (not necessarily
	// a *net.Dialer).
	CustomDialer CustomDialer

	// UseOldRequestStyle forces the old method of Requests that utilize
	// a new Inbox and a new Subscription for each request.
	UseOldRequestStyle bool

	// NoCallbacksAfterClientClose allows preventing the invocation of
	// callbacks after Close() is called. Client won't receive notifications
	// when Close is invoked by user code. Default is to invoke the callbacks.
	NoCallbacksAfterClientClose bool

	// LameDuckModeHandler sets the callback to invoke when the server notifies
	// the connection that it entered lame duck mode, that is, going to
	// gradually disconnect all its connections before shutting down. This is
	// often used in deployments when upgrading NATS Servers.
	LameDuckModeHandler ConnHandler

	// RetryOnFailedConnect sets the connection in reconnecting state right
	// away if it can't connect to a server in the initial set. The
	// MaxReconnect and ReconnectWait options are used for this process,
	// similarly to when an established connection is disconnected.
	// If a ReconnectHandler is set, it will be invoked when the connection
	// is established, and if a ClosedHandler is set, it will be invoked if
	// it fails to connect (after exhausting the MaxReconnect attempts).
	RetryOnFailedConnect bool

	// For websocket connections, indicates to the server that the connection
	// supports compression. If the server does too, then data will be compressed.
	Compression bool

	// InboxPrefix allows the default _INBOX prefix to be customized
	InboxPrefix string
}

Options can be used to create a customized connection.

func GetDefaultOptions added in v1.3.0

func GetDefaultOptions() Options

GetDefaultOptions returns default configuration options for the client.

func (Options) Connect

func (o Options) Connect() (*Conn, error)

Connect will attempt to connect to a NATS server with multiple options.

type PeerInfo added in v1.11.0

type PeerInfo struct {
	Name    string        `json:"name"`
	Current bool          `json:"current"`
	Offline bool          `json:"offline,omitempty"`
	Active  time.Duration `json:"active"`
	Lag     uint64        `json:"lag,omitempty"`
}

PeerInfo shows information about all the peers in the cluster that are supporting the stream or consumer.

type Placement added in v1.11.0

type Placement struct {
	Cluster string   `json:"cluster"`
	Tags    []string `json:"tags,omitempty"`
}

Placement is used to guide placement of streams in clustered JetStream.

type PubAck added in v1.11.0

type PubAck struct {
	Stream    string `json:"stream"`
	Sequence  uint64 `json:"seq"`
	Duplicate bool   `json:"duplicate,omitempty"`
	Domain    string `json:"domain,omitempty"`
}

PubAck is an ack received after successfully publishing a message.

type PubAckFuture added in v1.11.0

type PubAckFuture interface {
	// Ok returns a receive only channel that can be used to get a PubAck.
	Ok() <-chan *PubAck

	// Err returns a receive only channel that can be used to get the error from an async publish.
	Err() <-chan error

	// Msg returns the message that was sent to the server.
	Msg() *Msg
}

PubAckFuture is a future for a PubAck.
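
A future like this is usually consumed with a select over its two channels. The following is a minimal sketch of that pattern, not the library's code: PubAck and PubAckFuture are trimmed local stand-ins for the documented types (Msg() is omitted), and fakeFuture, resolved, failed, and waitAck are hypothetical helpers introduced only for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// PubAck and PubAckFuture mirror only the parts of the documented types
// that this sketch needs; they are local stand-ins, not the nats types.
type PubAck struct {
	Stream   string
	Sequence uint64
}

type PubAckFuture interface {
	Ok() <-chan *PubAck
	Err() <-chan error
}

// fakeFuture is a hypothetical in-memory future used only for this demo.
type fakeFuture struct {
	ok  chan *PubAck
	err chan error
}

func (f *fakeFuture) Ok() <-chan *PubAck { return f.ok }
func (f *fakeFuture) Err() <-chan error  { return f.err }

// resolved returns a future that already holds an ack.
func resolved(stream string, seq uint64) PubAckFuture {
	f := &fakeFuture{ok: make(chan *PubAck, 1), err: make(chan error, 1)}
	f.ok <- &PubAck{Stream: stream, Sequence: seq}
	return f
}

// failed returns a future that already holds an error.
func failed(msg string) PubAckFuture {
	f := &fakeFuture{ok: make(chan *PubAck, 1), err: make(chan error, 1)}
	f.err <- errors.New(msg)
	return f
}

// waitAck blocks until the future yields an ack or an error, or until
// timeoutMs milliseconds have elapsed.
func waitAck(f PubAckFuture, timeoutMs int) (*PubAck, error) {
	select {
	case ack := <-f.Ok():
		return ack, nil
	case err := <-f.Err():
		return nil, err
	case <-time.After(time.Duration(timeoutMs) * time.Millisecond):
		return nil, errors.New("timed out waiting for PubAck")
	}
}

func main() {
	ack, err := waitAck(resolved("FOO", 7), 500)
	fmt.Println(ack.Sequence, err)

	_, err = waitAck(failed("publish rejected"), 500)
	fmt.Println(err)
}
```

The explicit timeout case keeps a caller from blocking forever if neither channel ever delivers a value.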

type PubOpt added in v1.11.0

type PubOpt interface {
	// contains filtered or unexported methods
}

PubOpt configures options for publishing JetStream messages.

Example
nc, err := nats.Connect("localhost")
if err != nil {
	log.Fatal(err)
}

// Create JetStream context to produce/consume messages that will be persisted.
js, err := nc.JetStream()
if err != nil {
	log.Fatal(err)
}

// Create stream to persist messages published on 'foo'.
js.AddStream(&nats.StreamConfig{
	Name:     "FOO",
	Subjects: []string{"foo"},
})

// Publish is synchronous by default, and waits for a PubAck response.
js.Publish("foo", []byte("Hello JS!"))

// Publish with a custom timeout.
js.Publish("foo", []byte("Hello JS!"), nats.AckWait(500*time.Millisecond))

// Publish with a context.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

js.Publish("foo", []byte("Hello JS!"), nats.Context(ctx))

// Publish and assert the expected stream name.
js.Publish("foo", []byte("Hello JS!"), nats.ExpectStream("FOO"))

// Publish and assert the last sequence.
js.Publish("foo", []byte("Hello JS!"), nats.ExpectLastSequence(5))

// Publish and tag the message with an ID.
js.Publish("foo", []byte("Hello JS!"), nats.MsgId("foo:6"))

// Publish and assert the last msg ID.
js.Publish("foo", []byte("Hello JS!"), nats.ExpectLastMsgId("foo:6"))
Output:

func ExpectLastMsgId added in v1.11.0

func ExpectLastMsgId(id string) PubOpt

ExpectLastMsgId sets the expected last msgId in the response from the publish.

func ExpectLastSequence added in v1.11.0

func ExpectLastSequence(seq uint64) PubOpt

ExpectLastSequence sets the expected sequence in the response from the publish.

func ExpectLastSequencePerSubject added in v1.12.0

func ExpectLastSequencePerSubject(seq uint64) PubOpt

ExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish.

func ExpectStream added in v1.11.0

func ExpectStream(stream string) PubOpt

ExpectStream sets the stream that is expected to respond to the publish.

func MsgId added in v1.11.0

func MsgId(id string) PubOpt

MsgId sets the message ID used for de-duplication.

type PullOpt added in v1.11.0

type PullOpt interface {
	// contains filtered or unexported methods
}

PullOpt are the options that can be passed when pulling a batch of messages.

Example
nc, err := nats.Connect("localhost")
if err != nil {
	log.Fatal(err)
}

// Create JetStream context to produce/consume messages that will be persisted.
js, err := nc.JetStream()
if err != nil {
	log.Fatal(err)
}

// Create stream to persist messages published on 'foo'.
js.AddStream(&nats.StreamConfig{
	Name:     "FOO",
	Subjects: []string{"foo"},
})

// Publish is synchronous by default, and waits for a PubAck response.
js.Publish("foo", []byte("Hello JS!"))

sub, _ := js.PullSubscribe("foo", "wq")

// Pull one message,
msgs, _ := sub.Fetch(1, nats.MaxWait(2*time.Second))
for _, msg := range msgs {
	msg.Ack()
}

// Using a context to timeout waiting for a message.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

msgs, _ = sub.Fetch(1, nats.Context(ctx))
for _, msg := range msgs {
	msg.Ack()
}
Output:

type RawStreamMsg added in v1.11.0

type RawStreamMsg struct {
	Subject  string
	Sequence uint64
	Header   Header
	Data     []byte
	Time     time.Time
}

RawStreamMsg is a raw message stored in JetStream.

type ReconnectDelayHandler added in v1.10.0

type ReconnectDelayHandler func(attempts int) time.Duration

ReconnectDelayHandler lets the user supply the delay the library should wait before attempting to reconnect again. Note that it is invoked after the library has tried the whole list of URLs and failed to reconnect.
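
One way to write such a handler is capped exponential backoff plus random jitter. The sketch below is an illustration under stated assumptions: the ReconnectDelayHandler type is redeclared locally so the program builds without the nats package, backoffMillis and newBackoffHandler are hypothetical names, and the first invocation is assumed to pass attempts = 1 (values below 1 are treated as 1).

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// ReconnectDelayHandler mirrors the documented signature so that this
// sketch compiles without the nats package.
type ReconnectDelayHandler func(attempts int) time.Duration

// backoffMillis returns the deterministic part of the delay: baseMs doubled
// for every attempt after the first, capped at maxMs.
func backoffMillis(attempts int, baseMs, maxMs int64) int64 {
	if attempts < 1 {
		attempts = 1 // assumption: treat unexpected values as the first attempt
	}
	d := baseMs
	for i := 1; i < attempts && d < maxMs; i++ {
		d *= 2
	}
	if d > maxMs {
		d = maxMs
	}
	return d
}

// newBackoffHandler builds a handler that adds a random jitter in
// [0, jitterMs) on top of the capped exponential delay.
func newBackoffHandler(baseMs, maxMs, jitterMs int64) ReconnectDelayHandler {
	return func(attempts int) time.Duration {
		ms := backoffMillis(attempts, baseMs, maxMs)
		if jitterMs > 0 {
			ms += rand.Int63n(jitterMs)
		}
		return time.Duration(ms) * time.Millisecond
	}
}

func main() {
	h := newBackoffHandler(100, 2000, 50)
	for attempts := 1; attempts <= 6; attempts++ {
		fmt.Printf("attempt %d: wait %v\n", attempts, h(attempts))
	}
}
```

A function of this shape could be assigned to the CustomReconnectDelayCB field of Options; the jitter term is what keeps many clients from reconnecting at the same instant.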

type ReplayPolicy added in v1.11.0

type ReplayPolicy int

ReplayPolicy determines how the consumer should replay messages it already has queued in the stream.

const (
	// ReplayInstantPolicy will replay messages as fast as possible.
	ReplayInstantPolicy ReplayPolicy = iota

	// ReplayOriginalPolicy will maintain the same timing as the messages were received.
	ReplayOriginalPolicy
)

func (ReplayPolicy) MarshalJSON added in v1.11.0

func (p ReplayPolicy) MarshalJSON() ([]byte, error)

func (*ReplayPolicy) UnmarshalJSON added in v1.11.0

func (p *ReplayPolicy) UnmarshalJSON(data []byte) error

type RetentionPolicy added in v1.11.0

type RetentionPolicy int

RetentionPolicy determines how messages in a set are retained.

const (
	// LimitsPolicy (default) means that messages are retained until any given limit is reached.
	// This could be one of MaxMsgs, MaxBytes, or MaxAge.
	LimitsPolicy RetentionPolicy = iota
	// InterestPolicy specifies that when all known observables have acknowledged a message it can be removed.
	InterestPolicy
	// WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed.
	WorkQueuePolicy
)

func (RetentionPolicy) MarshalJSON added in v1.11.0

func (rp RetentionPolicy) MarshalJSON() ([]byte, error)

func (RetentionPolicy) String added in v1.11.0

func (rp RetentionPolicy) String() string

func (*RetentionPolicy) UnmarshalJSON added in v1.11.0

func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error
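
The MarshalJSON/UnmarshalJSON pair lets an int-backed enum travel as a readable JSON string. The sketch below shows the general pattern with a hypothetical local type named policy; the string values in the names map are illustrative assumptions and are not taken from the library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// policy is a hypothetical stand-in for an int-backed enum such as
// RetentionPolicy.
type policy int

const (
	limits policy = iota
	interest
	workQueue
)

// names maps each value to its JSON string. The strings are illustrative
// assumptions, not taken from the library.
var names = map[policy]string{
	limits:    "limits",
	interest:  "interest",
	workQueue: "workqueue",
}

// MarshalJSON encodes the enum as a JSON string instead of a number.
func (p policy) MarshalJSON() ([]byte, error) {
	name, ok := names[p]
	if !ok {
		return nil, fmt.Errorf("cannot marshal policy %d", int(p))
	}
	return json.Marshal(name)
}

// UnmarshalJSON decodes a JSON string back into the enum value.
func (p *policy) UnmarshalJSON(data []byte) error {
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		return err
	}
	for value, name := range names {
		if name == s {
			*p = value
			return nil
		}
	}
	return fmt.Errorf("cannot unmarshal %q as policy", s)
}

// encode and decode are small helpers for the demo.
func encode(p policy) (string, error) {
	b, err := json.Marshal(p)
	return string(b), err
}

func decode(s string) (policy, error) {
	var p policy
	err := json.Unmarshal([]byte(s), &p)
	return p, err
}

func main() {
	out, err := encode(workQueue)
	fmt.Println(out, err)

	p, err := decode(`"interest"`)
	fmt.Println(p == interest, err)
}
```

Unknown strings are rejected with an error rather than silently mapped to the zero value, which would otherwise be indistinguishable from the default policy.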

type SequenceInfo added in v1.12.0

type SequenceInfo struct {
	Consumer uint64     `json:"consumer_seq"`
	Stream   uint64     `json:"stream_seq"`
	Last     *time.Time `json:"last_active,omitempty"`
}

SequenceInfo has both the consumer and the stream sequence and last activity.

type SequencePair added in v1.11.0

type SequencePair struct {
	Consumer uint64 `json:"consumer_seq"`
	Stream   uint64 `json:"stream_seq"`
}

SequencePair includes the consumer and stream sequence info from a JetStream consumer.

type SignatureHandler added in v1.7.0

type SignatureHandler func([]byte) ([]byte, error)

SignatureHandler is used to sign a nonce from the server while authenticating with nkeys. The user should sign the nonce and return the raw signature. The client will base64 encode this to send to the server.
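
A handler with this shape might look like the sketch below. It assumes a plain Ed25519 key from the standard library and generates a throwaway key pair for the demo; a real application would more likely load its nkey seed through the nkeys package. The SignatureHandler type is redeclared locally, and newSigner and signAndVerify are hypothetical helper names.

```go
package main

import (
	"crypto/ed25519"
	"fmt"
)

// SignatureHandler mirrors the documented signature so that this sketch
// compiles without the nats package.
type SignatureHandler func([]byte) ([]byte, error)

// newSigner returns a handler that signs the nonce with priv and returns
// the raw signature bytes, leaving any encoding to the caller.
func newSigner(priv ed25519.PrivateKey) SignatureHandler {
	return func(nonce []byte) ([]byte, error) {
		if len(priv) != ed25519.PrivateKeySize {
			return nil, fmt.Errorf("invalid private key length %d", len(priv))
		}
		return ed25519.Sign(priv, nonce), nil
	}
}

// signAndVerify generates a throwaway key pair, signs the nonce, and checks
// the signature against the public key.
func signAndVerify(nonce string) (bool, error) {
	pub, priv, err := ed25519.GenerateKey(nil) // nil selects crypto/rand
	if err != nil {
		return false, err
	}
	sig, err := newSigner(priv)([]byte(nonce))
	if err != nil {
		return false, err
	}
	valid := len(sig) == ed25519.SignatureSize && ed25519.Verify(pub, []byte(nonce), sig)
	return valid, nil
}

func main() {
	ok, err := signAndVerify("server-nonce")
	fmt.Println(ok, err)
}
```

The handler returns the raw signature and does no base64 encoding itself, matching the division of labor described above.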

type Statistics

type Statistics struct {
	InMsgs     uint64
	OutMsgs    uint64
	InBytes    uint64
	OutBytes   uint64
	Reconnects uint64
}

Tracks various stats received and sent on this connection, including counts for messages and bytes.

type Status

type Status int

Status represents the state of the connection.

func (Status) String added in v1.12.1

func (s Status) String() string

type StorageType added in v1.11.0

type StorageType int

StorageType determines how messages are stored for retention.

const (
	// FileStorage specifies on disk storage. It's the default.
	FileStorage StorageType = iota
	// MemoryStorage specifies in memory only.
	MemoryStorage
)

func (StorageType) MarshalJSON added in v1.11.0

func (st StorageType) MarshalJSON() ([]byte, error)

func (StorageType) String added in v1.11.0

func (st StorageType) String() string

func (*StorageType) UnmarshalJSON added in v1.11.0

func (st *StorageType) UnmarshalJSON(data []byte) error

type StreamConfig added in v1.11.0

type StreamConfig struct {
	Name              string          `json:"name"`
	Description       string          `json:"description,omitempty"`
	Subjects          []string        `json:"subjects,omitempty"`
	Retention         RetentionPolicy `json:"retention"`
	MaxConsumers      int             `json:"max_consumers"`
	MaxMsgs           int64           `json:"max_msgs"`
	MaxBytes          int64           `json:"max_bytes"`
	Discard           DiscardPolicy   `json:"discard"`
	MaxAge            time.Duration   `json:"max_age"`
	MaxMsgsPerSubject int64           `json:"max_msgs_per_subject"`
	MaxMsgSize        int32           `json:"max_msg_size,omitempty"`
	Storage           StorageType     `json:"storage"`
	Replicas          int             `json:"num_replicas"`
	NoAck             bool            `json:"no_ack,omitempty"`
	Template          string          `json:"template_owner,omitempty"`
	Duplicates        time.Duration   `json:"duplicate_window,omitempty"`
	Placement         *Placement      `json:"placement,omitempty"`
	Mirror            *StreamSource   `json:"mirror,omitempty"`
	Sources           []*StreamSource `json:"sources,omitempty"`
	Sealed            bool            `json:"sealed,omitempty"`
	DenyDelete        bool            `json:"deny_delete,omitempty"`
	DenyPurge         bool            `json:"deny_purge,omitempty"`
	AllowRollup       bool            `json:"allow_rollup_hdrs,omitempty"`
}

StreamConfig will determine the properties for a stream. There are sensible defaults for most. If no subjects are given the name will be used as the only subject.

type StreamInfo added in v1.11.0

type StreamInfo struct {
	Config  StreamConfig        `json:"config"`
	Created time.Time           `json:"created"`
	State   StreamState         `json:"state"`
	Cluster *ClusterInfo        `json:"cluster,omitempty"`
	Mirror  *StreamSourceInfo   `json:"mirror,omitempty"`
	Sources []*StreamSourceInfo `json:"sources,omitempty"`
}

StreamInfo shows config and current state for this stream.

type StreamSource added in v1.11.0

type StreamSource struct {
	Name          string          `json:"name"`
	OptStartSeq   uint64          `json:"opt_start_seq,omitempty"`
	OptStartTime  *time.Time      `json:"opt_start_time,omitempty"`
	FilterSubject string          `json:"filter_subject,omitempty"`
	External      *ExternalStream `json:"external,omitempty"`
}

StreamSource dictates how streams can source from other streams.

type StreamSourceInfo added in v1.11.0

type StreamSourceInfo struct {
	Name   string        `json:"name"`
	Lag    uint64        `json:"lag"`
	Active time.Duration `json:"active"`
}

StreamSourceInfo shows information about an upstream stream source.

type StreamState added in v1.11.0

type StreamState struct {
	Msgs      uint64    `json:"messages"`
	Bytes     uint64    `json:"bytes"`
	FirstSeq  uint64    `json:"first_seq"`
	FirstTime time.Time `json:"first_ts"`
	LastSeq   uint64    `json:"last_seq"`
	LastTime  time.Time `json:"last_ts"`
	Consumers int       `json:"consumer_count"`
}

StreamState is information about the given stream.

type SubOpt added in v1.11.0

type SubOpt interface {
	// contains filtered or unexported methods
}

SubOpt configures options for subscribing to JetStream consumers.

Example
nc, err := nats.Connect("localhost")
if err != nil {
	log.Fatal(err)
}

// Create JetStream context to produce/consume messages that will be persisted.
js, err := nc.JetStream()
if err != nil {
	log.Fatal(err)
}

// Auto-ack each individual message.
js.Subscribe("foo", func(msg *nats.Msg) {
	fmt.Printf("Received a message: %s\n", string(msg.Data))
})

// Auto-ack current sequence and all below.
js.Subscribe("foo", func(msg *nats.Msg) {
	fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.AckAll())

// Auto-ack each individual message.
js.Subscribe("foo", func(msg *nats.Msg) {
	fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.AckExplicit())

// Acks are not required.
js.Subscribe("foo", func(msg *nats.Msg) {
	fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.AckNone())

// Manually acknowledge messages.
js.Subscribe("foo", func(msg *nats.Msg) {
	msg.Ack()
}, nats.ManualAck())

// Bind to an existing stream.
sub, _ := js.SubscribeSync("origin", nats.BindStream("m1"))
msg, _ := sub.NextMsg(2 * time.Second)
msg.Ack()

// Deliver all messages from the beginning.
js.Subscribe("foo", func(msg *nats.Msg) {
	fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.DeliverAll())

// Deliver messages starting from the last one.
js.Subscribe("foo", func(msg *nats.Msg) {
	fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.DeliverLast())

// Deliver only new messages that arrive after subscription.
js.Subscribe("foo", func(msg *nats.Msg) {
	fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.DeliverNew())

// Create durable consumer FOO, if it doesn't exist.
js.Subscribe("foo", func(msg *nats.Msg) {
	fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.Durable("FOO"))

// Create consumer on Foo with flow control and heartbeats.
js.SubscribeSync("foo",
	// Redeliver after 30s
	nats.AckWait(30*time.Second),
	// Redeliver only once
	nats.MaxDeliver(1),
	// Activate Flow control algorithm from the server.
	nats.EnableFlowControl(),
	// Track heartbeats from the server for missed sequences.
	nats.IdleHeartbeat(500*time.Millisecond),
)

// Set the allowable number of outstanding acks.
js.Subscribe("foo", func(msg *nats.Msg) {
	fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.MaxAckPending(5))

// Set the number of redeliveries for a message.
js.Subscribe("foo", func(msg *nats.Msg) {
	fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.MaxDeliver(5))

// Set the maximum number of inflight pull requests.
js.Subscribe("foo", func(msg *nats.Msg) {
	fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.PullMaxWaiting(5))

// Set the rate limit on a push consumer.
js.Subscribe("foo", func(msg *nats.Msg) {
	fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.RateLimit(1024))

// Replay messages at original speed, instead of as fast as possible.
js.Subscribe("foo", func(msg *nats.Msg) {
	fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.ReplayOriginal())

// Start delivering messages at a given sequence.
js.Subscribe("foo", func(msg *nats.Msg) {
	fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.StartSequence(10))

// Start delivering messages at a given time.
js.Subscribe("foo", func(msg *nats.Msg) {
	fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.StartTime(time.Now().Add(-2*time.Hour)))
Output:

func AckAll added in v1.11.0

func AckAll() SubOpt

AckAll: when acking a sequence number, this implicitly acks all sequences below it as well.
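
The difference between cumulative and per-message acknowledgement can be pictured with a toy model. The sketch below is only an illustration of the semantics described here; the tracker type and its methods are hypothetical and do not reflect how the server or the library stores ack state.

```go
package main

import (
	"fmt"
	"sort"
)

// tracker is a toy model of unacknowledged sequences. It is not how the
// server or the library stores this state.
type tracker struct {
	pending map[uint64]bool
}

func newTracker(seqs ...uint64) *tracker {
	t := &tracker{pending: make(map[uint64]bool)}
	for _, s := range seqs {
		t.pending[s] = true
	}
	return t
}

// ackExplicit acknowledges exactly one sequence.
func (t *tracker) ackExplicit(seq uint64) { delete(t.pending, seq) }

// ackAll acknowledges seq and every pending sequence below it.
func (t *tracker) ackAll(seq uint64) {
	for s := range t.pending {
		if s <= seq {
			delete(t.pending, s)
		}
	}
}

// remaining returns the still-pending sequences in ascending order.
func (t *tracker) remaining() []uint64 {
	out := make([]uint64, 0, len(t.pending))
	for s := range t.pending {
		out = append(out, s)
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

func main() {
	a := newTracker(1, 2, 3, 4, 5)
	a.ackExplicit(3)
	fmt.Println(a.remaining()) // [1 2 4 5]

	b := newTracker(1, 2, 3, 4, 5)
	b.ackAll(3)
	fmt.Println(b.remaining()) // [4 5]
}
```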

func AckExplicit added in v1.11.0

func AckExplicit() SubOpt

AckExplicit requires ack or nack for all messages.

func AckNone added in v1.11.0

func AckNone() SubOpt

AckNone requires no acks for delivered messages.

func Bind added in v1.12.0

func Bind(stream, consumer string) SubOpt

Bind binds a subscription to an existing consumer from a stream without attempting to create. The first argument is the stream name and the second argument will be the consumer name.

func BindStream added in v1.11.0

func BindStream(stream string) SubOpt

BindStream binds a consumer to a stream explicitly based on a name. When a stream name is not specified, the library uses the subscribe subject as a way to find the stream name. It is done by making a request to the server to get the list of stream names that have a filter for this subject. If the returned list contains a single stream, then this stream name will be used, otherwise the `ErrNoMatchingStream` is returned. To avoid the stream lookup, provide the stream name with this function. See also `Bind()`.

func DeliverAll added in v1.11.0

func DeliverAll() SubOpt

DeliverAll will configure a Consumer to receive all the messages from a Stream.

func DeliverLast added in v1.11.0

func DeliverLast() SubOpt

DeliverLast configures a Consumer to receive messages starting with the latest one.

func DeliverLastPerSubject added in v1.12.0

func DeliverLastPerSubject() SubOpt

DeliverLastPerSubject configures a Consumer to receive messages starting with the latest one for each filtered subject.

func DeliverNew added in v1.11.0

func DeliverNew() SubOpt

DeliverNew configures a Consumer to receive messages published after the subscription.

func DeliverSubject added in v1.12.0

func DeliverSubject(subject string) SubOpt

DeliverSubject specifies the JetStream consumer deliver subject.

This option is used only in situations where the consumer does not exist and a creation request is sent to the server. If not provided, an inbox will be selected. If a consumer exists, then the NATS subscription will be created on the JetStream consumer's DeliverSubject, not necessarily this subject.

func Description added in v1.12.0

func Description(description string) SubOpt

Description will set the description for the created consumer.

func Durable added in v1.11.0

func Durable(consumer string) SubOpt

Durable defines the consumer name for JetStream durable subscribers. This function will return ErrInvalidDurableName if the name contains any dot ".".
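
Names that come from configuration or user input can be checked before they reach the option. The sketch below applies only the rule stated here (no dot in the name); checkDurableName and errInvalidName are hypothetical local names, and the library may enforce further constraints that this check does not cover.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errInvalidName is a local stand-in for the library's ErrInvalidDurableName.
var errInvalidName = errors.New("invalid durable name")

// checkDurableName rejects names that contain a dot, the one rule stated in
// the documentation above.
func checkDurableName(name string) error {
	if strings.Contains(name, ".") {
		return fmt.Errorf("%w: %q contains a dot", errInvalidName, name)
	}
	return nil
}

func main() {
	for _, name := range []string{"FOO", "orders.eu"} {
		fmt.Printf("%q -> %v\n", name, checkDurableName(name))
	}
}
```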

func EnableFlowControl added in v1.11.0

func EnableFlowControl() SubOpt

EnableFlowControl enables flow control for a push based consumer.

func HeadersOnly added in v1.13.0

func HeadersOnly() SubOpt

HeadersOnly will instruct the consumer to deliver only headers and no payloads.

func IdleHeartbeat added in v1.11.0

func IdleHeartbeat(duration time.Duration) SubOpt

IdleHeartbeat enables push based consumers to have idle heartbeats delivered.

func ManualAck added in v1.11.0

func ManualAck() SubOpt

ManualAck disables auto ack functionality for async subscriptions.

func MaxAckPending added in v1.11.0

func MaxAckPending(n int) SubOpt

MaxAckPending sets the number of outstanding acks that are allowed before message delivery is halted.

func MaxDeliver added in v1.11.0

func MaxDeliver(n int) SubOpt

MaxDeliver sets the number of redeliveries for a message.

func OrderedConsumer added in v1.12.0

func OrderedConsumer() SubOpt

OrderedConsumer will create a FIFO direct/ephemeral consumer for in-order delivery of messages. There are no redeliveries and no acks, and flow control and heartbeats will be added but will be taken care of without additional client code.

func PullMaxWaiting added in v1.11.0

func PullMaxWaiting(n int) SubOpt

PullMaxWaiting defines the max inflight pull requests.

func RateLimit added in v1.11.0

func RateLimit(n uint64) SubOpt

RateLimit is the rate limit, in bits per second, applied to a push consumer.
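
Because the unit is bits rather than bytes, a byte-based budget has to be multiplied by eight before it is passed in. The helpers below are a small sketch of that arithmetic; toBits and toBytes are hypothetical names and not part of the package.

```go
package main

import "fmt"

// toBits converts a rate in bytes per second into bits per second.
func toBits(bytesPerSecond uint64) uint64 {
	return bytesPerSecond * 8
}

// toBytes converts a rate in bits per second into whole bytes per second.
func toBytes(bitsPerSecond uint64) uint64 {
	return bitsPerSecond / 8
}

func main() {
	// 1 MiB/s expressed in bits per second.
	fmt.Println(toBits(1 << 20)) // 8388608

	// A limit of 1024 bits per second allows 128 bytes per second.
	fmt.Println(toBytes(1024)) // 128
}
```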

func ReplayInstant added in v1.12.0

func ReplayInstant() SubOpt

ReplayInstant replays the messages as fast as possible.

func ReplayOriginal added in v1.11.0

func ReplayOriginal() SubOpt

ReplayOriginal replays the messages at the original speed.

func StartSequence added in v1.11.0

func StartSequence(seq uint64) SubOpt

StartSequence configures a Consumer to receive messages from a start sequence.

func StartTime added in v1.11.0

func StartTime(startTime time.Time) SubOpt

StartTime configures a Consumer to receive messages from a start time.

type Subscription

type Subscription struct {

	// Subject that represents this subscription. This can be different
	// than the received subject inside a Msg if this is a wildcard.
	Subject string

	// Optional queue group name. If present, all subscriptions with the
	// same name will form a distributed queue, and each message will
	// only be processed by one member of the group.
	Queue string
	// contains filtered or unexported fields
}

Subscription represents interest in a given subject.

func (*Subscription) AutoUnsubscribe

func (s *Subscription) AutoUnsubscribe(max int) error

AutoUnsubscribe will issue an automatic Unsubscribe that is processed by the server when max messages have been received. This can be useful when sending a request to an unknown number of subscribers.

Example
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

received, wanted, total := 0, 10, 100

sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) {
	received++
})
sub.AutoUnsubscribe(wanted)

for i := 0; i < total; i++ {
	nc.Publish("foo", []byte("Hello"))
}
nc.Flush()

fmt.Printf("Received = %d", received)
Output:

func (*Subscription) ClearMaxPending added in v1.2.0

func (s *Subscription) ClearMaxPending() error

ClearMaxPending resets the maximums seen so far.

func (*Subscription) ConsumerInfo added in v1.11.0

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error)

func (*Subscription) Delivered added in v1.2.0

func (s *Subscription) Delivered() (int64, error)

Delivered returns the number of delivered messages for this subscription.

func (*Subscription) Drain added in v1.6.0

func (s *Subscription) Drain() error

Drain will remove interest but continue callbacks until all messages have been processed.

For a JetStream subscription, if the library has created the JetStream consumer, the library will send a DeleteConsumer request to the server when the Drain operation completes. If a failure occurs when deleting the JetStream consumer, an error will be reported to the asynchronous error callback. If you do not wish the JetStream consumer to be automatically deleted, ensure that the consumer is not created by the library, which means create the consumer with AddConsumer and bind to this consumer.

func (*Subscription) Dropped added in v1.2.0

func (s *Subscription) Dropped() (int, error)

Dropped returns the number of known dropped messages for this subscription. This will correspond to messages dropped by violations of PendingLimits. If the server declares the connection a SlowConsumer, this number may not be valid.

func (*Subscription) Fetch added in v1.11.0

func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error)

Fetch pulls a batch of messages from a stream for a pull consumer.

func (*Subscription) IsValid

func (s *Subscription) IsValid() bool

IsValid returns a boolean indicating whether the subscription is still active. This will return false if the subscription has already been closed.

func (*Subscription) MaxPending added in v1.2.0

func (s *Subscription) MaxPending() (int, int, error)

MaxPending returns the maximum number of queued messages and queued bytes seen so far.

func (*Subscription) NextMsg

func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error)

NextMsg will return the next message available to a synchronous subscriber or block until one is available. An error is returned if the subscription is invalid (ErrBadSubscription), the connection is closed (ErrConnectionClosed), the timeout is reached (ErrTimeout), or if there were no responders (ErrNoResponders) when used in the context of a request/reply.

Example
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

sub, _ := nc.SubscribeSync("foo")
m, err := sub.NextMsg(1 * time.Second)
if err == nil {
	fmt.Printf("Received a message: %s\n", string(m.Data))
} else {
	fmt.Println("NextMsg timed out.")
}
Output:

func (*Subscription) NextMsgWithContext added in v1.3.0

func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error)

NextMsgWithContext takes a context and returns the next message available to a synchronous subscriber, blocking until it is delivered or the context is canceled.

func (*Subscription) Pending added in v1.2.0

func (s *Subscription) Pending() (int, int, error)

Pending returns the number of queued messages and queued bytes in the client for this subscription.

func (*Subscription) PendingLimits added in v1.2.0

func (s *Subscription) PendingLimits() (int, int, error)

PendingLimits returns the current limits for this subscription. If no error is returned, a negative value indicates that the given metric is not limited.

func (*Subscription) QueuedMsgs added in v1.1.2

func (s *Subscription) QueuedMsgs() (int, error)

QueuedMsgs returns the number of queued messages in the client for this subscription. DEPRECATED: Use Pending()

func (*Subscription) SetPendingLimits added in v1.2.0

func (s *Subscription) SetPendingLimits(msgLimit, bytesLimit int) error

SetPendingLimits sets the limits for pending msgs and bytes for this subscription. Zero is not allowed. Any negative value means that the given metric is not limited.
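
The limit rules (zero rejected, negative meaning unlimited) and their link to dropped messages can be sketched with a toy buffer. Everything in the block below is hypothetical: the queue type, its fields, and its methods only model the rules stated here and are not the library's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// queue is a toy model of a subscription's pending buffer. Field and method
// names are hypothetical; only the limit rules come from the text above.
type queue struct {
	msgLimit, bytesLimit int
	msgs, bytes, dropped int
}

// setLimits rejects zero and treats any negative value as "unlimited".
func (q *queue) setLimits(msgLimit, bytesLimit int) error {
	if msgLimit == 0 || bytesLimit == 0 {
		return errors.New("pending limits must not be zero")
	}
	q.msgLimit, q.bytesLimit = msgLimit, bytesLimit
	return nil
}

// offer queues a message of the given size, or counts it as dropped when
// accepting it would exceed a positive limit.
func (q *queue) offer(size int) bool {
	overMsgs := q.msgLimit > 0 && q.msgs+1 > q.msgLimit
	overBytes := q.bytesLimit > 0 && q.bytes+size > q.bytesLimit
	if overMsgs || overBytes {
		q.dropped++
		return false
	}
	q.msgs++
	q.bytes += size
	return true
}

func main() {
	q := &queue{}
	fmt.Println(q.setLimits(0, 1024)) // zero is rejected

	_ = q.setLimits(2, -1) // at most 2 messages, unlimited bytes
	for i := 0; i < 5; i++ {
		q.offer(100)
	}
	fmt.Println(q.msgs, q.bytes, q.dropped) // 2 200 3
}
```

In the real client the corresponding counters are read through Pending() and Dropped().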

func (*Subscription) Type added in v1.2.0

func (s *Subscription) Type() SubscriptionType

Type returns the type of Subscription.

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe() error

Unsubscribe will remove interest in the given subject.

For a JetStream subscription, if the library has created the JetStream consumer, it will send a DeleteConsumer request to the server (if the unsubscribe itself was successful). If the delete operation fails, the error will be returned. If you do not wish the JetStream consumer to be automatically deleted, ensure that the consumer is not created by the library, which means create the consumer with AddConsumer and bind to this consumer (using the nats.Bind() option).

Example
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

sub, _ := nc.SubscribeSync("foo")
// ...
sub.Unsubscribe()
Output:

type SubscriptionType added in v1.2.0

type SubscriptionType int

SubscriptionType is the type of the Subscription.

type UserJWTHandler added in v1.7.0

type UserJWTHandler func() (string, error)

UserJWTHandler is used to fetch and return the account signed JWT for this user.

type WatchOpt added in v1.13.0

type WatchOpt interface {
	// contains filtered or unexported methods
}

func IgnoreDeletes added in v1.13.0

func IgnoreDeletes() WatchOpt

IgnoreDeletes will have the key watcher not pass any deleted keys.

func IncludeHistory added in v1.13.0

func IncludeHistory() WatchOpt

IncludeHistory instructs the key watcher to include historical values as well.

Directories

Path Synopsis
encoders module
examples
