natspubsub

package module
v0.7.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2026 License: Apache-2.0 Imports: 17 Imported by: 2

README

natspubsub

A go-cloud pubsub plugin for nats io supporting jetstream as well.

Tests

Package natspubsub provides a pubsub implementation for NATS.io and Jetstream. Use OpenTopic to construct a *pubsub.Topic, and/or OpenSubscription to construct a *pubsub.Subscription. This package uses gob to encode and decode driver.Message to []byte.

Usage (publishing a message)


    // opening a topic looks like this
	package main 
    import (
        "context"
    
        "gocloud.dev/pubsub"
        _ "github.com/pitabwire/natspubsub"
    )
	...
	ctx := context.Background()
    // pubsub.OpenTopic creates a *pubsub.Topic from a URL.
    // This URL will Dial the NATS server at the URL in the environment variable
    // NATS_SERVER_URL and send messages with subject "example.mysubject".
    topic, err := pubsub.OpenTopic(ctx, "nats://localhost:4222?jetstream=true&stream_name=example&subject=example.mysubject")
    if err != nil {
        return err
    }
	defer topic.Shutdown(ctx)
	
    err := topic.Send(ctx, &pubsub.Message{
        Body: []byte("Hello, World!\n"),
        Metadata: map[string]string{
            "language":   "en",
            "importance": "high",
        },
    })
    if err != nil {
    return err
    }
    
    

(receiving messages)

package main
import (
    "context"

    "gocloud.dev/pubsub"
    _ "github.com/pitabwire/natspubsub"
)
...
ctx := context.Background()
subs, err := pubsub.OpenSubscription(ctx, "nats://localhost:4222?jetstream=true&stream_name=example&subject=example.mysubject")
if err != nil {
    return fmt.Errorf("could not open topic subscription: %v", err)
}

// Loop on received messages.
for {
    msg, err := subs.Receive(ctx)
    if err != nil {
        // Errors from Receive indicate that Receive will no longer succeed.
        log.Printf("Receiving message: %v", err)
        break
    }
    // Do work based on the message, for example:
    fmt.Printf("Got message: %q\n", msg.Body)
    // Messages must always be acknowledged with Ack.
    msg.Ack()
}

defer subs.Shutdown(ctx)


URLs

For pubsub.OpenTopic and pubsub.OpenSubscription, natspubsub registers for the scheme "nats".

The default URL opener will connect to a default server based on the environment variable "NATS_SERVER_URL".

Authentication can be configured using the environment variable "NATS_CREDENTIALS_FILE". If this variable is set to a path to a NATS credentials file, the client will automatically use those credentials for authentication when connecting to the NATS server.

This implementation supports all versions of NATS Servers, we however recommend version 2.2.0 or later. Messages can be encoded using native NATS message headers, and native message content providing full support for non-Go clients. Operating in this manner is more efficient than using gob.Encoder. Using older versions of the server will result in errors and untested scenarios.

In the event the user cannot upgrade the nats server, we recommend the use of the in tree implementation instead: https://github.com/google/go-cloud/tree/master/pubsub/natspubsub

To customize the URL opener, or for more details on the URL format, see URLOpener. See https://gocloud.dev/concepts/urls/ for background information.

Example URLs:

Topic open - nats://127.0.0.1:4222?jetstream=true&stream_name=testq&stream_retention=workqueue&stream_storage=memory&stream_subjects=testq.*&subject=testq

Topic subscription - nats://127.0.0.1:4222?consumer_ack_policy=explicit&consumer_ack_wait=10s&consumer_deliver_policy=all&consumer_durable_name=durabletest&consumer_filter_subject=testq.*&jetstream=true&stream_name=testq&stream_retention=workqueue&stream_storage=memory&stream_subjects=testq.*

This driver also has a hidden super power to make dynamic publishes from the header content supplied. One just needs to set the header that should be checked to extend the existing topic subject during publishing:

  • header_to_extended_subject

Message Delivery Semantics

NATS supports : See https://godoc.org/gocloud.dev/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more background.

  1. at-most-once-semantics; applications need not call Message.Ack, applications must also not call Message.Nack.

  2. at-least-once-semantics; this mode is possible with jetstream enabled. applications are supposed to call Message.Ack to remove processed messages from the stream. Enabling this the jetstream mode requires adding the jetstream parameter in the url query.

Subscription options

All the subscription options are passed in as query parameters to the nats url supplied.

Comprehensive definitions of the options can be found below. These are based on the official NATS JetStream configuration from:

Basic Options

Option Default value Description
jetstream true Enables JetStream functionality for at-least-once delivery semantics
subject A string of characters that form a name which the publisher and subscriber can use to find each other
receive_wait_timeout 5m A timeout for the receive operation. If set to 0, the receive operation will block indefinitely until context ends.

