Documentation

Overview

Package pubsub provides an easy and portable way to interact with publish/subscribe systems. Subpackages contain driver implementations of pubsub for supported services.

See https://gocloud.dev/howto/pubsub/ for a detailed how-to guide.

At-most-once and At-least-once Delivery

The semantics of message delivery vary across PubSub services. Some services guarantee that messages received by subscribers but not acknowledged are delivered again (at-least-once semantics). In others, a message will be delivered only once, if it is delivered at all (at-most-once semantics). Some services support both modes via options.

This package accommodates both kinds of systems, but application developers should think carefully about which kind of semantics the application needs. Even though the application code may look similar, system-level characteristics are quite different. See the driver package documentation for more information about message delivery semantics.

After receiving a Message via Subscription.Receive:

- Always call Message.Ack or Message.Nack after processing the message.
- For some drivers, Ack will be a no-op.
- For some drivers, Nack is not supported and will panic; call
  Message.Nackable to check whether Nack is available.
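
As a rough illustration of the checklist above, the sketch below settles a message exactly once after processing. It is written against a small local interface, settler (a hypothetical name, not part of this package), so that it runs without any driver; its method set mirrors the Ack, Nack and Nackable signatures documented for Message. The fakeMessage type is only a stand-in for a received message, and falling back to Ack when Nack is unsupported is one possible policy, not a requirement.

```go
package main

import "fmt"

// settler is a hypothetical local interface listing the three Message
// methods this sketch needs. It is not part of the pubsub package.
type settler interface {
	Ack()
	Nack()
	Nackable() bool
}

// settle acknowledges m when processing succeeded. On failure it asks for
// redelivery only if the driver supports Nack; otherwise it falls back to
// Ack, because calling Nack would panic.
func settle(m settler, processed bool) string {
	switch {
	case processed:
		m.Ack()
		return "ack"
	case m.Nackable():
		m.Nack()
		return "nack"
	default:
		m.Ack()
		return "ack (nack unsupported)"
	}
}

// fakeMessage is a stand-in for a received message; it only counts calls.
type fakeMessage struct {
	nackable    bool
	acks, nacks int
}

func (f *fakeMessage) Ack()           { f.acks++ }
func (f *fakeMessage) Nack()          { f.nacks++ }
func (f *fakeMessage) Nackable() bool { return f.nackable }

func main() {
	fmt.Println(settle(&fakeMessage{nackable: true}, true))
	fmt.Println(settle(&fakeMessage{nackable: true}, false))
	fmt.Println(settle(&fakeMessage{nackable: false}, false))
}
```

Because settle takes an interface, a message returned by Subscription.Receive could be passed to it directly, assuming its methods keep the signatures shown in this documentation.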

OpenCensus Integration

OpenCensus supports tracing and metric collection for multiple languages and backend providers. See https://opencensus.io.

This API collects OpenCensus traces and metrics for the following methods:

- Topic.Send
- Topic.Shutdown
- Subscription.Receive
- Subscription.Shutdown
- The internal driver methods SendBatch, SendAcks and ReceiveBatch.

All trace and metric names begin with the package import path. The traces add the method name. For example, "gocloud.dev/pubsub/Topic.Send". The metrics are "completed_calls", a count of completed method calls by driver, method and status (error code); and "latency", a distribution of method latency by driver and method. For example, "gocloud.dev/pubsub/latency".

To enable trace collection in your application, see "Configure Exporter" at https://opencensus.io/quickstart/go/tracing. To enable metric collection in your application, see "Exporting stats" at https://opencensus.io/quickstart/go/metrics.

Constants

This section is empty.

Variables

var NewSubscription = newSubscription

    NewSubscription is for use by drivers only. Do not use in application code.

    var NewTopic = newTopic

      NewTopic is for use by drivers only. Do not use in application code.

      var (
      
      	// OpenCensusViews are predefined views for OpenCensus metrics.
      	// The views include counts and latency distributions for API method calls.
      	// See the example at https://godoc.org/go.opencensus.io/stats/view for usage.
      	OpenCensusViews = oc.Views(pkgName, latencyMeasure)
      )

      Functions

      This section is empty.

      Types

      type Message

      type Message struct {
      	// Body contains the content of the message.
      	Body []byte
      
      	// Metadata has key/value metadata for the message.
      	//
      	// When sending a message, set any key/value pairs you want associated with
      	// the message. It is acceptable for Metadata to be nil.
      	// Note that some services limit the number of key/value pairs per message.
      	//
      	// When receiving a message, Metadata will be nil if the message has no
      	// associated metadata.
      	Metadata map[string]string
      
      	// BeforeSend is a callback used when sending a message. It will always be
      	// set to nil for received messages.
      	//
      	// The callback will be called exactly once, before the message is sent.
      	//
      	// asFunc converts its argument to driver-specific types.
      	// See https://gocloud.dev/concepts/as/ for background information.
      	BeforeSend func(asFunc func(interface{}) bool) error
      	// contains filtered or unexported fields
      }

        Message contains data to be published.

        func (*Message) Ack

        func (m *Message) Ack()

          Ack acknowledges the message, telling the server that it does not need to be sent again to the associated Subscription. It will be a no-op for some drivers; see https://godoc.org/gocloud.dev/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more info.

          Ack returns immediately, but the actual ack is sent in the background, and is not guaranteed to succeed. If background acks persistently fail, the error will be returned from a subsequent Receive.

          func (*Message) As

          func (m *Message) As(i interface{}) bool

            As converts i to driver-specific types. See https://gocloud.dev/concepts/as/ for background information, the "As" examples in this package for examples, and the driver package documentation for the specific types supported for that driver. As panics unless it is called on a message obtained from Subscription.Receive.

            func (*Message) Nack

            func (m *Message) Nack()

              Nack (short for negative acknowledgment) tells the server that this Message was not processed and should be redelivered.

              Nack panics for some drivers, as Nack is meaningless when messages can't be redelivered. You can call Nackable to determine if Nack is available. See https://godoc.org/gocloud.dev/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more info.

              Nack returns immediately, but the actual nack is sent in the background, and is not guaranteed to succeed.

              Nack is a performance optimization for retrying transient failures. It must not be used for message parse errors or other messages that the application will never be able to process: calling Nack will cause them to be redelivered and overload the server. Instead, an application should call Ack and log the failure in some monitored way.
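
The guidance above can be sketched as a small decision function, assuming the application can tell transient failures from permanent ones. Everything here is hypothetical scaffolding: the errTransient sentinel, the order type, and the process and decide helpers are not part of this package. The sketch only returns the chosen action; a real handler would then call Ack or Nack on the message (after checking Nackable).

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// errTransient is a hypothetical sentinel marking failures worth retrying.
var errTransient = errors.New("transient failure")

// order is a hypothetical payload type carried in the message body.
type order struct {
	ID int `json:"id"`
}

// process decodes body and hands the result to store, an application
// callback. A decode failure is permanent: redelivery cannot fix it.
func process(body []byte, store func(order) error) error {
	var o order
	if err := json.Unmarshal(body, &o); err != nil {
		return fmt.Errorf("parse: %w", err)
	}
	return store(o)
}

// decide maps a processing result to the action suggested by the text:
// Nack only for transient failures, Ack (plus logging) for everything the
// application will never be able to process.
func decide(err error) string {
	switch {
	case err == nil:
		return "ack"
	case errors.Is(err, errTransient):
		return "nack"
	default:
		return "ack and log"
	}
}

func main() {
	ok := func(order) error { return nil }
	busy := func(order) error { return fmt.Errorf("db busy: %w", errTransient) }

	fmt.Println(decide(process([]byte(`{"id":1}`), ok)))
	fmt.Println(decide(process([]byte(`{"id":1}`), busy)))
	fmt.Println(decide(process([]byte(`{bad`), ok)))
}
```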

              func (*Message) Nackable

              func (m *Message) Nackable() bool

                Nackable returns true iff Nack can be called without panicking.

                Some services do not support Nack; for example, at-most-once services can't redeliver a message. See https://godoc.org/gocloud.dev/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more info.

                type Subscription

                type Subscription struct {
                	// contains filtered or unexported fields
                }

                  Subscription receives published messages.

                  func OpenSubscription

                  func OpenSubscription(ctx context.Context, urlstr string) (*Subscription, error)

                    OpenSubscription opens the Subscription identified by the URL given. See the URLOpener documentation in driver subpackages for details on supported URL formats, and https://gocloud.dev/concepts/urls for more information.

                    func (*Subscription) As

                    func (s *Subscription) As(i interface{}) bool

                      As converts i to driver-specific types. See https://gocloud.dev/concepts/as/ for background information, the "As" examples in this package for examples, and the driver package documentation for the specific types supported for that driver.

                      func (*Subscription) ErrorAs

                      func (s *Subscription) ErrorAs(err error, i interface{}) bool

                        ErrorAs converts err to driver-specific types. ErrorAs panics if i is nil or not a pointer. ErrorAs returns false if err == nil. See Topic.As for more details.

                        func (*Subscription) Receive

                        func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error)

                          Receive receives and returns the next message from the Subscription's queue, blocking and polling if none are available. It can be called concurrently from multiple goroutines.

                          Receive retries retryable errors from the underlying driver forever. Therefore, if Receive returns an error, either:

                          1. It is a non-retryable error from the underlying driver, either from an attempt to fetch more messages or from an attempt to ack messages. Operator intervention may be required (e.g., invalid resource, quota error, etc.). Receive will return the same error from then on, so the application should log the error and either recreate the Subscription, or exit.

                          2. The provided ctx is Done. Error() on the returned error will include both the ctx error and the underlying driver error, and ErrorAs on it can access the underlying driver error type if needed. Receive may be called again with a fresh ctx.

                          Callers can distinguish between the two by checking if the ctx they passed is Done, or via xerrors.Is(err, context.DeadlineExceeded or context.Canceled) on the returned error.

                          The Ack method of the returned Message must be called once the message has been processed, to prevent it from being received again.
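
One way to tell the two error cases apart is sketched below. It uses errors.Is from the standard library in place of the xerrors.Is call mentioned above, on the assumption that the returned error exposes the ctx error through its Unwrap chain. The receiveErrKind and demo helpers are hypothetical, and the errors in demo are constructed by hand rather than produced by a real Subscription.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// receiveErrKind classifies an error returned by a Receive-style call.
// ctx is the context that was passed to that call.
func receiveErrKind(ctx context.Context, err error) string {
	switch {
	case err == nil:
		return "ok"
	case ctx.Err() != nil,
		errors.Is(err, context.Canceled),
		errors.Is(err, context.DeadlineExceeded):
		// Case 2: the caller's ctx is Done; retrying with a fresh ctx is fine.
		return "context done"
	default:
		// Case 1: non-retryable driver error; log it, then recreate or exit.
		return "driver error"
	}
}

// demo builds one error of each kind by hand and classifies both.
func demo() [2]string {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	wrapped := fmt.Errorf("receive: %w", ctx.Err())
	permanent := errors.New("subscription not found")
	return [2]string{
		receiveErrKind(ctx, wrapped),
		receiveErrKind(context.Background(), permanent),
	}
}

func main() {
	fmt.Println(demo())
}
```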

                          func (*Subscription) Shutdown

                          func (s *Subscription) Shutdown(ctx context.Context) (err error)

                            Shutdown flushes pending ack sends and disconnects the Subscription.

                            type SubscriptionURLOpener

                            type SubscriptionURLOpener interface {
                            	OpenSubscriptionURL(ctx context.Context, u *url.URL) (*Subscription, error)
                            }

                              SubscriptionURLOpener represents types that can open Subscriptions based on a URL. The opener must not modify the URL argument. OpenSubscriptionURL must be safe to call from multiple goroutines.

                              This interface is generally implemented by types in driver packages.

                              type Topic

                              type Topic struct {
                              	// contains filtered or unexported fields
                              }

                                Topic publishes messages to all its subscribers.

                                func OpenTopic

                                func OpenTopic(ctx context.Context, urlstr string) (*Topic, error)

                                  OpenTopic opens the Topic identified by the URL given. See the URLOpener documentation in driver subpackages for details on supported URL formats, and https://gocloud.dev/concepts/urls for more information.

                                  func (*Topic) As

                                  func (t *Topic) As(i interface{}) bool

                                    As converts i to driver-specific types. See https://gocloud.dev/concepts/as/ for background information, the "As" examples in this package for examples, and the driver package documentation for the specific types supported for that driver.

                                    func (*Topic) ErrorAs

                                    func (t *Topic) ErrorAs(err error, i interface{}) bool

                                      ErrorAs converts err to driver-specific types. ErrorAs panics if i is nil or not a pointer. ErrorAs returns false if err == nil. See https://gocloud.dev/concepts/as/ for background information.

                                      func (*Topic) Send

                                      func (t *Topic) Send(ctx context.Context, m *Message) (err error)

                                        Send publishes a message. It only returns after the message has been sent, or failed to be sent. Send can be called from multiple goroutines at once.

                                        func (*Topic) Shutdown

                                        func (t *Topic) Shutdown(ctx context.Context) (err error)

                                          Shutdown flushes pending message sends and disconnects the Topic. It only returns after all pending messages have been sent.
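
Since Shutdown waits for pending sends, callers may want to bound it with a deadline. The following is a minimal sketch under assumptions: shutdowner is a hypothetical local interface matching the Shutdown signature shown for Topic and Subscription, and slowFlusher is a fake that returns ctx.Err() when the deadline passes. How a real driver reacts to an expired ctx is not stated in this documentation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// shutdowner is a hypothetical local interface with the Shutdown signature
// documented for Topic and Subscription.
type shutdowner interface {
	Shutdown(ctx context.Context) error
}

// shutdownWithTimeout gives s at most d to flush and disconnect.
func shutdownWithTimeout(s shutdowner, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return s.Shutdown(ctx)
}

// slowFlusher is a fake whose flush takes a fixed amount of time and which
// gives up when ctx is done.
type slowFlusher struct{ flush time.Duration }

func (f slowFlusher) Shutdown(ctx context.Context) error {
	select {
	case <-time.After(f.flush):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demoShutdown runs one shutdown that finishes in time and one that does
// not. It returns the first error and whether the second one timed out.
func demoShutdown() (fastErr error, slowTimedOut bool) {
	fastErr = shutdownWithTimeout(slowFlusher{flush: time.Millisecond}, time.Second)
	slowErr := shutdownWithTimeout(slowFlusher{flush: time.Second}, time.Millisecond)
	return fastErr, errors.Is(slowErr, context.DeadlineExceeded)
}

func main() {
	fmt.Println(demoShutdown())
}
```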

                                          type TopicURLOpener

                                          type TopicURLOpener interface {
                                          	OpenTopicURL(ctx context.Context, u *url.URL) (*Topic, error)
                                          }

                                            TopicURLOpener represents types that can open Topics based on a URL. The opener must not modify the URL argument. OpenTopicURL must be safe to call from multiple goroutines.

                                            This interface is generally implemented by types in driver packages.

                                            type URLMux

                                            type URLMux struct {
                                            	// contains filtered or unexported fields
                                            }

                                              URLMux is a URL opener multiplexer. It matches the scheme of the URLs against a set of registered schemes and calls the opener that matches the URL's scheme. See https://gocloud.dev/concepts/urls/ for more information.

                                              The zero value is a multiplexer with no registered schemes.
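
To make the dispatch behavior concrete, here is a simplified stand-alone model of a scheme multiplexer. It is not this package's implementation: schemeMux, opener and hostOpener are hypothetical names, the opener returns a string instead of a Topic or Subscription, and the "mem" scheme is used purely for illustration. The model reproduces three properties described in this documentation: a usable zero value, a panic on duplicate registration, and a sorted scheme list.

```go
package main

import (
	"fmt"
	"net/url"
	"sort"
)

// opener is a hypothetical stand-in for a URL opener; it returns a
// description string instead of a real Topic or Subscription.
type opener func(u *url.URL) (string, error)

// hostOpener is a trivial opener that reports the URL's host part.
var hostOpener opener = func(u *url.URL) (string, error) { return u.Host, nil }

// schemeMux maps URL schemes to openers. The zero value has no schemes.
type schemeMux struct {
	openers map[string]opener
}

// register panics if scheme already has an opener.
func (m *schemeMux) register(scheme string, o opener) {
	if m.openers == nil {
		m.openers = map[string]opener{}
	}
	if _, dup := m.openers[scheme]; dup {
		panic(fmt.Sprintf("scheme %q already registered", scheme))
	}
	m.openers[scheme] = o
}

// schemes returns the registered schemes in sorted order.
func (m *schemeMux) schemes() []string {
	out := make([]string, 0, len(m.openers))
	for s := range m.openers {
		out = append(out, s)
	}
	sort.Strings(out)
	return out
}

// open parses urlstr and dispatches to the opener for its scheme.
func (m *schemeMux) open(urlstr string) (string, error) {
	u, err := url.Parse(urlstr)
	if err != nil {
		return "", fmt.Errorf("parse %q: %w", urlstr, err)
	}
	o, ok := m.openers[u.Scheme]
	if !ok {
		return "", fmt.Errorf("no opener registered for scheme %q", u.Scheme)
	}
	return o(u)
}

func main() {
	var mux schemeMux // zero value: no registered schemes
	mux.register("mem", hostOpener)
	fmt.Println(mux.schemes())
	fmt.Println(mux.open("mem://topicA"))
	fmt.Println(mux.open("unknown://topicA"))
}
```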

                                              func DefaultURLMux

                                              func DefaultURLMux() *URLMux

                                                DefaultURLMux returns the URLMux used by OpenTopic and OpenSubscription.

                                                Driver packages can use this to register their TopicURLOpener and/or SubscriptionURLOpener on the mux.

                                                func (*URLMux) OpenSubscription

                                                func (mux *URLMux) OpenSubscription(ctx context.Context, urlstr string) (*Subscription, error)

                                                  OpenSubscription calls OpenSubscriptionURL with the URL parsed from urlstr. OpenSubscription is safe to call from multiple goroutines.

                                                  func (*URLMux) OpenSubscriptionURL

                                                  func (mux *URLMux) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*Subscription, error)

                                                    OpenSubscriptionURL dispatches the URL to the opener that is registered with the URL's scheme. OpenSubscriptionURL is safe to call from multiple goroutines.

                                                    func (*URLMux) OpenTopic

                                                    func (mux *URLMux) OpenTopic(ctx context.Context, urlstr string) (*Topic, error)

                                                      OpenTopic calls OpenTopicURL with the URL parsed from urlstr. OpenTopic is safe to call from multiple goroutines.

                                                      func (*URLMux) OpenTopicURL

                                                      func (mux *URLMux) OpenTopicURL(ctx context.Context, u *url.URL) (*Topic, error)

                                                        OpenTopicURL dispatches the URL to the opener that is registered with the URL's scheme. OpenTopicURL is safe to call from multiple goroutines.

                                                        func (*URLMux) RegisterSubscription

                                                        func (mux *URLMux) RegisterSubscription(scheme string, opener SubscriptionURLOpener)

                                                          RegisterSubscription registers the opener with the given scheme. If an opener already exists for the scheme, RegisterSubscription panics.

                                                          func (*URLMux) RegisterTopic

                                                          func (mux *URLMux) RegisterTopic(scheme string, opener TopicURLOpener)

                                                            RegisterTopic registers the opener with the given scheme. If an opener already exists for the scheme, RegisterTopic panics.

                                                            func (*URLMux) SubscriptionSchemes

                                                            func (mux *URLMux) SubscriptionSchemes() []string

                                                              SubscriptionSchemes returns a sorted slice of the registered Subscription schemes.

                                                              func (*URLMux) TopicSchemes

                                                              func (mux *URLMux) TopicSchemes() []string

                                                                TopicSchemes returns a sorted slice of the registered Topic schemes.

                                                                func (*URLMux) ValidSubscriptionScheme

                                                                func (mux *URLMux) ValidSubscriptionScheme(scheme string) bool

                                                                  ValidSubscriptionScheme returns true iff scheme has been registered for Subscriptions.

                                                                  func (*URLMux) ValidTopicScheme

                                                                  func (mux *URLMux) ValidTopicScheme(scheme string) bool

                                                                    ValidTopicScheme returns true iff scheme has been registered for Topics.

                                                                    Directories

                                                                    Path Synopsis
                                                                    Package awssnssqs provides two implementations of pubsub.Topic, one that sends messages to AWS SNS (Simple Notification Service), and one that sends messages to SQS (Simple Queuing Service).
                                                                    Package azuresb provides an implementation of pubsub using Azure Service Bus Topic and Subscription.
                                                                    Package batcher supports batching of items.
                                                                    Package driver defines interfaces to be implemented by pubsub drivers, which will be used by the pubsub package to interact with the underlying services.
                                                                    Package drivertest provides a conformance test for implementations of driver.
                                                                    Package gcppubsub provides a pubsub implementation that uses GCP PubSub.
                                                                    kafkapubsub module
                                                                    Package mempubsub provides an in-memory pubsub implementation.
                                                                    natspubsub module
                                                                    rabbitpubsub module