Documentation
¶
Overview ¶
Example (OpenSubscriptionFromURL) ¶
package main
import (
"context"
"log"
"gocloud.dev/pubsub"
)
func main() {
// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/natspubsub"
// PRAGMA: On gocloud.dev, hide lines until the next blank line.
ctx := context.Background()
// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
// This URL will Dial the NATS server at the URL in the environment variable
// NATS_SERVER_URL and receive messages with subject "example.mysubject".
// This URL will be parsed and the natsv2 attribute will be used to
// use NATS v2.2.0+ native message headers as the message metadata.
subscription, err := pubsub.OpenSubscription(ctx, "nats://nats.example.com/example.mysubject?jetstream=true")
if err != nil {
log.Fatal(err)
}
defer func(subscription *pubsub.Subscription, ctx context.Context) {
_ = subscription.Shutdown(ctx)
}(subscription, ctx)
}
Example (OpenTopicFromURL) ¶
package main
import (
"context"
"log"
"gocloud.dev/pubsub"
)
func main() {
// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/natspubsub"
// PRAGMA: On gocloud.dev, hide lines until the next blank line.
ctx := context.Background()
// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
// This URL will Dial the NATS server at the URL in the environment variable
// NATS_SERVER_URL and send messages with subject "example.mysubject".
// This URL will be parsed and the natsv2 attribute will be used to
// use NATS v2.2.0+ native message headers as the message metadata.
topic, err := pubsub.OpenTopic(ctx, "nats://nats.example.com/example.mysubject")
if err != nil {
log.Fatal(err)
}
defer func(topic *pubsub.Topic, ctx context.Context) {
_ = topic.Shutdown(ctx)
}(topic, ctx)
}
Index ¶
- Constants
- Variables
- func OpenSubscription(ctx context.Context, connector connections.Connector, ...) (*pubsub.Subscription, error)
- func OpenTopic(ctx context.Context, connector connections.Connector, ...) (*pubsub.Topic, error)
- type URLOpener
- func (o *URLOpener) ConfirmClose() error
- func (o *URLOpener) ConfirmOpen() int32
- func (o *URLOpener) Connection() connections.Connection
- func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)
- func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)
Examples ¶
Constants ¶
const ( QueryParameterNatsV1 = "nats_v1" QueryParameterJetstream = "jetstream" QueryParamSubject = "subject" QueryParamHeaderToExtendSubject = "header_to_extended_subject" QueryParamReceiveWaitTimeout = "receive_wait_timeout" DefaultReceiveWaitTimeout = 5 * time.Minute ReceiveBatchConfigPrefix = "receive_batch_" AckBatchConfigPrefix = "ack_batch_" ConsumerConfigPrefix = "consumer_" StreamConfigPrefix = "stream_" )
const Scheme = "nats"
Scheme is the URL scheme natspubsub registers its URLOpeners under on pubsub.DefaultMux.
Variables ¶
var BatchAckURIParameters = []string{"ack_batch_max_handlers", "ack_batch_min_batch_size", "ack_batch_max_batch_size", "ack_batch_max_batch_byte_size"}
var BatchReceiveURIParameters = []string{
"receive_batch_max_handlers", "receive_batch_min_batch_size", "receive_batch_max_batch_size", "receive_batch_max_batch_byte_size",
}
var ConsumerURIParameters = []string{
"consumer_name", "consumer_durable_name", "consumer_description", "consumer_deliver_policy",
"consumer_opt_start_seq", "consumer_opt_start_time", "consumer_ack_policy", "consumer_ack_wait",
"consumer_max_deliver", "consumer_backoff", "consumer_filter_subject", "consumer_replay_policy",
"consumer_rate_limit_bps", "consumer_sample_freq", "consumer_max_waiting", "consumer_max_ack_pending",
"consumer_headers_only", "consumer_max_batch", "consumer_max_expires", "consumer_max_bytes",
"consumer_inactive_threshold", "consumer_num_replicas", "consumer_mem_storage", "consumer_filter_subjects",
"consumer_metadata", "consumer_pause_until", "consumer_priority_policy", "consumer_priority_timeout",
"consumer_priority_groups",
}
var StreamURIParameters = []string{"stream_name", "stream_description", "stream_subjects", "stream_retention", "stream_max_consumers",
"stream_max_msgs", "stream_max_bytes", "stream_discard", "stream_discard_new_per_subject", "stream_max_age",
"stream_max_msgs_per_subject", "stream_max_msg_size", "stream_storage", "stream_num_replicas", "stream_no_ack",
"stream_duplicate_window", "stream_placement", "stream_mirror", "stream_sources", "stream_sealed",
"stream_deny_delete", "stream_deny_purge", "stream_allow_rollup_hdrs", "stream_compression", "stream_first_seq",
"stream_subject_transform", "stream_republish", "stream_allow_direct", "stream_mirror_direct", "stream_consumer_limits",
"stream_metadata", "stream_template_owner", "stream_allow_msg_ttl", "stream_subject_delete_marker_ttl",
}
Functions ¶
func OpenSubscription ¶
func OpenSubscription(ctx context.Context, connector connections.Connector, opts *connections.SubscriptionOptions) (*pubsub.Subscription, error)
OpenSubscription returns a *pubsub.Subscription representing a NATS subscription or NATS queue subscription for use with NATS at least version 2.2.0. This changes the encoding of the message as, starting with version 2.2.0, NATS supports message headers. In previous versions the message headers were encoded along with the message content using gob.Encoder, which limits the subscribers only to Go clients. This implementation uses native NATS message headers, and native message content, which provides full support for non-Go clients.
Example ¶
// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/natspubsub"
// PRAGMA: On gocloud.dev, hide lines until the next blank line.
ctx := context.Background()
natsConn, err := nats.Connect("nats://nats.example.com")
if err != nil {
log.Fatal(err)
}
defer natsConn.Close()
conn, err := connections.NewPlain(natsConn)
if err != nil {
log.Fatal(err)
}
subscription, err := natspubsub.OpenSubscription(
ctx, connections.WrapConnection(conn), &connections.SubscriptionOptions{Subject: "example.mysubject"})
if err != nil {
log.Fatal(err)
}
defer func(subscription *pubsub.Subscription, ctx context.Context) {
_ = subscription.Shutdown(ctx)
}(subscription, ctx)
func OpenTopic ¶
func OpenTopic(ctx context.Context, connector connections.Connector, opts *connections.TopicOptions) (*pubsub.Topic, error)
OpenTopic returns a *pubsub.Topic for use with NATS; version 2.2.0 or above is recommended. This changes the encoding of the message because, starting with version 2.2.0, NATS supports message headers. In previous versions the message headers were encoded along with the message content using gob.Encoder, which limits the subscribers to Go clients only. This implementation uses native NATS message headers and native message content, which provides full support for non-Go clients.
Example ¶
// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/natspubsub"
// PRAGMA: On gocloud.dev, hide lines until the next blank line.
ctx := context.Background()
natsConn, err := nats.Connect("nats://nats.example.com")
if err != nil {
log.Fatal(err)
}
defer natsConn.Close()
conn, err := connections.NewJetstream(natsConn)
if err != nil {
log.Fatal(err)
}
topic, err := natspubsub.OpenTopic(ctx, connections.WrapConnection(conn), &connections.TopicOptions{Subject: "example.mysubject"})
if err != nil {
log.Fatal(err)
}
defer func(topic *pubsub.Topic, ctx context.Context) {
_ = topic.Shutdown(ctx)
}(topic, ctx)
Types ¶
type URLOpener ¶
type URLOpener struct {
Conn connections.Connection
// TopicOptions specifies the options to pass to OpenTopic.
TopicOptions connections.TopicOptions
// SubscriptionOptions specifies the options to pass to OpenSubscription.
SubscriptionOptions connections.SubscriptionOptions
// contains filtered or unexported fields
}
URLOpener opens NATS URLs like "nats://mysubject".
The URL host+path is used as the subject.
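Query parameters are supported; see the Constants and Variables sections for the recognized names (for example "jetstream" and "subject").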
func (*URLOpener) ConfirmClose ¶ added in v0.6.1
ConfirmClose decrements the reference count and returns the new count
func (*URLOpener) ConfirmOpen ¶ added in v0.6.1
ConfirmOpen increments the reference count of topics and subscriptions opened
func (*URLOpener) Connection ¶
func (o *URLOpener) Connection() connections.Connection
Connection returns the connections.Connection associated with the URLOpener.
func (*URLOpener) OpenSubscriptionURL ¶
func (*URLOpener) OpenTopicURL ¶
OpenTopicURL opens a pubsub.Topic based on a url supplied.
A topic's subject can be specified in the "subject" query parameter and is suffixed by the URL path. The following URLs yield the subject shown after the arrow:
- nats://host:8934?subject=foo --> foo
- nats://host:8934/bar?subject=foo --> foo/bar
- nats://host:8934/bar --> /bar
- nats://host:8934?no_subject=foo --> [this yields an error]
Directories
¶
| Path | Synopsis |
|---|---|
| errors | Package errors provides error handling for the natspubsub package. |