pubsub

package
v0.0.0-...-0e76970 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 13, 2022 License: Apache-2.0, Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package pubsub provides an easy way to publish and receive Google Cloud Pub/Sub messages, hiding the the details of the underlying server RPCs. Google Cloud Pub/Sub is a many-to-many, asynchronous messaging system that decouples senders and receivers.

Note: This package is experimental and may make backwards-incompatible changes.

More information about Google Cloud Pub/Sub is available at https://cloud.google.com/pubsub/docs

Publishing

Google Cloud Pub/Sub messages are published to topics. Topics may be created using the pubsub package like so:

topic, err := pubsubClient.NewTopic(context.Background(), "topic-name")

Messages may then be published to a topic:

 msgIDs, err := topic.Publish(ctx, &pubsub.Message{
	Data: []byte("payload"),
 })

Receiving

To receive messages published to a topic, clients create subscriptions to the topic. There may be more than one subscription per topic; each message that is published to the topic will be delivered to all of its subscriptions.

Subsciptions may be created like so:

sub, err := pubsubClient.NewSubscription(context.Background(), "sub-name", topic, 0, nil)

Messages are then consumed from a subscription via an iterator:

 // Construct the iterator
 it, err := sub.Pull(context.Background())
 if err != nil {
	// handle err ...
 }
 defer it.Stop()

 // Consume N messages
 for i := 0; i < N; i++ {
 	msg, err := it.Next()
 	if err == pubsub.Done {
 		break
 	}
 	if err != nil {
 		// handle err ...
 		break
 	}

 	log.Print("got message: ", string(msg.Data))
 	msg.Done(true)
 }

The message iterator returns messages one at a time, fetching batches of messages behind the scenes as needed. Once client code has processed the message, it must call Message.Done, otherwise the message will eventually be redelivered. For more information and configuration options, see "Deadlines" below.

Note: It is possible for Messages to be redelivered, even if Message.Done has been called. Client code must be robust to multiple deliveries of messages.

Deadlines

The default pubsub deadlines are suitable for most use cases, but may be overridden. This section describes the tradeoffs that should be considered when overriding the defaults.

Behind the scenes, each message returned by the Pub/Sub server has an associated lease, known as an "ACK deadline". Unless a message is acknowledged within the ACK deadline, or the client requests that the ACK deadline be extended, the message will become elegible for redelivery. As a convenience, the pubsub package will automatically extend deadlines until either:

  • Message.Done is called, or
  • the "MaxExtension" period elapses from the time the message is fetched from the server.

The initial ACK deadline given to each messages defaults to 10 seconds, but may be overridden during subscription creation. Selecting an ACK deadline is a tradeoff between message redelivery latency and RPC volume. If the pubsub package fails to acknowledge or extend a message (e.g. due to unexpected termination of the process), a shorter ACK deadline will generally result in faster message redelivery by the Pub/Sub system. However, a short ACK deadline may also increase the number of deadline extension RPCs that the pubsub package sends to the server.

The default max extension period is DefaultMaxExtension, and can be overridden by passing a MaxExtension option to Subscription.Pull. Selecting a max extension period is a tradeoff between the speed at which client code must process messages, and the redelivery delay if messages fail to be acknowledged (e.g. because client code neglects to do so). Using a large MaxExtension increases the available time for client code to process messages. However, if the client code neglects to call Message.Done, a large MaxExtension will increase the delay before the message is redelivered.

Index

Examples

Constants

View Source
const (
	// ScopePubSub grants permissions to view and manage Pub/Sub
	// topics and subscriptions.
	ScopePubSub = "https://www.googleapis.com/auth/pubsub"

	// ScopeCloudPlatform grants permissions to view and manage your data
	// across Google Cloud Platform services.
	ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform"
)
View Source
const DefaultMaxExtension = 10 * time.Minute

The default period for which to automatically extend Message acknowledgement deadlines.

View Source
const DefaultMaxPrefetch = 100

The default maximum number of messages that are prefetched from the server.

View Source
const MaxPublishBatchSize = 1000

Variables

Done is returned when an iteration is complete.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a Google Pub/Sub client scoped to a single project.

Clients should be reused rather than being created as needed. A Client may be shared by multiple goroutines.

func NewClient

func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error)

NewClient creates a new PubSub client.

Example
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	_, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	// See the other examples to learn how to use the Client.
}
Output:

func (*Client) Close

func (c *Client) Close() error

Close closes any resources held by the client.

Close need not be called at program exit.

func (*Client) CreateSubscription

func (c *Client) CreateSubscription(ctx context.Context, id string, topic *Topic, ackDeadline time.Duration, pushConfig *PushConfig) (*Subscription, error)

CreateSubscription creates a new subscription on a topic.

name is the name of the subscription to create. It must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and must not start with "goog".

topic is the topic from which the subscription should receive messages. It need not belong to the same project as the subscription.

ackDeadline is the maximum time after a subscriber receives a message before the subscriber should acknowledge the message. It must be between 10 and 600 seconds (inclusive), and is rounded down to the nearest second. If the provided ackDeadline is 0, then the default value of 10 seconds is used. Note: messages which are obtained via a MessageIterator need not be acknowledged within this deadline, as the deadline will be automatically extended.

pushConfig may be set to configure this subscription for push delivery.

If the subscription already exists an error will be returned.

Example
package main

import (
	"time"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	// Create a new topic with the given name.
	topic, err := client.CreateTopic(ctx, "topicName")
	if err != nil {
		// TODO: Handle error.
	}

	// Create a new subscription to the previously created topic
	// with the given name.
	sub, err := client.CreateSubscription(ctx, "subName", topic, 10*time.Second, nil)
	if err != nil {
		// TODO: Handle error.
	}

	_ = sub // TODO: use the subscription.
}
Output:

func (*Client) CreateTopic

func (c *Client) CreateTopic(ctx context.Context, id string) (*Topic, error)

CreateTopic creates a new topic. The specified topic ID must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and must not start with "goog". If the topic already exists an error will be returned.

Example
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	// Create a new topic with the given name.
	topic, err := client.CreateTopic(ctx, "topicName")
	if err != nil {
		// TODO: Handle error.
	}

	_ = topic // TODO: use the topic.
}
Output:

func (*Client) Subscription

func (c *Client) Subscription(id string) *Subscription

Subscription creates a reference to a subscription.

func (*Client) Subscriptions

func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator

Subscriptions returns an iterator which returns all of the subscriptions for the client's project.

Example
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	// List all subscriptions of the project.
	it := client.Subscriptions(ctx)
	_ = it // TODO: iterate using Next.
}
Output:

func (*Client) Topic

func (c *Client) Topic(id string) *Topic

Topic creates a reference to a topic.

func (*Client) Topics

func (c *Client) Topics(ctx context.Context) *TopicIterator

Topics returns an iterator which returns all of the topics for the client's project.

Example
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	it := client.Topics(ctx)
	_ = it // TODO: iterate using Next.
}
Output:

type Message

type Message struct {
	// ID identifies this message.
	// This ID is assigned by the server and is populated for Messages obtained from a subscription.
	// This field is read-only.
	ID string

	// Data is the actual data in the message.
	Data []byte

	// Attributes represents the key-value pairs the current message
	// is labelled with.
	Attributes map[string]string

	// The time at which the message was published.
	// This is populated by the server for Messages obtained from a subscription.
	// This field is read-only.
	PublishTime time.Time
	// contains filtered or unexported fields
}

Message represents a Pub/Sub message.

func (*Message) Done

func (m *Message) Done(ack bool)

Done completes the processing of a Message that was returned from a MessageIterator. ack indicates whether the message should be acknowledged. Client code must call Done when finished for each Message returned by an iterator. Done may only be called on Messages returned by a MessageIterator. If message acknowledgement fails, the Message will be redelivered. Calls to Done have no effect after the first call.

See MessageIterator.Next for an example.

type MessageIterator

type MessageIterator struct {
	// contains filtered or unexported fields
}

func (*MessageIterator) Next

func (it *MessageIterator) Next() (*Message, error)

Next returns the next Message to be processed. The caller must call Message.Done when finished with it. Once Stop has been called, calls to Next will return Done.

Example
package main

import (
	"fmt"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	it, err := client.Subscription("subName").Pull(ctx)
	if err != nil {
		// TODO: Handle error.
	}
	// Ensure that the iterator is closed down cleanly.
	defer it.Stop()
	// Consume 10 messages.
	for i := 0; i < 10; i++ {
		m, err := it.Next()
		if err == iterator.Done {
			// There are no more messages.  This will happen if it.Stop is called.
			break
		}
		if err != nil {
			// TODO: Handle error.
			break
		}
		fmt.Printf("message %d: %s\n", i, m.Data)

		// Acknowledge the message.
		m.Done(true)
	}
}
Output:

func (*MessageIterator) Stop

func (it *MessageIterator) Stop()

Client code must call Stop on a MessageIterator when finished with it. Stop will block until Done has been called on all Messages that have been returned by Next, or until the context with which the MessageIterator was created is cancelled or exceeds its deadline. Stop need only be called once, but may be called multiple times from multiple goroutines.

Example (Defer)
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	// If all uses of the iterator occur within the lifetime of a single
	// function, stop it with defer.
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	it, err := client.Subscription("subName").Pull(ctx)
	if err != nil {
		// TODO: Handle error.
	}

	// Ensure that the iterator is closed down cleanly.
	defer it.Stop()

	// TODO: Use the iterator (see the example for MessageIterator.Next).
}
Output:

Example (Goroutine)
package main

import (
	"time"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() *pubsub.MessageIterator {
	// If you use the iterator outside the lifetime of a single function, you
	// must still stop it.
	// This (contrived) example returns an iterator that will yield messages
	// for ten seconds, and then stop.
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	it, err := client.Subscription("subName").Pull(ctx)
	if err != nil {
		// TODO: Handle error.
	}
	// Stop the iterator after receiving messages for ten seconds.
	go func() {
		time.Sleep(10 * time.Second)
		it.Stop()
	}()
	return it
}
Output:

type PullOption

type PullOption interface {
	// contains filtered or unexported methods
}

A PullOption is an optional argument to Subscription.Pull.

func MaxExtension

func MaxExtension(duration time.Duration) PullOption

MaxExtension returns a PullOption that limits how long acks deadlines are extended for.

A MessageIterator will automatically extend the ack deadline of all fetched Messages for the duration specified. Automatic deadline extension may be disabled by specifying a duration of 0.

func MaxPrefetch

func MaxPrefetch(num int) PullOption

MaxPrefetch returns a PullOption that limits Message prefetching.

For performance reasons, the pubsub library may prefetch a pool of Messages to be returned serially from MessageIterator.Next. MaxPrefetch is used to limit the the size of this pool.

If num is less than 1, it will be treated as if it were 1.

type PushConfig

type PushConfig struct {
	// A URL locating the endpoint to which messages should be pushed.
	Endpoint string

	// Endpoint configuration attributes. See https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions#PushConfig.FIELDS.attributes for more details.
	Attributes map[string]string
}

PushConfig contains configuration for subscriptions that operate in push mode.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription is a reference to a PubSub subscription.

func (*Subscription) Config

Config fetches the current configuration for the subscription.

Example
package main

import (
	"fmt"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	sub := client.Subscription("subName")
	config, err := sub.Config(ctx)
	if err != nil {
		// TODO: Handle error.
	}
	fmt.Println(config)
}
Output:

func (*Subscription) Delete

func (s *Subscription) Delete(ctx context.Context) error

Delete deletes the subscription.

Example
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	sub := client.Subscription("subName")
	if err := sub.Delete(ctx); err != nil {
		// TODO: Handle error.
	}
}
Output:

func (*Subscription) Exists

func (s *Subscription) Exists(ctx context.Context) (bool, error)

Exists reports whether the subscription exists on the server.

Example
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	sub := client.Subscription("subName")
	ok, err := sub.Exists(ctx)
	if err != nil {
		// TODO: Handle error.
	}
	if !ok {
		// Subscription doesn't exist.
	}
}
Output:

func (*Subscription) ID

func (s *Subscription) ID() string

ID returns the unique identifier of the subscription within its project.

func (*Subscription) ModifyPushConfig

func (s *Subscription) ModifyPushConfig(ctx context.Context, conf *PushConfig) error

ModifyPushConfig updates the endpoint URL and other attributes of a push subscription.

Example
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	sub := client.Subscription("subName")
	if err := sub.ModifyPushConfig(ctx, &pubsub.PushConfig{Endpoint: "https://example.com/push"}); err != nil {
		// TODO: Handle error.
	}
}
Output:

func (*Subscription) Pull

func (s *Subscription) Pull(ctx context.Context, opts ...PullOption) (*MessageIterator, error)

Pull returns a MessageIterator that can be used to fetch Messages. The MessageIterator will automatically extend the ack deadline of all fetched Messages, for the period specified by DefaultMaxExtension. This may be overridden by supplying a MaxExtension pull option.

If ctx is cancelled or exceeds its deadline, outstanding acks or deadline extensions will fail.

The caller must call Stop on the MessageIterator once finished with it.

Example
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	it, err := client.Subscription("subName").Pull(ctx)
	if err != nil {
		// TODO: Handle error.
	}
	// Ensure that the iterator is closed down cleanly.
	defer it.Stop()
}
Output:

Example (Options)
package main

import (
	"time"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	sub := client.Subscription("subName")
	// This program is expected to process and acknowledge messages
	// in 5 seconds. If not, Pub/Sub API will assume the message is not
	// acknowledged.
	it, err := sub.Pull(ctx, pubsub.MaxExtension(5*time.Second))
	if err != nil {
		// TODO: Handle error.
	}
	// Ensure that the iterator is closed down cleanly.
	defer it.Stop()
}
Output:

func (*Subscription) String

func (s *Subscription) String() string

String returns the globally unique printable name of the subscription.

type SubscriptionConfig

type SubscriptionConfig struct {
	Topic      *Topic
	PushConfig PushConfig

	// The default maximum time after a subscriber receives a message before
	// the subscriber should acknowledge the message. Note: messages which are
	// obtained via a MessageIterator need not be acknowledged within this
	// deadline, as the deadline will be automatically extended.
	AckDeadline time.Duration
}

Subscription config contains the configuration of a subscription.

type SubscriptionIterator

type SubscriptionIterator struct {
	// contains filtered or unexported fields
}

SubscriptionIterator is an iterator that returns a series of subscriptions.

func (*SubscriptionIterator) Next

func (subs *SubscriptionIterator) Next() (*Subscription, error)

Next returns the next subscription. If there are no more subscriptions, Done will be returned.

Example
package main

import (
	"fmt"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	// List all subscriptions of the project.
	it := client.Subscriptions(ctx)
	for {
		sub, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			// TODO: Handle error.
		}
		fmt.Println(sub)
	}
}
Output:

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic is a reference to a PubSub topic.

func (*Topic) Delete

func (t *Topic) Delete(ctx context.Context) error

Delete deletes the topic.

Example
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	topic := client.Topic("topicName")
	if err := topic.Delete(ctx); err != nil {
		// TODO: Handle error.
	}
}
Output:

func (*Topic) Exists

func (t *Topic) Exists(ctx context.Context) (bool, error)

Exists reports whether the topic exists on the server.

Example
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	topic := client.Topic("topicName")
	ok, err := topic.Exists(ctx)
	if err != nil {
		// TODO: Handle error.
	}
	if !ok {
		// Topic doesn't exist.
	}
}
Output:

func (*Topic) ID

func (t *Topic) ID() string

ID returns the unique idenfier of the topic within its project.

func (*Topic) Publish

func (t *Topic) Publish(ctx context.Context, msgs ...*Message) ([]string, error)

Publish publishes the supplied Messages to the topic. If successful, the server-assigned message IDs are returned in the same order as the supplied Messages. At most MaxPublishBatchSize messages may be supplied.

Example
package main

import (
	"fmt"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	topic := client.Topic("topicName")
	msgIDs, err := topic.Publish(ctx, &pubsub.Message{
		Data: []byte("hello world"),
	})
	if err != nil {
		// TODO: Handle error.
	}
	fmt.Printf("Published a message with a message ID: %s\n", msgIDs[0])
}
Output:

func (*Topic) String

func (t *Topic) String() string

String returns the printable globally unique name for the topic.

func (*Topic) Subscriptions

func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator

Subscriptions returns an iterator which returns the subscriptions for this topic.

Example
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	topic := client.Topic("topic-name")
	// List all subscriptions of the topic (maybe of multiple projects).
	for subs := topic.Subscriptions(ctx); ; {
		sub, err := subs.Next()
		if err == pubsub.Done {
			break
		}
		if err != nil {
			// TODO: Handle error.
		}
		_ = sub // TODO: use the subscription.
	}
}
Output:

type TopicIterator

type TopicIterator struct {
	// contains filtered or unexported fields
}

TopicIterator is an iterator that returns a series of topics.

func (*TopicIterator) Next

func (tps *TopicIterator) Next() (*Topic, error)

Next returns the next topic. If there are no more topics, Done will be returned.

Example
package main

import (
	"fmt"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	// List all topics.
	it := client.Topics(ctx)
	for {
		t, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			// TODO: Handle error.
		}
		fmt.Println(t)
	}
}
Output:

Directories

Path Synopsis
Package pubsub is an experimental, auto-generated package for the pubsub API.
Package pubsub is an experimental, auto-generated package for the pubsub API.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL