Version: v0.0.0-...-b0358da Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Feb 17, 2018 License: MIT Imports: 12 Imported by: 0



This is a Go library that provides a thin, but opinionated abstraction over the go-nats API. The use case is for writing services that use NATS as a transport layer.

The NATS API expects a slice of bytes as the representation of a message. In general, it is often necessary to standardize on a serialization format to simplify designed and interacting with messages.

This library standardizes on Protocol Buffers as the message serialization format and it provides a few conveniences when working with Protobuf messages.

The two main value-add features this library provides are the Transport interface and implementation and the Message type.

The Transport interface describes a set of API methods that take Protobuf messages rather than byte slices. Being an interface, it enables implementing wrapper middleware to the API itself for the purpose of instrumentation. For example:

// Declare a transport and initialize the core implementation using an
// existing NATS connection.
var tp transport.Transport
tp = transport.New(nc)

// Define a struct that satisfies the Transport interface and takes another
// Transport value to wrap.
type timerTransport struct {
  tp transport.Transport

func (t *timerTransport) Request(sub string, req proto.Message, rep proto.Message) (*transport.Message, error) {
  t0 := time.Now()
  msg, err :=, req, rep)
  return msg, err

// Implement remaining methods..

// Wrap the base transport.
tp = &timerTransport{tp}

// Using this wrapped transport will now automatically log the duration of the call.

In the Request method shown above, a transport.Message value is returned. Message is a Protobuf message which wraps all messages sent through the API. It annotates the message with additional metadata such as:

  • id - a unique message ID.
  • timestamp - timestamp in nanoseconds.
  • cause - the causal upstream message.
  • subject - the subject of the message.
  • reply - the reply subject of a request message.
  • queue - the queue that handled the message.
  • error - a handling error if one occurred.

This provides additional metadata on the message which can be useful for logging or instrumentation.


To initialize a new transport client, use either transport.Connect with NATS options or transport.New with an existing *nats.Conn value.

tp, err := transport.Connect(&nats.Options{
  Url: "nats://localhost:4222",
defer tp.Close()

Deferring the close ensures all subscriptions are stopped and the connection is closed.

There are two ways to publish messages. The first is Publish, the standard "fire and forget" broadcast to all subscribers of the message subject.

val := pb.Value{ ... }
msg, err := tp.Publish("query.sink", &val)

The payload must implement proto.Message, meaning that is must be a Protobuf message.

The second way to publish messages is Request, which waits for a reply so it can return response data to the client.

// Request message to send.
req := pb.Request{ ... }

// The protobuf message to decode the reply into.
var rep pb.Reply

msg, err := tp.Request("query.execute", &req, &rep)

Here's how to subscribe to a subject using Subscribe.

// Define the handler.
hdlr := func(msg *transport.Message) (proto.Message, error) {
  // Decode message payload into local request value.
  var req pb.Request
  if err := msg.Decode(&req); err != nil {
    return nil, err
  // Do things..
  return &pb.Reply{ ... }, nil

// Subscribe the handler.
_, err := c.Subscribe("query.execute", hdlr)



Package transport is a generated protocol buffer package.

It is generated from these files:


It has these top-level messages:




This section is empty.


View Source
var (
	DefaultRequestTimeout = 2 * time.Second


This section is empty.


type Handler

type Handler func(msg *Message) (proto.Message, error)

Handler is the handler used by a subscriber. The return value may be nil if no output is yielded. If this is a request, the reply will be sent automatically with the reply value or an error if one occurred. If a reply is not expected and error occurs, it will be logged. The error can be inspected using status.FromError.

type Message

type Message struct {
	// ID is a globally unique message.
	Id string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"`
	// Timestamp is the timestamp in nanoseconds with the message was published.
	Timestamp uint64 `protobuf:"varint,2,opt,name=timestamp" json:"timestamp,omitempty"`
	// Payload is the actual payload being published that will be consumed.
	// This is expected to use protobuf encoding.
	Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"`
	// Deprecated. Use status instead.
	Error string `protobuf:"bytes,4,opt,name=error" json:"error,omitempty"`
	// Cause is the ID of a message that resulted in this message being published.
	Cause string `protobuf:"bytes,5,opt,name=cause" json:"cause,omitempty"`
	// Subject is the subject the message is published.
	Subject string `protobuf:"bytes,6,opt,name=subject" json:"subject,omitempty"`
	// Queue is the queue this message was received by.
	Queue string `protobuf:"bytes,7,opt,name=queue" json:"queue,omitempty"`
	// Reply is the inbox ID of the publisher of this message.
	Reply string `protobuf:"bytes,8,opt,name=reply" json:"reply,omitempty"`
	// Error that occurs in during transport or in the application.
	Status *google_rpc.Status `protobuf:"bytes,9,opt,name=status" json:"status,omitempty"`

Message is the envelope/wrapper for all messages.

func (*Message) Decode

func (m *Message) Decode(pb proto.Message) error

Decode decodes the message payload into a proto message.

func (*Message) Descriptor

func (*Message) Descriptor() ([]byte, []int)

func (*Message) GetCause

func (m *Message) GetCause() string

func (*Message) GetError

func (m *Message) GetError() string

func (*Message) GetId

func (m *Message) GetId() string

func (*Message) GetPayload

func (m *Message) GetPayload() []byte

func (*Message) GetQueue

func (m *Message) GetQueue() string

func (*Message) GetReply

func (m *Message) GetReply() string

func (*Message) GetStatus

func (m *Message) GetStatus() *google_rpc.Status

func (*Message) GetSubject

func (m *Message) GetSubject() string

func (*Message) GetTimestamp

func (m *Message) GetTimestamp() uint64

func (*Message) ProtoMessage

func (*Message) ProtoMessage()

func (*Message) Reset

func (m *Message) Reset()

func (*Message) String

func (m *Message) String() string

type PublishOption

type PublishOption func(*PublishOptions)

func PublishCause

func PublishCause(s string) PublishOption

PublishCause sets the cause of the publication.

type PublishOptions

type PublishOptions struct {
	Cause string

PublishOptions are options for a publication.

type RequestOption

type RequestOption func(*RequestOptions)

func RequestCause

func RequestCause(s string) RequestOption

RequestCause sets the cause of the request.

func RequestTimeout

func RequestTimeout(t time.Duration) RequestOption

RequestTimeout sets a request timeout duration.

type RequestOptions

type RequestOptions struct {
	Cause   string
	Timeout time.Duration

RequestOptions are options for a publication.

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

func SubscribeQueue

func SubscribeQueue(q string) SubscribeOption

SubscribeQueue specifies the queue name of the subscriber.

type SubscribeOptions

type SubscribeOptions struct {
	Queue string

SubscribeOptions are options for a subscriber.

type Transport

type Transport interface {
	// Publish publishes a message asynchronously to the specified subject.
	// The wrapped message is returned or an error. The error would only be due to
	// a connection issue, but does not reflect any consumer error.
	Publish(sub string, msg proto.Message, opts ...PublishOption) (*Message, error)

	// Request publishes a message synchronously and waits for a response that
	// is decoded into the Protobuf message supplied. The wrapped message is
	// returned or an error. The error can inspected using status.FromError.
	Request(sub string, req proto.Message, rep proto.Message, opts ...RequestOption) (*Message, error)

	// Subscribe creates a subscription to a subject.
	Subscribe(sub string, hdl Handler, opts ...SubscribeOption) (*nats.Subscription, error)

	// Conn returns the underlying NATS connection.
	Conn() *nats.Conn

	// Close closes the transport connection and unsubscribes all subscribers.

	// Set the logger.

Transport describes the interface

func Connect

func Connect(opts *nats.Options) (Transport, error)

Connect is a convenience function establishing a connection with NATS and returning a transport.

func New

func New(conn *nats.Conn) Transport

New returns a transport using an existing NATS connection.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL