pubsub

package
v0.0.0-...-02050b9 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 25, 2016 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package pubsub provides an easy way to publish and receive Google Cloud Pub/Sub messages, hiding the details of the underlying server RPCs. Google Cloud Pub/Sub is a many-to-many, asynchronous messaging system that decouples senders and receivers.

Note: This package is experimental and may make backwards-incompatible changes.

More information about Google Cloud Pub/Sub is available at https://cloud.google.com/pubsub/docs

Publishing

Google Cloud Pub/Sub messages are published to topics. Topics may be created using the pubsub package like so:

topic, err := client.NewTopic(context.Background(), "topic-name")

Messages may then be published to a topic:

 msgIDs, err := topic.Publish(ctx, &pubsub.Message{
	Data: []byte("payload"),
 })

Receiving

To receive messages published to a topic, clients create subscriptions to the topic. There may be more than one subscription per topic; each message that is published to the topic will be delivered to all of its subscriptions.

Subscriptions may be created like so:

sub, err := client.NewSubscription(context.Background(), "sub-name", topic, 0, nil)

Messages are then consumed from a subscription via an iterator:

 // Construct the iterator
 it, err := sub.Pull(context.Background())
 if err != nil {
	// handle err ...
 }
 defer it.Stop()

 // Consume N messages
 for i := 0; i < N; i++ {
 	msg, err := it.Next()
 	if err == pubsub.Done {
 		break
 	}
 	if err != nil {
 		// handle err ...
 		break
 	}

 	log.Print("got message: ", string(msg.Data))
 	msg.Done(true)
 }

The message iterator returns messages one at a time, fetching batches of messages behind the scenes as needed. Once client code has processed a message, it must call Message.Done; otherwise the message will eventually be redelivered. For more information and configuration options, see "Deadlines" below.

Note: It is possible for Messages to be redelivered, even if Message.Done has been called. Client code must be robust to multiple deliveries of messages.
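
One common way to tolerate redelivery is to remember which message IDs have already been handled and skip repeats. The following is a minimal, standard-library-only sketch of that idea, assuming that a redelivered message carries the same server-assigned ID as the original delivery. The seenSet type and its firstDelivery method are hypothetical helpers for illustration and are not part of the pubsub package.

```go
package main

import "fmt"

// seenSet remembers the IDs of messages that were already processed.
// Hypothetical helper; not part of the pubsub package.
type seenSet map[string]struct{}

// firstDelivery records id and reports whether it had not been seen before.
func (s seenSet) firstDelivery(id string) bool {
	if _, ok := s[id]; ok {
		return false
	}
	s[id] = struct{}{}
	return true
}

func main() {
	seen := seenSet{}
	// Simulated delivery order in which "m1" arrives twice.
	for _, id := range []string{"m1", "m2", "m1"} {
		if seen.firstDelivery(id) {
			fmt.Println("processing", id)
		} else {
			fmt.Println("skipping duplicate", id)
		}
	}
}
```

An in-memory set only protects a single process. A long-running or replicated consumer would more likely need a bounded or persistent store, or handlers that are idempotent by design.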

Deadlines

The default pubsub deadlines are suitable for most use cases, but may be overridden. This section describes the tradeoffs that should be considered when overriding the defaults.

Behind the scenes, each message returned by the Pub/Sub server has an associated lease, known as an "ACK deadline". Unless a message is acknowledged within the ACK deadline, or the client requests that the ACK deadline be extended, the message will become eligible for redelivery. As a convenience, the pubsub package will automatically extend deadlines until either:

  • Message.Done is called, or
  • the "MaxExtension" period elapses from the time the message is fetched from the server.

The initial ACK deadline given to each message defaults to 10 seconds, but may be overridden during subscription creation. Selecting an ACK deadline is a tradeoff between message redelivery latency and RPC volume. If the pubsub package fails to acknowledge or extend a message (e.g. due to unexpected termination of the process), a shorter ACK deadline will generally result in faster message redelivery by the Pub/Sub system. However, a short ACK deadline may also increase the number of deadline extension RPCs that the pubsub package sends to the server.

The default max extension period is DefaultMaxExtension, and can be overridden by passing a MaxExtension option to Subscription.Pull. Selecting a max extension period is a tradeoff between the speed at which client code must process messages, and the redelivery delay if messages fail to be acknowledged (e.g. because client code neglects to do so). Using a large MaxExtension increases the available time for client code to process messages. However, if the client code neglects to call Message.Done, a large MaxExtension will increase the delay before the message is redelivered.

Example (Auth)
package main

import (
	"io/ioutil"
	"log"

	"golang.org/x/oauth2"
	"golang.org/x/oauth2/google"
	"google.golang.org/cloud"
	"google.golang.org/cloud/pubsub"
)

func main() {
	// Initialize an authorized context with Google Developers Console
	// JSON key. Read the google package examples to learn more about
	// different authorization flows you can use.
	// http://godoc.org/golang.org/x/oauth2/google
	jsonKey, err := ioutil.ReadFile("/path/to/json/keyfile.json")
	if err != nil {
		log.Fatal(err)
	}
	conf, err := google.JWTConfigFromJSON(
		jsonKey,
		pubsub.ScopeCloudPlatform,
		pubsub.ScopePubSub,
	)
	if err != nil {
		log.Fatal(err)
	}
	ctx := cloud.NewContext("project-id", conf.Client(oauth2.NoContext))
	// See the other samples to learn how to use the context.
	// main cannot return a value, so the context is discarded here.
	_ = ctx
}

Constants

const (
	// ScopePubSub grants permissions to view and manage Pub/Sub
	// topics and subscriptions.
	ScopePubSub = "https://www.googleapis.com/auth/pubsub"

	// ScopeCloudPlatform grants permissions to view and manage your data
	// across Google Cloud Platform services.
	ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform"
)

const DefaultMaxExtension = 10 * time.Minute

The default period for which to automatically extend Message acknowledgement deadlines.

const DefaultMaxPrefetch = 100

The default maximum number of messages that are prefetched from the server.

const MaxPublishBatchSize = 1000

Variables

var Done = errors.New("no more messages")

Done is returned when an iteration is complete.
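
The sentinel-error convention described here can be sketched without any Pub/Sub dependency. In the example below, errDone stands in for pubsub.Done, and sliceIterator and collect are hypothetical names invented for illustration. The point is only that the loop treats the sentinel as normal termination and any other error as a failure.

```go
package main

import (
	"errors"
	"fmt"
)

// errDone stands in for pubsub.Done in this self-contained sketch.
var errDone = errors.New("no more messages")

// sliceIterator is a hypothetical iterator over a fixed list of names.
type sliceIterator struct {
	items []string
	pos   int
}

// Next returns the next item, or errDone once the items are exhausted.
func (it *sliceIterator) Next() (string, error) {
	if it.pos >= len(it.items) {
		return "", errDone
	}
	item := it.items[it.pos]
	it.pos++
	return item, nil
}

// collect drains the iterator. errDone ends the loop without an error;
// any other error is returned to the caller with the items read so far.
func collect(it *sliceIterator) ([]string, error) {
	var out []string
	for {
		item, err := it.Next()
		if err == errDone {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, item)
	}
}

func main() {
	names, err := collect(&sliceIterator{items: []string{"topic-a", "topic-b"}})
	fmt.Println(names, err)
}
```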

Functions

func Ack deprecated

func Ack(ctx context.Context, sub string, id ...string) error

Ack acknowledges one or more Pub/Sub messages on the specified subscription.

Deprecated: Call Message.Done on a Message returned by Iterator.Next instead.

func CreateSub deprecated

func CreateSub(ctx context.Context, name string, topic string, deadline time.Duration, endpoint string) error

CreateSub creates a Pub/Sub subscription on the backend.

Deprecated: Use Client.NewSubscription instead.

A subscription should subscribe to an existing topic.

Messages that have not been acknowledged will be pushed back to the subscription when the default acknowledgement deadline is reached. You can override the default deadline by providing a non-zero deadline. Deadline must not be specified to precision greater than one second.

As new messages are queued on the subscription, you may receive push notifications about the new arrivals. To receive notifications of new messages in the queue, specify an endpoint callback URL. If endpoint is an empty string, the backend will not notify the client of new messages.

If the subscription already exists an error will be returned.

func CreateTopic deprecated

func CreateTopic(ctx context.Context, name string) error

CreateTopic creates a new topic with the specified name on the backend.

Deprecated: Use Client.NewTopic instead.

It will return an error if topic already exists.

func DeleteSub deprecated

func DeleteSub(ctx context.Context, name string) error

DeleteSub deletes the subscription.

Deprecated: Use Subscription.Delete instead.

func DeleteTopic deprecated

func DeleteTopic(ctx context.Context, name string) error

DeleteTopic deletes the specified topic.

Deprecated: Use Topic.Delete instead.

func ModifyAckDeadline deprecated

func ModifyAckDeadline(ctx context.Context, sub string, id string, deadline time.Duration) error

ModifyAckDeadline modifies the acknowledgement deadline for the messages retrieved from the specified subscription. Deadline must not be specified to precision greater than one second.

Deprecated: Use Subscription.Pull instead, which automatically extends ack deadlines.

func ModifyPushEndpoint deprecated

func ModifyPushEndpoint(ctx context.Context, sub, endpoint string) error

ModifyPushEndpoint changes the URL endpoint that handles push notifications coming from the Pub/Sub backend for the specified subscription.

Deprecated: Use Subscription.ModifyPushConfig instead.

func Publish deprecated

func Publish(ctx context.Context, topic string, msgs ...*Message) ([]string, error)

Publish publishes messages to the topic's subscribers. It returns message IDs upon success.

Deprecated: Use Topic.Publish instead.

Example
package main

import (
	"io/ioutil"
	"log"

	"golang.org/x/net/context"
	"golang.org/x/oauth2"
	"golang.org/x/oauth2/google"
	"google.golang.org/cloud"
	"google.golang.org/cloud/pubsub"
)

func Example_auth() context.Context {

	jsonKey, err := ioutil.ReadFile("/path/to/json/keyfile.json")
	if err != nil {
		log.Fatal(err)
	}
	conf, err := google.JWTConfigFromJSON(
		jsonKey,
		pubsub.ScopeCloudPlatform,
		pubsub.ScopePubSub,
	)
	if err != nil {
		log.Fatal(err)
	}
	ctx := cloud.NewContext("project-id", conf.Client(oauth2.NoContext))

	return ctx
}

func main() {
	ctx := Example_auth()

	msgIDs, err := pubsub.Publish(ctx, "topic1", &pubsub.Message{
		Data: []byte("hello world"),
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("Published a message with a message id: %s\n", msgIDs[0])
}

func SubExists deprecated

func SubExists(ctx context.Context, name string) (bool, error)

SubExists returns true if subscription exists.

Deprecated: Use Subscription.Exists instead.

func TopicExists deprecated

func TopicExists(ctx context.Context, name string) (bool, error)

TopicExists returns true if a topic exists with the specified name.

Deprecated: Use Topic.Exists instead.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a Google Pub/Sub client, which may be used to perform Pub/Sub operations with a project. It must be constructed via NewClient.

func NewClient

func NewClient(ctx context.Context, projectID string, opts ...cloud.ClientOption) (*Client, error)

NewClient creates a new PubSub client.

func (*Client) NewSubscription

func (c *Client) NewSubscription(ctx context.Context, name string, topic *Topic, ackDeadline time.Duration, pushConfig *PushConfig) (*Subscription, error)

NewSubscription creates a new subscription to a topic.

name is the name of the subscription to create. It must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and must not start with "goog".

topic is the topic from which the subscription should receive messages. It need not belong to the same project as the subscription.

ackDeadline is the maximum time after a subscriber receives a message before the subscriber should acknowledge the message. It must be between 10 and 600 seconds (inclusive), and is rounded down to the nearest second. If the provided ackDeadline is 0, then the default value of 10 seconds is used. Note: messages which are obtained via an Iterator need not be acknowledged within this deadline, as the deadline will be automatically extended.

pushConfig may be set to configure this subscription for push delivery.

If the subscription already exists an error will be returned.

func (*Client) NewTopic

func (c *Client) NewTopic(ctx context.Context, name string) (*Topic, error)

NewTopic creates a new topic. The specified topic name must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and must not start with "goog". If the topic already exists an error will be returned.
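
The naming rules above translate directly into a regular expression plus a prefix check. The following sketch validates a candidate name locally before any RPC is made. validName and namePattern are hypothetical names for illustration; the server remains the authority on what it accepts, and the "goog" check here is a literal, case-sensitive prefix match, which is an assumption.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// namePattern encodes the documented rules: a leading letter followed by
// 2 to 254 characters drawn from letters, digits, and "_.~+%-", for a total
// length of 3 to 255.
var namePattern = regexp.MustCompile(`^[A-Za-z][A-Za-z0-9_.~+%-]{2,254}$`)

// validName reports whether name satisfies the documented rules.
// Hypothetical helper; not part of the pubsub package.
func validName(name string) bool {
	return namePattern.MatchString(name) && !strings.HasPrefix(name, "goog")
}

func main() {
	for _, name := range []string{"topic-name", "ab", "1topic", "google-topic", "my topic"} {
		fmt.Printf("%q valid: %v\n", name, validName(name))
	}
}
```

Only the first name in the usage loop passes: "ab" is too short, "1topic" does not start with a letter, "google-topic" starts with "goog", and "my topic" contains a space.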

func (*Client) Subscription

func (c *Client) Subscription(name string) *Subscription

Subscription creates a reference to a subscription.

func (*Client) Subscriptions

func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator

Subscriptions returns an iterator which returns all of the subscriptions for the client's project.

func (*Client) Topic

func (c *Client) Topic(name string) *Topic

Topic creates a reference to a topic.

func (*Client) Topics

func (c *Client) Topics(ctx context.Context) *TopicIterator

Topics returns an iterator which returns all of the topics for the client's project.

type Iterator

type Iterator struct {
	// contains filtered or unexported fields
}

func (*Iterator) Next

func (it *Iterator) Next() (*Message, error)

Next returns the next Message to be processed. The caller must call Message.Done when finished with it. Once Stop has been called, calls to Next will return Done.

func (*Iterator) Stop

func (it *Iterator) Stop()

Client code must call Stop on an Iterator when finished with it. Stop will block until Done has been called on all Messages that have been returned by Next, or until the context with which the Iterator was created is cancelled or exceeds its deadline. Stop need only be called once, but may be called multiple times from multiple goroutines.

type Message

type Message struct {
	// ID identifies this message.
	// This ID is assigned by the server and is populated for Messages obtained from a subscription.
	// It is otherwise ignored.
	ID string

	// Data is the actual data in the message.
	Data []byte

	// Attributes represents the key-value pairs the current message
	// is labelled with.
	Attributes map[string]string

	// AckID is the identifier to acknowledge this message.
	AckID string
	// contains filtered or unexported fields
}

Message represents a Pub/Sub message.

func Pull deprecated

func Pull(ctx context.Context, sub string, n int) ([]*Message, error)

Pull pulls up to n messages from the subscription. n must not be larger than 100.

Deprecated: Use Subscription.Pull instead.

Example
package main

import (
	"io/ioutil"
	"log"

	"golang.org/x/net/context"
	"golang.org/x/oauth2"
	"golang.org/x/oauth2/google"
	"google.golang.org/cloud"
	"google.golang.org/cloud/pubsub"
)

func Example_auth() context.Context {

	jsonKey, err := ioutil.ReadFile("/path/to/json/keyfile.json")
	if err != nil {
		log.Fatal(err)
	}
	conf, err := google.JWTConfigFromJSON(
		jsonKey,
		pubsub.ScopeCloudPlatform,
		pubsub.ScopePubSub,
	)
	if err != nil {
		log.Fatal(err)
	}
	ctx := cloud.NewContext("project-id", conf.Client(oauth2.NoContext))

	return ctx
}

func main() {
	ctx := Example_auth()

	// E.g. pubsub.CreateSub(ctx, "sub1", "topic1", time.Duration(0), "")
	msgs, err := pubsub.Pull(ctx, "sub1", 1)
	if err != nil {
		log.Fatal(err)
	}
	// Pull returns up to n messages, so the slice may be empty.
	if len(msgs) == 0 {
		log.Fatal("no messages available")
	}
	log.Printf("New message arrived: %v\n", msgs[0])
	if err := pubsub.Ack(ctx, "sub1", msgs[0].AckID); err != nil {
		log.Fatal(err)
	}
	log.Println("Acknowledged message")
}

func PullWait deprecated

func PullWait(ctx context.Context, sub string, n int) ([]*Message, error)

PullWait pulls up to n messages from the subscription. If there are no messages in the queue, it will wait until at least one message is available or a timeout occurs. n must not be larger than 100.

Deprecated: Use Subscription.Pull instead.

func (*Message) Done

func (m *Message) Done(ack bool)

Done completes the processing of a Message that was returned from an Iterator. ack indicates whether the message should be acknowledged. Client code must call Done when finished for each Message returned by an iterator. Done may only be called on Messages returned by an iterator. If message acknowledgement fails, the Message will be redelivered. Calls to Done have no effect after the first call.
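
The "no effect after the first call" behaviour is a common Go idiom that can be built on sync.Once. The sketch below shows one way such semantics might be obtained. It is not the package's implementation, and the message type and its outcome field are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for a received Pub/Sub message.
type message struct {
	once    sync.Once
	outcome string // empty until Done has been called
}

// Done records whether the message was acknowledged. Only the first call
// has any effect; later calls are ignored.
func (m *message) Done(ack bool) {
	m.once.Do(func() {
		if ack {
			m.outcome = "ack"
		} else {
			m.outcome = "nack"
		}
	})
}

func main() {
	m := &message{}
	m.Done(true)
	m.Done(false) // ignored: Done was already called
	fmt.Println(m.outcome)
}
```

A practical consequence is that a deferred msg.Done(false) can act as a safety net in a handler that also calls msg.Done(true) on its success path.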

type PullOption

type PullOption interface {
	// contains filtered or unexported methods
}

A PullOption is an optional argument to Subscription.Pull.

func MaxExtension

func MaxExtension(duration time.Duration) PullOption

MaxExtension returns a PullOption that limits how long ack deadlines are extended for.

An Iterator will automatically extend the ack deadline of all fetched Messages for the duration specified. Automatic deadline extension may be disabled by specifying a duration of 0.

func MaxPrefetch

func MaxPrefetch(num int) PullOption

MaxPrefetch returns a PullOption that limits Message prefetching.

For performance reasons, the pubsub library may prefetch a pool of Messages to be returned serially from Iterator.Next. MaxPrefetch is used to limit the size of this pool.

If num is less than 1, it will be treated as if it were 1.

type PushConfig

type PushConfig struct {
	// A URL locating the endpoint to which messages should be pushed.
	Endpoint string

	// Endpoint configuration attributes. See https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions#PushConfig.FIELDS.attributes for more details.
	Attributes map[string]string
}

PushConfig contains configuration for subscriptions that operate in push mode.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription is a reference to a PubSub subscription.

func (*Subscription) Config

Config fetches the current configuration for the subscription.

func (*Subscription) Delete

func (s *Subscription) Delete(ctx context.Context) error

Delete deletes the subscription.

func (*Subscription) Exists

func (s *Subscription) Exists(ctx context.Context) (bool, error)

Exists reports whether the subscription exists on the server.

func (*Subscription) ModifyPushConfig

func (s *Subscription) ModifyPushConfig(ctx context.Context, conf *PushConfig) error

ModifyPushConfig updates the endpoint URL and other attributes of a push subscription.

func (*Subscription) Name

func (s *Subscription) Name() string

Name returns the globally unique name for the subscription.

func (*Subscription) Pull

func (s *Subscription) Pull(ctx context.Context, opts ...PullOption) (*Iterator, error)

Pull returns an Iterator that can be used to fetch Messages. The Iterator will automatically extend the ack deadline of all fetched Messages, for the period specified by DefaultMaxExtension. This may be overridden by supplying a MaxExtension pull option.

If ctx is cancelled or exceeds its deadline, outstanding acks or deadline extensions will fail.

The caller must call Stop on the Iterator once finished with it.

type SubscriptionConfig

type SubscriptionConfig struct {
	Topic      *Topic
	PushConfig PushConfig

	// The default maximum time after a subscriber receives a message
	// before the subscriber should acknowledge the message.  Note:
	// messages which are obtained via an Iterator need not be acknowledged
	// within this deadline, as the deadline will be automatically
	// extended.
	AckDeadline time.Duration
}

SubscriptionConfig contains the configuration of a subscription.

type SubscriptionIterator

type SubscriptionIterator struct {
	// contains filtered or unexported fields
}

SubscriptionIterator is an iterator that returns a series of subscriptions.

func (*SubscriptionIterator) Next

func (subs *SubscriptionIterator) Next() (*Subscription, error)

Next returns the next subscription. If there are no more subscriptions, Done will be returned.

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic is a reference to a PubSub topic.

func (*Topic) Delete

func (t *Topic) Delete(ctx context.Context) error

Delete deletes the topic.

func (*Topic) Exists

func (t *Topic) Exists(ctx context.Context) (bool, error)

Exists reports whether the topic exists on the server.

func (*Topic) Name

func (t *Topic) Name() string

Name returns the globally unique name for the topic.

func (*Topic) Publish

func (t *Topic) Publish(ctx context.Context, msgs ...*Message) ([]string, error)

Publish publishes the supplied Messages to the topic. If successful, the server-assigned message IDs are returned in the same order as the supplied Messages. At most MaxPublishBatchSize messages may be supplied.
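
Because a single call accepts at most MaxPublishBatchSize messages, callers with more messages need to split them into batches first. The sketch below shows that chunking step on plain byte-slice payloads. The batches helper and the maxBatch constant are hypothetical; in real code each batch would presumably be passed to one Publish call.

```go
package main

import "fmt"

// maxBatch mirrors the documented value of MaxPublishBatchSize.
const maxBatch = 1000

// batches splits payloads into consecutive slices of at most size elements,
// preserving order. Hypothetical helper; not part of the pubsub package.
func batches(payloads [][]byte, size int) [][][]byte {
	if size < 1 {
		size = 1
	}
	var out [][][]byte
	for len(payloads) > 0 {
		n := size
		if len(payloads) < n {
			n = len(payloads)
		}
		out = append(out, payloads[:n])
		payloads = payloads[n:]
	}
	return out
}

func main() {
	payloads := make([][]byte, 2500)
	for i, b := range batches(payloads, maxBatch) {
		fmt.Printf("batch %d: %d messages\n", i, len(b))
	}
}
```

With 2500 payloads this yields two full batches of 1000 and a final batch of 500.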

func (*Topic) Subscriptions

func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator

Subscriptions returns an iterator which returns the subscriptions for this topic.

type TopicIterator

type TopicIterator struct {
	// contains filtered or unexported fields
}

TopicIterator is an iterator that returns a series of topics.

func (*TopicIterator) Next

func (tps *TopicIterator) Next() (*Topic, error)

Next returns the next topic. If there are no more topics, Done will be returned.