Stream Configuration Parameters

Stream config options are prefixed by "stream_" in URL query parameters.

Parameter Type Default Description
stream_name string (required) The unique name of the stream
stream_description string Optional description of the stream
stream_subjects []string List of subjects that the stream is listening on (supports wildcards)
stream_retention string limits Message retention policy: limits, interest, or workqueue
stream_max_consumers int -1 Maximum number of consumers allowed for the stream (-1 for unlimited)
stream_max_msgs int64 -1 Maximum number of messages the stream will store (-1 for unlimited)
stream_max_bytes int64 -1 Maximum total size of messages the stream will store (-1 for unlimited)
stream_discard string old Policy for handling messages when limits are reached: old or new
stream_discard_new_per_subject bool false Whether to discard new messages per subject when limits are reached
stream_max_age duration Maximum age of messages that the stream will retain (e.g., "24h")
stream_max_msgs_per_subject int64 -1 Maximum number of messages per subject that the stream will retain (-1 for unlimited)
stream_max_msg_size int32 -1 Maximum size of any single message in the stream in bytes (-1 for unlimited)
stream_storage string file Storage backend type: file or memory
stream_num_replicas int 1 Number of replicas to maintain in clustered JetStream (1-5)
stream_no_ack bool false Flag to disable acknowledging messages received by this stream
stream_duplicate_window duration 2m Duration within which to track duplicate messages
stream_placement object Placement constraints for stream (cluster name and/or tags)
stream_mirror object Configuration for mirroring another stream
stream_sources []object List of streams this stream sources messages from
stream_sealed bool false Flag to mark stream as sealed (no new messages or deletion)
stream_deny_delete bool false Flag to restrict ability to delete messages via API
stream_deny_purge bool false Flag to restrict ability to purge messages via API
stream_allow_rollup_hdrs bool false Flag to allow the Nats-Rollup header for stream content replacement
stream_compression string Message storage compression algorithm: none, s2, or gzip
stream_first_seq uint64 1 Initial sequence number for the first message in the stream
stream_subject_transform object Configuration for transforming subjects on messages
stream_republish object Configuration for republishing messages after storing
stream_allow_direct bool false Flag to enable direct access to individual messages
stream_mirror_direct bool false Flag to enable direct access to messages from origin stream
stream_consumer_limits object Limits for consumers on this stream
stream_metadata map User-defined metadata key-value pairs for the stream
stream_allow_msg_ttl bool false Flag to allow header-initiated per-message TTLs
stream_subject_delete_marker_ttl duration Duration for keeping server markers for deletions

Consumer Configuration Parameters

Consumer config options are prefixed by "consumer_" in URL query parameters.

Parameter Type Default Description
consumer_name string (auto) Name for the consumer (auto-generated if not provided)
consumer_durable_name string Name for durable subscription (persists across reconnects)
consumer_description string Optional description of the consumer
consumer_deliver_policy string all Where to start delivering messages: all, last, new, by_start_sequence, by_start_time
consumer_opt_start_seq uint64 Starting sequence number when deliver policy is by_start_sequence
consumer_opt_start_time time Starting time when deliver policy is by_start_time (RFC3339 format)
consumer_ack_policy string explicit How messages are acknowledged: none, all, explicit
consumer_ack_wait duration 30s How long server waits for acknowledgement before redelivery
consumer_max_deliver int -1 Maximum delivery attempts for a message (-1 for unlimited)
consumer_backoff []duration Back-off intervals for retrying message delivery (comma separated)
consumer_filter_subject string Subject filter to select messages from stream
consumer_filter_subjects []string Multiple subject filters to select messages from stream
consumer_replay_policy string instant Rate at which messages are sent: instant or original
consumer_rate_limit_bps uint64 Maximum rate of message delivery in bits per second
consumer_sample_freq string Frequency for sampling acknowledgements (e.g., "100%")
consumer_max_waiting int 512 Maximum number of pull requests waiting to be fulfilled
consumer_max_ack_pending int 1000 Maximum number of unacknowledged messages (-1 for unlimited)
consumer_headers_only bool false Flag to deliver only message headers without payload
consumer_max_batch int Maximum batch size for a single pull request
consumer_max_expires duration Maximum duration a pull request will wait for messages
consumer_max_bytes int Maximum total bytes that can be requested in a batch
consumer_inactive_threshold duration 5s Duration after which an inactive consumer is cleaned up
consumer_num_replicas int Number of replicas for the consumer state (inherits from stream by default)
consumer_mem_storage bool false Flag to force consumer to use memory storage
consumer_metadata map User-defined metadata key-value pairs for the consumer
consumer_pause_until time Timestamp until which the consumer is paused (RFC3339 format)
consumer_priority_policy string Priority policy for the consumer: standard or pinned
consumer_priority_timeout duration Time after which client will be unpinned if inactive
consumer_priority_groups []string List of priority groups this consumer supports

Batch Receive Parameters

Batch receive options control how messages are batched when receiving.

Parameter Type Default Description
receive_batch_max_handlers int 10 Maximum number of handlers for processing batch receive operations
receive_batch_min_batch_size int 1 Minimum number of messages in a batch for batch receive
receive_batch_max_batch_size int 100 Maximum number of messages in a batch for batch receive
receive_batch_max_batch_byte_size int 1048576 Maximum total byte size of messages in a batch (1MB default)

Batch Ack Parameters

Batch ack options control how message acknowledgments are batched.

Parameter Type Default Description
ack_batch_max_handlers int 10 Maximum number of handlers for processing batch acknowledgement operations
ack_batch_min_batch_size int 1 Minimum number of messages in a batch for batch acknowledgement
ack_batch_max_batch_size int 100 Maximum number of messages in a batch for batch acknowledgement
ack_batch_max_batch_byte_size int 1048576 Maximum total byte size of messages in a batch (1MB default)

Publishing options

Option Default value Description
Subject An identifier that publishers send messages to subscribers of interest

Documentation

Overview

Example (OpenSubscriptionFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/natspubsub"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
	// This URL will Dial the NATS server at the URL in the environment variable
	// NATS_SERVER_URL and receive messages with subject "example.mysubject".
	// This URL will be parsed and the natsv2 attribute will be used to
	// use NATS v2.2.0+ native message headers as the message metadata.
	subscription, err := pubsub.OpenSubscription(ctx, "nats://nats.example.com/example.mysubject?jetstream=true")
	if err != nil {
		log.Fatal(err)
	}
	defer func(subscription *pubsub.Subscription, ctx context.Context) {
		_ = subscription.Shutdown(ctx)
	}(subscription, ctx)
}
Example (OpenTopicFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/natspubsub"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// pubsub.OpenTopic creates a *pubsub.Conn from a URL.
	// This URL will Dial the NATS server at the URL in the environment variable
	// NATS_SERVER_URL and send messages with subject "example.mysubject".
	// This URL will be parsed and the natsv2 attribute will be used to
	// use NATS v2.2.0+ native message headers as the message metadata.
	topic, err := pubsub.OpenTopic(ctx, "nats://nats.example.com/example.mysubject")
	if err != nil {
		log.Fatal(err)
	}
	defer func(topic *pubsub.Topic, ctx context.Context) {
		_ = topic.Shutdown(ctx)
	}(topic, ctx)
}

Index

Examples

Constants

View Source
const (
	QueryParameterNatsV1    = "nats_v1"
	QueryParameterJetstream = "jetstream"

	QueryParamSubject               = "subject"
	QueryParamHeaderToExtendSubject = "header_to_extended_subject"
	QueryParamReceiveWaitTimeout    = "receive_wait_timeout"
	DefaultReceiveWaitTimeout       = 5 * time.Minute

	ReceiveBatchConfigPrefix = "receive_batch_"
	AckBatchConfigPrefix     = "ack_batch_"
	ConsumerConfigPrefix     = "consumer_"
	StreamConfigPrefix       = "stream_"
)
View Source
const Scheme = "nats"

Scheme is the URL scheme natspubsub registers its URLOpeners under on pubsub.DefaultMux.

Variables

View Source
var BatchAckURIParameters = []string{"ack_batch_max_handlers", "ack_batch_min_batch_size", "ack_batch_max_batch_size", "ack_batch_max_batch_byte_size"}
View Source
var BatchReceiveURIParameters = []string{
	"receive_batch_max_handlers", "receive_batch_min_batch_size", "receive_batch_max_batch_size", "receive_batch_max_batch_byte_size",
}
View Source
var ConsumerURIParameters = []string{
	"consumer_name", "consumer_durable_name", "consumer_description", "consumer_deliver_policy",
	"consumer_opt_start_seq", "consumer_opt_start_time", "consumer_ack_policy", "consumer_ack_wait",
	"consumer_max_deliver", "consumer_backoff", "consumer_filter_subject", "consumer_replay_policy",
	"consumer_rate_limit_bps", "consumer_sample_freq", "consumer_max_waiting", "consumer_max_ack_pending",
	"consumer_headers_only", "consumer_max_batch", "consumer_max_expires", "consumer_max_bytes",
	"consumer_inactive_threshold", "consumer_num_replicas", "consumer_mem_storage", "consumer_filter_subjects",
	"consumer_metadata", "consumer_pause_until", "consumer_priority_policy", "consumer_priority_timeout",
	"consumer_priority_groups",
}
View Source
var StreamURIParameters = []string{"stream_name", "stream_description", "stream_subjects", "stream_retention", "stream_max_consumers",
	"stream_max_msgs", "stream_max_bytes", "stream_discard", "stream_discard_new_per_subject", "stream_max_age",
	"stream_max_msgs_per_subject", "stream_max_msg_size", "stream_storage", "stream_num_replicas", "stream_no_ack",
	"stream_duplicate_window", "stream_placement", "stream_mirror", "stream_sources", "stream_sealed",
	"stream_deny_delete", "stream_deny_purge", "stream_allow_rollup_hdrs", "stream_compression", "stream_first_seq",
	"stream_subject_transform", "stream_republish", "stream_allow_direct", "stream_mirror_direct", "stream_consumer_limits",
	"stream_metadata", "stream_template_owner", "stream_allow_msg_ttl", "stream_subject_delete_marker_ttl",
}

Functions

func OpenSubscription

func OpenSubscription(ctx context.Context, connector connections.Connector, opts *connections.SubscriptionOptions) (*pubsub.Subscription, error)

OpenSubscription returns a *pubsub.Subscription representing a NATS subscription or NATS queue subscription for use with NATS at least version 2.2.0. This changes the encoding of the message as, starting with version 2.2.0, NATS supports message headers. In previous versions the message headers were encoded along with the message content using gob.Encoder, which limits the subscribers only to Go clients. This implementation uses native NATS message headers, and native message content, which provides full support for non-Go clients.

Example
// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/natspubsub"
// PRAGMA: On gocloud.dev, hide lines until the next blank line.
ctx := context.Background()
natsConn, err := nats.Connect("nats://nats.example.com")
if err != nil {
	log.Fatal(err)
}
defer natsConn.Close()

conn, err := connections.NewPlain(natsConn)
if err != nil {
	log.Fatal(err)
}

subscription, err := natspubsub.OpenSubscription(
	ctx, connections.WrapConnection(conn), &connections.SubscriptionOptions{Subject: "example.mysubject"})
if err != nil {
	log.Fatal(err)
}
defer func(subscription *pubsub.Subscription, ctx context.Context) {
	_ = subscription.Shutdown(ctx)
}(subscription, ctx)

func OpenTopic

func OpenTopic(ctx context.Context, connector connections.Connector, opts *connections.TopicOptions) (*pubsub.Topic, error)

OpenTopic returns a *pubsub.Topic for use with NATS, version 2.2.0 and above is recommended. This changes the encoding of the message as, starting with version 2.2.0, NATS supports message headers. In previous versions the message headers were encoded along with the message content using gob.Encoder, which limits the subscribers only to Go clients. This implementation uses native NATS message headers, and native message content, which provides full support for non-Go clients.

Example
// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/natspubsub"
// PRAGMA: On gocloud.dev, hide lines until the next blank line.
ctx := context.Background()

natsConn, err := nats.Connect("nats://nats.example.com")
if err != nil {
	log.Fatal(err)
}
defer natsConn.Close()

conn, err := connections.NewJetstream(natsConn)
if err != nil {
	log.Fatal(err)
}

topic, err := natspubsub.OpenTopic(ctx, connections.WrapConnection(conn), &connections.TopicOptions{Subject: "example.mysubject"})
if err != nil {
	log.Fatal(err)
}
defer func(topic *pubsub.Topic, ctx context.Context) {
	_ = topic.Shutdown(ctx)
}(topic, ctx)

Types

type URLOpener

type URLOpener struct {
	Conn connections.Connection
	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions connections.TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions connections.SubscriptionOptions
	// contains filtered or unexported fields
}

URLOpener opens NATS URLs like "nats://mysubject".

The URL host+path is used as the subject.

No query parameters are supported.

func (*URLOpener) ConfirmClose added in v0.6.1

func (o *URLOpener) ConfirmClose() error

ConfirmClose decrements the reference count and returns the new count

func (*URLOpener) ConfirmOpen added in v0.6.1

func (o *URLOpener) ConfirmOpen() int32

ConfirmOpen increments the reference count of topics and subscriptions opened

func (*URLOpener) Connection

func (o *URLOpener) Connection() connections.Connection

Connection increments the reference count of topics and subscriptions opened

func (*URLOpener) OpenSubscriptionURL

func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)

func (*URLOpener) OpenTopicURL

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on a url supplied.

A topic can be specified in the subject and suffixed by the url path
These definitions will yield the subject shown infront of them

	- nats://host:8934?subject=foo --> foo
	- nats://host:8934/bar?subject=foo --> foo/bar
	- nats://host:8934/bar --> /bar
	- nats://host:8934?no_subject=foo --> [this yields an error]

Directories

Path Synopsis
Package errors provides error handling for the natspubsub package.
Package errors provides error handling for the natspubsub package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL