pubsub

package
v0.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 20, 2019 License: Apache-2.0 Imports: 16 Imported by: 247

Documentation

Overview

Package pubsub provides an easy and portable way to interact with publish/ subscribe systems.

Subpackages contain distinct implementations of pubsub for various providers, including Cloud and on-prem solutions. For example, "gcppubsub" supports Google Cloud Pub/Sub. Your application should import one of these provider-specific subpackages and use its exported functions to get a *Topic and/or *Subscription; do not use the NewTopic/NewSubscription functions in this package. For example:

topic := mempubsub.NewTopic()
err := topic.Send(ctx.Background(), &pubsub.Message{Body: []byte("hi"))
...

Then, write your application code using the *Topic/*Subscription types. You can easily reconfigure your initialization code to choose a different provider. You can develop your application locally using memblob, or deploy it to multiple Cloud providers. You may find http://github.com/google/wire useful for managing your initialization code.

Alternatively, you can construct a *Topic/*Subscription via a URL and OpenTopic/OpenSubscription. See https://godoc.org/gocloud.dev#hdr-URLs for more information.

At-most-once and At-least-once Delivery

Some PubSub systems guarantee that messages received by subscribers but not acknowledged are delivered again. These at-least-once systems require that subscribers call an ack function to indicate that they have fully processed a message.

In other PubSub systems, a message will be delivered only once, if it is delivered at all. These at-most-once systems do not need an Ack method.

This package accommodates both kinds of systems. If your application uses at-least-once providers, it should always call Message.Ack. If your application only uses at-most-once providers, it may call Message.Ack, but does not need to. The constructor for at-most-once-providers will require you to supply a function to be called whenever the application calls Message.Ack. Common implementations are: do nothing, on the grounds that you may want to test your at-least-once system with an at-most-once provider; or panic, so that a system that assumes at-least-once delivery isn't accidentally paired with an at-most-once provider. Providers that support both at-most-once and at-least-once semantics will include an optional ack function in their Options struct.

OpenCensus Integration

OpenCensus supports tracing and metric collection for multiple languages and backend providers. See https://opencensus.io.

This API collects OpenCensus traces and metrics for the following methods:

  • Topic.Send
  • Topic.Shutdown
  • Subscription.Receive
  • Subscription.Shutdown
  • The internal driver methods SendBatch, SendAcks and ReceiveBatch.

All trace and metric names begin with the package import path. The traces add the method name. For example, "gocloud.dev/pubsub/Topic.Send". The metrics are "completed_calls", a count of completed method calls by provider, method and status (error code); and "latency", a distribution of method latency by provider and method. For example, "gocloud.dev/pubsub/latency".

To enable trace collection in your application, see "Configure Exporter" at https://opencensus.io/quickstart/go/tracing. To enable metric collection in your application, see "Exporting stats" at https://opencensus.io/quickstart/go/metrics.

Example (ReceiveWithInvertedWorkerPool)
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/mempubsub"
)

func main() {
	// Open a topic and corresponding subscription.
	ctx := context.Background()
	t := mempubsub.NewTopic()
	defer t.Shutdown(ctx)
	s := mempubsub.NewSubscription(t, time.Second)
	defer s.Shutdown(ctx)

	// Send a bunch of messages to the topic.
	const nMessages = 100
	for n := 0; n < nMessages; n++ {
		m := &pubsub.Message{
			Body: []byte(fmt.Sprintf("message %d", n)),
		}
		if err := t.Send(ctx, m); err != nil {
			log.Fatal(err)
		}
	}

	// In order to make our test exit, we keep track of how many messages were
	// processed with wg, and cancel the receiveCtx when we've processed them all.
	// A more realistic application would not need this WaitGroup.
	var wg sync.WaitGroup
	wg.Add(nMessages)
	receiveCtx, cancel := context.WithCancel(ctx)
	go func() {
		wg.Wait()
		cancel()
	}()

	// Process messages using an inverted worker pool, as described here:
	// https://www.youtube.com/watch?v=5zXAHh5tJqQ&t=26m58s
	// It uses a buffered channel, sem, as a semaphore to manage the maximum
	// number of workers.
	const poolSize = 10
	sem := make(chan struct{}, poolSize)
	for {
		// Read a message. Receive will block until a message is available.
		msg, err := s.Receive(receiveCtx)
		if err != nil {
			// An error from Receive is fatal; Receive will never succeed again
			// so the application should exit.
			// In this example, we expect to get a error here when we've read all the
			// messages and receiveCtx is canceled.
			break
		}

		// Write a token to the semaphore; if there are already poolSize workers
		// active, this will block until one of them completes.
		sem <- struct{}{}
		// Process the message. For many applications, this can be expensive, so
		// we do it in a goroutine, allowing this loop to continue and Receive more
		// messages.
		go func() {
			// Record that we've processed this message, and Ack it.
			msg.Ack()
			wg.Done()
			// Read a token from the semaphore before exiting this goroutine, freeing
			// up the slot for another goroutine.
			<-sem
		}()
	}

	// Wait for all workers to finish.
	for n := poolSize; n > 0; n-- {
		sem <- struct{}{}
	}
	fmt.Printf("Read %d messages", nMessages)

}
Output:

Read 100 messages
Example (ReceiveWithTraditionalWorkerPool)
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/mempubsub"
)

func main() {
	// Open a topic and corresponding subscription.
	ctx := context.Background()
	t := mempubsub.NewTopic()
	defer t.Shutdown(ctx)
	s := mempubsub.NewSubscription(t, time.Second)
	defer s.Shutdown(ctx)

	// Send a bunch of messages to the topic.
	const nMessages = 100
	for n := 0; n < nMessages; n++ {
		m := &pubsub.Message{
			Body: []byte(fmt.Sprintf("message %d", n)),
		}
		if err := t.Send(ctx, m); err != nil {
			log.Fatal(err)
		}
	}

	// In order to make our test exit, we keep track of how many messages were
	// processed with wg, and cancel the receiveCtx when we've processed them all.
	// A more realistic application would not need this WaitGroup.
	var wg sync.WaitGroup
	wg.Add(nMessages)
	receiveCtx, cancel := context.WithCancel(ctx)
	go func() {
		wg.Wait()
		cancel()
	}()

	// Process messages using a traditional worker pool. Consider using an
	// inverted pool instead (see the other example).
	const poolSize = 10
	var workerWg sync.WaitGroup
	for n := 0; n < poolSize; n++ {
		workerWg.Add(1)
		go func() {
			for {
				// Read a message. Receive will block until a message is available.
				// It's fine to call Receive from many goroutines.
				msg, err := s.Receive(receiveCtx)
				if err != nil {
					// An error from Receive is fatal; Receive will never succeed again
					// so the application should exit.
					// In this example, we expect to get a error here when we've read all
					// the messages and receiveCtx is canceled.
					workerWg.Done()
					return
				}

				// Process the message and Ack it.
				msg.Ack()
				wg.Done()
			}
		}()
	}

	// Wait for all workers to finish.
	workerWg.Wait()
	fmt.Printf("Read %d messages", nMessages)

}
Output:

Read 100 messages
Example (SendReceive)
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/mempubsub"
)

func main() {
	// Open a topic and corresponding subscription.
	ctx := context.Background()
	t := mempubsub.NewTopic()
	defer t.Shutdown(ctx)
	s := mempubsub.NewSubscription(t, time.Second)
	defer s.Shutdown(ctx)

	// Send a message to the topic.
	if err := t.Send(ctx, &pubsub.Message{Body: []byte("Hello, world!")}); err != nil {
		log.Fatal(err)
	}

	// Receive a message from the subscription.
	m, err := s.Receive(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Print out the received message.
	fmt.Printf("%s\n", m.Body)

	// Acknowledge the message.
	m.Ack()

}
Output:

Hello, world!
Example (SendReceiveMultipleMessages)
package main

import (
	"context"
	"fmt"
	"log"
	"sort"
	"time"

	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/mempubsub"
)

func main() {
	// Open a topic and corresponding subscription.
	ctx := context.Background()
	t := mempubsub.NewTopic()
	defer t.Shutdown(ctx)
	s := mempubsub.NewSubscription(t, time.Second)
	defer s.Shutdown(ctx)

	// Send messages to the topic.
	ms := []*pubsub.Message{
		{Body: []byte("a")},
		{Body: []byte("b")},
		{Body: []byte("c")},
	}
	for _, m := range ms {
		if err := t.Send(ctx, m); err != nil {
			log.Fatal(err)
		}
	}

	// Receive and acknowledge messages from the subscription.
	ms2 := []string{}
	for i := 0; i < len(ms); i++ {
		m2, err := s.Receive(ctx)
		if err != nil {
			log.Fatal(err)
		}
		ms2 = append(ms2, string(m2.Body))
		m2.Ack()
	}

	// The messages may be received in a different order than they were
	// sent.
	sort.Strings(ms2)

	// Print out the received messages.
	for _, m2 := range ms2 {
		fmt.Println(m2)
	}

}
Output:

a
b
c

Index

Examples

Constants

This section is empty.

Variables

View Source
var NewSubscription = newSubscription

NewSubscription is for use by provider implementations.

View Source
var NewTopic = newTopic

NewTopic is for use by provider implementations.

View Source
var (

	// OpenCensusViews are predefined views for OpenCensus metrics.
	// The views include counts and latency distributions for API method calls.
	// See the example at https://godoc.org/go.opencensus.io/stats/view for usage.
	OpenCensusViews = oc.Views(pkgName, latencyMeasure)
)

Functions

This section is empty.

Types

type Message

type Message struct {
	// Body contains the content of the message.
	Body []byte

	// Metadata has key/value metadata for the message.
	Metadata map[string]string
	// contains filtered or unexported fields
}

Message contains data to be published.

func (*Message) Ack

func (m *Message) Ack()

Ack acknowledges the message, telling the server that it does not need to be sent again to the associated Subscription. It returns immediately, but the actual ack is sent in the background, and is not guaranteed to succeed.

func (*Message) As added in v0.10.0

func (m *Message) As(i interface{}) bool

As converts i to provider-specific types. See https://godoc.org/gocloud.dev#hdr-As for background information, the "As" examples in this package for examples, and the provider-specific package documentation for the specific types supported for that provider. As panics unless it is called on a message obtained from Subscription.Receive.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription receives published messages.

func OpenSubscription added in v0.12.0

func OpenSubscription(ctx context.Context, urlstr string) (*Subscription, error)

OpenSubscription opens the Subscription identified by the URL given. See the URLOpener documentation in provider-specific subpackages for details on supported URL formats, and https://godoc.org/gocloud.dev#hdr-URLs for more information.

func (*Subscription) As

func (s *Subscription) As(i interface{}) bool

As converts i to provider-specific types. See https://godoc.org/gocloud.dev#hdr-As for background information, the "As" examples in this package for examples, and the provider-specific package documentation for the specific types supported for that provider.

func (*Subscription) ErrorAs added in v0.10.0

func (s *Subscription) ErrorAs(err error, i interface{}) bool

ErrorAs converts err to provider-specific types. ErrorAs panics if i is nil or not a pointer. ErrorAs returns false if err == nil. See Topic.As for more details.

func (*Subscription) Receive

func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error)

Receive receives and returns the next message from the Subscription's queue, blocking and polling if none are available. This method can be called concurrently from multiple goroutines. The Ack() method of the returned Message has to be called once the message has been processed, to prevent it from being received again.

func (*Subscription) Shutdown

func (s *Subscription) Shutdown(ctx context.Context) (err error)

Shutdown flushes pending ack sends and disconnects the Subscription.

type SubscriptionURLOpener added in v0.12.0

type SubscriptionURLOpener interface {
	OpenSubscriptionURL(ctx context.Context, u *url.URL) (*Subscription, error)
}

SubscriptionURLOpener represents types than can open Subscriptions based on a URL. The opener must not modify the URL argument. OpenSubscriptionURL must be safe to call from multiple goroutines.

This interface is generally implemented by types in driver packages.

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic publishes messages to all its subscribers.

func OpenTopic added in v0.12.0

func OpenTopic(ctx context.Context, urlstr string) (*Topic, error)

OpenTopic opens the Topic identified by the URL given. See the URLOpener documentation in provider-specific subpackages for details on supported URL formats, and https://godoc.org/gocloud.dev#hdr-URLs for more information.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/mempubsub"
)

func main() {
	ctx := context.Background()

	// Create a Topic using a URL.
	// This example uses "mempubsub", the in-memory implementation.
	// We need to add a blank import line to register the mempubsub provider's
	// URLOpener, which implements pubsub.TopicURLOpener:
	// import _ "gocloud.dev/secrets/mempubsub"
	// mempubsub registers for the "mem" scheme.
	// All pubsub.OpenTopic URLs also work with "pubsub+" or "pubsub+topic+" prefixes,
	// e.g., "pubsub+mem://mytopic" or "pubsub+topic+mem://mytopic".
	topic, err := pubsub.OpenTopic(ctx, "mem://mytopic")
	if err != nil {
		log.Fatal(err)
	}
	// Similarly, we can open a subscription using a URL.
	// Prefixes "pubsub+" or "pubsub+subscription+" work as well.
	sub, err := pubsub.OpenSubscription(ctx, "mem://mytopic")
	if err != nil {
		log.Fatal(err)
	}

	// Now we can use topic to send messages.
	if err := topic.Send(ctx, &pubsub.Message{Body: []byte("Hello, world!")}); err != nil {
		log.Fatal(err)
	}

	// And receive it from sub.
	m, err := sub.Receive(ctx)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%s\n", m.Body)
	m.Ack()

}
Output:

Hello, world!

func (*Topic) As

func (t *Topic) As(i interface{}) bool

As converts i to provider-specific types. See https://godoc.org/gocloud.dev#hdr-As for background information, the "As" examples in this package for examples, and the provider-specific package documentation for the specific types supported for that provider.

func (*Topic) ErrorAs added in v0.10.0

func (t *Topic) ErrorAs(err error, i interface{}) bool

ErrorAs converts err to provider-specific types. ErrorAs panics if i is nil or not a pointer. ErrorAs returns false if err == nil. See https://godoc.org/gocloud.dev#hdr-As for background information.

func (*Topic) Send

func (t *Topic) Send(ctx context.Context, m *Message) (err error)

Send publishes a message. It only returns after the message has been sent, or failed to be sent. Send can be called from multiple goroutines at once.

func (*Topic) Shutdown

func (t *Topic) Shutdown(ctx context.Context) (err error)

Shutdown flushes pending message sends and disconnects the Topic. It only returns after all pending messages have been sent.

type TopicURLOpener added in v0.12.0

type TopicURLOpener interface {
	OpenTopicURL(ctx context.Context, u *url.URL) (*Topic, error)
}

TopicURLOpener represents types than can open Topics based on a URL. The opener must not modify the URL argument. OpenTopicURL must be safe to call from multiple goroutines.

This interface is generally implemented by types in driver packages.

type URLMux added in v0.12.0

type URLMux struct {
	// contains filtered or unexported fields
}

URLMux is a URL opener multiplexer. It matches the scheme of the URLs against a set of registered schemes and calls the opener that matches the URL's scheme. See https://godoc.org/gocloud.dev#hdr-URLs for more information.

The zero value is a multiplexer with no registered schemes.

func DefaultURLMux added in v0.12.0

func DefaultURLMux() *URLMux

DefaultURLMux returns the URLMux used by OpenTopic and OpenSubscription.

Driver packages can use this to register their TopicURLOpener and/or SubscriptionURLOpener on the mux.

func (*URLMux) OpenSubscription added in v0.12.0

func (mux *URLMux) OpenSubscription(ctx context.Context, urlstr string) (*Subscription, error)

OpenSubscription calls OpenSubscriptionURL with the URL parsed from urlstr. OpenSubscription is safe to call from multiple goroutines.

func (*URLMux) OpenSubscriptionURL added in v0.12.0

func (mux *URLMux) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*Subscription, error)

OpenSubscriptionURL dispatches the URL to the opener that is registered with the URL's scheme. OpenSubscriptionURL is safe to call from multiple goroutines.

func (*URLMux) OpenTopic added in v0.12.0

func (mux *URLMux) OpenTopic(ctx context.Context, urlstr string) (*Topic, error)

OpenTopic calls OpenTopicURL with the URL parsed from urlstr. OpenTopic is safe to call from multiple goroutines.

func (*URLMux) OpenTopicURL added in v0.12.0

func (mux *URLMux) OpenTopicURL(ctx context.Context, u *url.URL) (*Topic, error)

OpenTopicURL dispatches the URL to the opener that is registered with the URL's scheme. OpenTopicURL is safe to call from multiple goroutines.

func (*URLMux) RegisterSubscription added in v0.12.0

func (mux *URLMux) RegisterSubscription(scheme string, opener SubscriptionURLOpener)

RegisterSubscription registers the opener with the given scheme. If an opener already exists for the scheme, RegisterSubscription panics.

func (*URLMux) RegisterTopic added in v0.12.0

func (mux *URLMux) RegisterTopic(scheme string, opener TopicURLOpener)

RegisterTopic registers the opener with the given scheme. If an opener already exists for the scheme, RegisterTopic panics.

Directories

Path Synopsis
Package awssnssqs provides an implementation of pubsub that uses AWS SNS (Simple Notification Service) and SQS (Simple Queueing Service).
Package awssnssqs provides an implementation of pubsub that uses AWS SNS (Simple Notification Service) and SQS (Simple Queueing Service).
Package azuresb provides an implementation of pubsub using Azure Service Bus Topic and Subscription.
Package azuresb provides an implementation of pubsub using Azure Service Bus Topic and Subscription.
Package driver defines a set of interfaces that the pubsub package uses to interact with the underlying pubsub services.
Package driver defines a set of interfaces that the pubsub package uses to interact with the underlying pubsub services.
Package drivertest provides a conformance test for implementations of driver.
Package drivertest provides a conformance test for implementations of driver.
Package gcppubsub provides a pubsub implementation that uses GCP PubSub.
Package gcppubsub provides a pubsub implementation that uses GCP PubSub.
kafkapubsub module
Package mempubsub provides an in-memory pubsub implementation.
Package mempubsub provides an in-memory pubsub implementation.
Package natspubsub provides a pubsub implementation for NATS.io.
Package natspubsub provides a pubsub implementation for NATS.io.
Package rabbitpubsub provides an pubsub implementation for RabbitMQ.
Package rabbitpubsub provides an pubsub implementation for RabbitMQ.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL