Documentation ¶
Overview ¶
Package pubsub provides an easy way to publish and receive Google Cloud Pub/Sub messages, hiding the details of the underlying server RPCs. Google Cloud Pub/Sub is a many-to-many, asynchronous messaging system that decouples senders and receivers.
More information about Google Cloud Pub/Sub is available at https://cloud.google.com/pubsub/docs
See https://godoc.org/cloud.google.com/go for authentication, timeouts, connection pooling and similar aspects of this package.
Publishing ¶
Google Cloud Pub/Sub messages are published to topics. Topics may be created using the pubsub package like so:
topic, err := pubsubClient.CreateTopic(context.Background(), "topic-name")
Messages may then be published to a topic:
res := topic.Publish(ctx, &pubsub.Message{Data: []byte("payload")})
Publish queues the message for publishing and returns immediately. When enough messages have accumulated, or enough time has elapsed, the batch of messages is sent to the Pub/Sub service.
Publish returns a PublishResult, which behaves like a future: its Get method blocks until the message has been sent to the service.
The first time you call Publish on a topic, goroutines are started in the background. To clean up these goroutines, call Stop:
topic.Stop()
Receiving ¶
To receive messages published to a topic, clients create subscriptions to the topic. There may be more than one subscription per topic; each message that is published to the topic will be delivered to all of its subscriptions.
Subscriptions may be created like so:
sub, err := pubsubClient.CreateSubscription(context.Background(), "sub-name", pubsub.SubscriptionConfig{Topic: topic})
Messages are then consumed from a subscription via callback.
err := sub.Receive(context.Background(), func(ctx context.Context, m *pubsub.Message) {
	log.Printf("Got message: %s", m.Data)
	m.Ack()
})
if err != nil {
	// Handle error.
}
The callback is invoked concurrently by multiple goroutines, maximizing throughput. To terminate a call to Receive, cancel its context.
Once client code has processed the message, it must call Message.Ack, otherwise the message will eventually be redelivered. As an optimization, if the client cannot or doesn't want to process the message, it can call Message.Nack to speed redelivery. For more information and configuration options, see "Deadlines" below.
Note: It is possible for Messages to be redelivered, even if Message.Ack has been called. Client code must be robust to multiple deliveries of messages.
Note: This uses pubsub's streaming pull feature. This feature has properties that may be surprising. Please take a look at https://cloud.google.com/pubsub/docs/pull#streamingpull for more details on how streaming pull behaves compared to the synchronous pull method.
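Because a message may be redelivered even after Ack has been called, one common way to make a callback robust is to remember which message IDs have already been handled. The sketch below is a minimal in-memory illustration using only the standard library. The seenIDs type and its firstDelivery method are hypothetical names, not part of the pubsub package, and a real deployment would likely need a bounded or persistent store.

```go
package main

import (
	"fmt"
	"sync"
)

// seenIDs remembers which message IDs have already been handled.
// It is a hypothetical helper, not part of the pubsub package.
type seenIDs struct {
	mu  sync.Mutex
	ids map[string]bool
}

func newSeenIDs() *seenIDs {
	return &seenIDs{ids: make(map[string]bool)}
}

// firstDelivery reports true the first time id is seen and false afterwards.
// It is safe to call from multiple goroutines.
func (s *seenIDs) firstDelivery(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.ids[id] {
		return false
	}
	s.ids[id] = true
	return true
}

func main() {
	seen := newSeenIDs()
	// "m1" appears twice to simulate a redelivery.
	for _, id := range []string{"m1", "m2", "m1"} {
		if seen.firstDelivery(id) {
			fmt.Println("processing", id)
		} else {
			fmt.Println("skipping duplicate", id)
		}
	}
}
```

Inside a Receive callback, such a check would typically run before the side effects of processing, with Ack still called for duplicates so that they stop being redelivered.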
Deadlines ¶
The default pubsub deadlines are suitable for most use cases, but may be overridden. This section describes the tradeoffs that should be considered when overriding the defaults.
Behind the scenes, each message returned by the Pub/Sub server has an associated lease, known as an "ACK deadline". Unless a message is acknowledged within the ACK deadline, or the client requests that the ACK deadline be extended, the message will become eligible for redelivery. As a convenience, the pubsub package will automatically extend deadlines until either:
- Message.Ack or Message.Nack is called, or
- the "MaxExtension" period elapses from the time the message is fetched from the server.
The initial ACK deadline given to each message defaults to 10 seconds, but may be overridden during subscription creation. Selecting an ACK deadline is a tradeoff between message redelivery latency and RPC volume. If the pubsub package fails to acknowledge or extend a message (e.g. due to unexpected termination of the process), a shorter ACK deadline will generally result in faster message redelivery by the Pub/Sub system. However, a short ACK deadline may also increase the number of deadline extension RPCs that the pubsub package sends to the server.
The default max extension period is DefaultReceiveSettings.MaxExtension, and can be overridden by setting Subscription.ReceiveSettings.MaxExtension. Selecting a max extension period is a tradeoff between the speed at which client code must process messages, and the redelivery delay if messages fail to be acknowledged (e.g. because client code neglects to do so). Using a large MaxExtension increases the available time for client code to process messages. However, if the client code neglects to call Message.Ack/Nack, a large MaxExtension will increase the delay before the message is redelivered.
Slow Message Processing ¶
For use cases where message processing exceeds 30 minutes, we recommend using the base client in a pull model, since long-lived streams are periodically killed by firewalls. See the example at https://godoc.org/cloud.google.com/go/pubsub/apiv1#example-SubscriberClient-Pull-LengthyClientProcessing
Index ¶
- Constants
- Variables
- type Client
- func (c *Client) Close() error
- func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error)
- func (c *Client) CreateTopic(ctx context.Context, id string) (*Topic, error)
- func (c *Client) Snapshot(id string) *Snapshot
- func (c *Client) Snapshots(ctx context.Context) *SnapshotConfigIterator
- func (c *Client) Subscription(id string) *Subscription
- func (c *Client) SubscriptionInProject(id, projectID string) *Subscription
- func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator
- func (c *Client) Topic(id string) *Topic
- func (c *Client) TopicInProject(id, projectID string) *Topic
- func (c *Client) Topics(ctx context.Context) *TopicIterator
- type Message
- type MessageStoragePolicy
- type PublishResult
- type PublishSettings
- type PushConfig
- type ReceiveSettings
- type Snapshot
- type SnapshotConfig
- type SnapshotConfigIterator
- type Subscription
- func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error)
- func (s *Subscription) CreateSnapshot(ctx context.Context, name string) (*SnapshotConfig, error)
- func (s *Subscription) Delete(ctx context.Context) error
- func (s *Subscription) Exists(ctx context.Context) (bool, error)
- func (s *Subscription) IAM() *iam.Handle
- func (s *Subscription) ID() string
- func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error
- func (s *Subscription) SeekToSnapshot(ctx context.Context, snap *Snapshot) error
- func (s *Subscription) SeekToTime(ctx context.Context, t time.Time) error
- func (s *Subscription) String() string
- func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error)
- type SubscriptionConfig
- type SubscriptionConfigToUpdate
- type SubscriptionIterator
- type Topic
- func (t *Topic) Config(ctx context.Context) (TopicConfig, error)
- func (t *Topic) Delete(ctx context.Context) error
- func (t *Topic) Exists(ctx context.Context) (bool, error)
- func (t *Topic) IAM() *iam.Handle
- func (t *Topic) ID() string
- func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult
- func (t *Topic) Stop()
- func (t *Topic) String() string
- func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator
- func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error)
- type TopicConfig
- type TopicConfigToUpdate
- type TopicIterator
Examples ¶
- Client.CreateSubscription
- Client.CreateTopic
- Client.Snapshots
- Client.Subscriptions
- Client.TopicInProject
- Client.Topics
- NewClient
- Snapshot.Delete
- SnapshotConfigIterator.Next
- Subscription.Config
- Subscription.CreateSnapshot
- Subscription.Delete
- Subscription.Exists
- Subscription.Receive
- Subscription.Receive (MaxExtension)
- Subscription.Receive (MaxOutstanding)
- Subscription.SeekToSnapshot
- Subscription.SeekToTime
- Subscription.Update
- SubscriptionIterator.Next
- Topic.Delete
- Topic.Exists
- Topic.Publish
- Topic.Subscriptions
- TopicIterator.Next
Constants ¶
const (
	// ScopePubSub grants permissions to view and manage Pub/Sub
	// topics and subscriptions.
	ScopePubSub = "https://www.googleapis.com/auth/pubsub"

	// ScopeCloudPlatform grants permissions to view and manage your data
	// across Google Cloud Platform services.
	ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform"
)
const (
	// The maximum number of messages that can be in a single publish request, as
	// determined by the PubSub service.
	MaxPublishRequestCount = 1000

	// The maximum size of a single publish request in bytes, as determined by the PubSub service.
	MaxPublishRequestBytes = 1e7
)
Variables ¶
var (
	// PullCount is a measure of the number of messages pulled.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	PullCount = stats.Int64(statsPrefix+"pull_count", "Number of PubSub messages pulled", stats.UnitDimensionless)

	// AckCount is a measure of the number of messages acked.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AckCount = stats.Int64(statsPrefix+"ack_count", "Number of PubSub messages acked", stats.UnitDimensionless)

	// NackCount is a measure of the number of messages nacked.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	NackCount = stats.Int64(statsPrefix+"nack_count", "Number of PubSub messages nacked", stats.UnitDimensionless)

	// ModAckCount is a measure of the number of messages whose ack-deadline was modified.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	ModAckCount = stats.Int64(statsPrefix+"mod_ack_count", "Number of ack-deadlines modified", stats.UnitDimensionless)

	// StreamOpenCount is a measure of the number of times a streaming-pull stream was opened.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of calls opening a new streaming pull", stats.UnitDimensionless)

	// StreamRetryCount is a measure of the number of times a streaming-pull operation was retried.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamRetryCount = stats.Int64(statsPrefix+"stream_retry_count", "Number of retries of a stream send or receive", stats.UnitDimensionless)

	// StreamRequestCount is a measure of the number of requests sent on a streaming-pull stream.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamRequestCount = stats.Int64(statsPrefix+"stream_request_count", "Number gRPC StreamingPull request messages sent", stats.UnitDimensionless)

	// StreamResponseCount is a measure of the number of responses received on a streaming-pull stream.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamResponseCount = stats.Int64(statsPrefix+"stream_response_count", "Number of gRPC StreamingPull response messages received", stats.UnitDimensionless)

	// PullCountView is a cumulative sum of PullCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	PullCountView *view.View

	// AckCountView is a cumulative sum of AckCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	AckCountView *view.View

	// NackCountView is a cumulative sum of NackCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	NackCountView *view.View

	// ModAckCountView is a cumulative sum of ModAckCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	ModAckCountView *view.View

	// StreamOpenCountView is a cumulative sum of StreamOpenCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamOpenCountView *view.View

	// StreamRetryCountView is a cumulative sum of StreamRetryCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamRetryCountView *view.View

	// StreamRequestCountView is a cumulative sum of StreamRequestCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamRequestCountView *view.View

	// StreamResponseCountView is a cumulative sum of StreamResponseCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	StreamResponseCountView *view.View
)
var DefaultPublishSettings = PublishSettings{
	DelayThreshold: 1 * time.Millisecond,
	CountThreshold: 100,
	ByteThreshold:  1e6,
	Timeout:        60 * time.Second,
}
DefaultPublishSettings holds the default values for topics' PublishSettings.
var DefaultReceiveSettings = ReceiveSettings{
	MaxExtension:           10 * time.Minute,
	MaxOutstandingMessages: 1000,
	MaxOutstandingBytes:    1e9,
	NumGoroutines:          1,
}
DefaultReceiveSettings holds the default values for ReceiveSettings.
var ErrOversizedMessage = bundler.ErrOversizedItem
ErrOversizedMessage indicates that a message's size exceeds MaxPublishRequestBytes.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a Google Pub/Sub client scoped to a single project.
Clients should be reused rather than being created as needed. A Client may be shared by multiple goroutines.
func NewClient ¶
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error)
NewClient creates a new PubSub client.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	_, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	// See the other examples to learn how to use the Client.
}
func (*Client) Close ¶ added in v0.2.0
Close releases any resources held by the client, such as memory and goroutines.
If the client is available for the lifetime of the program, then Close need not be called at exit.
func (*Client) CreateSubscription ¶ added in v0.2.0
func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error)
CreateSubscription creates a new subscription on a topic.
id is the name of the subscription to create. It must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and must not start with "goog".
cfg.Topic is the topic from which the subscription should receive messages. It need not belong to the same project as the subscription. This field is required.
cfg.AckDeadline is the maximum time after a subscriber receives a message before the subscriber should acknowledge the message. It must be between 10 and 600 seconds (inclusive), and is rounded down to the nearest second. If the provided ackDeadline is 0, then the default value of 10 seconds is used. Note: messages which are obtained via Subscription.Receive need not be acknowledged within this deadline, as the deadline will be automatically extended.
cfg.PushConfig may be set to configure this subscription for push delivery.
If the subscription already exists an error will be returned.
Example ¶
package main

import (
	"time"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	// Create a new topic with the given name.
	topic, err := client.CreateTopic(ctx, "topicName")
	if err != nil {
		// TODO: Handle error.
	}

	// Create a new subscription to the previously created topic
	// with the given name.
	sub, err := client.CreateSubscription(ctx, "subName", pubsub.SubscriptionConfig{
		Topic:       topic,
		AckDeadline: 10 * time.Second,
	})
	if err != nil {
		// TODO: Handle error.
	}

	_ = sub // TODO: use the subscription.
}
func (*Client) CreateTopic ¶ added in v0.2.0
CreateTopic creates a new topic. The specified topic ID must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and must not start with "goog". If the topic already exists an error will be returned.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	// Create a new topic with the given name.
	topic, err := client.CreateTopic(ctx, "topicName")
	if err != nil {
		// TODO: Handle error.
	}

	_ = topic // TODO: use the topic.
}
func (*Client) Snapshots ¶ added in v0.18.0
func (c *Client) Snapshots(ctx context.Context) *SnapshotConfigIterator
Snapshots returns an iterator which returns snapshots for this project.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	// List all snapshots for the project.
	iter := client.Snapshots(ctx)
	_ = iter // TODO: iterate using Next.
}
func (*Client) Subscription ¶
func (c *Client) Subscription(id string) *Subscription
Subscription creates a reference to a subscription.
func (*Client) SubscriptionInProject ¶ added in v0.19.0
func (c *Client) SubscriptionInProject(id, projectID string) *Subscription
SubscriptionInProject creates a reference to a subscription in a given project.
func (*Client) Subscriptions ¶
func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator
Subscriptions returns an iterator which returns all of the subscriptions for the client's project.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	// List all subscriptions of the project.
	it := client.Subscriptions(ctx)
	_ = it // TODO: iterate using Next.
}
func (*Client) Topic ¶
Topic creates a reference to a topic in the client's project.
If a Topic's Publish method is called, it has background goroutines associated with it. Clean them up by calling Topic.Stop.
Avoid creating many Topic instances if you use them to publish.
func (*Client) TopicInProject ¶ added in v0.12.0
TopicInProject creates a reference to a topic in the given project.
If a Topic's Publish method is called, it has background goroutines associated with it. Clean them up by calling Topic.Stop.
Avoid creating many Topic instances if you use them to publish.
Example ¶
Use TopicInProject to refer to a topic that is not in the client's project, such as a public topic.
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	topic := client.TopicInProject("topicName", "another-project-id")
	_ = topic // TODO: use the topic.
}
func (*Client) Topics ¶
func (c *Client) Topics(ctx context.Context) *TopicIterator
Topics returns an iterator which returns all of the topics for the client's project.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	it := client.Topics(ctx)
	_ = it // TODO: iterate using Next.
}
type Message ¶
type Message struct {
	// ID identifies this message.
	// This ID is assigned by the server and is populated for Messages obtained from a subscription.
	// This field is read-only.
	ID string

	// Data is the actual data in the message.
	Data []byte

	// Attributes represents the key-value pairs the current message
	// is labelled with.
	Attributes map[string]string

	// The time at which the message was published.
	// This is populated by the server for Messages obtained from a subscription.
	// This field is read-only.
	PublishTime time.Time
	// contains filtered or unexported fields
}
Message represents a Pub/Sub message.
func (*Message) Ack ¶ added in v0.8.0
func (m *Message) Ack()
Ack indicates successful processing of a Message passed to the Subscription.Receive callback. It should not be called on any other Message value. If message acknowledgement fails, the Message will be redelivered. Client code must call Ack or Nack when finished for each received Message. Calls to Ack or Nack have no effect after the first call.
func (*Message) Nack ¶ added in v0.8.0
func (m *Message) Nack()
Nack indicates that the client will not or cannot process a Message passed to the Subscription.Receive callback. It should not be called on any other Message value. Nack will result in the Message being redelivered more quickly than if it were allowed to expire. Client code must call Ack or Nack when finished for each received Message. Calls to Ack or Nack have no effect after the first call.
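The rule that only the first call to Ack or Nack has any effect can be illustrated with sync.Once. The message type below is a hypothetical stand-in written for this sketch; it does not show how the pubsub package implements Message.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for a received message.
type message struct {
	once   sync.Once
	result string // "ack", "nack", or "" if neither has been called
}

// Ack records an acknowledgement unless Ack or Nack was already called.
func (m *message) Ack() { m.once.Do(func() { m.result = "ack" }) }

// Nack records a negative acknowledgement unless Ack or Nack was already called.
func (m *message) Nack() { m.once.Do(func() { m.result = "nack" }) }

func main() {
	m := &message{}
	m.Ack()
	m.Nack()              // ignored: Ack was called first
	fmt.Println(m.result) // prints "ack"
}
```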
type MessageStoragePolicy ¶ added in v0.26.0
type MessageStoragePolicy struct {
	// The list of GCP regions where messages that are published to the topic may
	// be persisted in storage. Messages published by publishers running in
	// non-allowed GCP regions (or running outside of GCP altogether) will be
	// routed for storage in one of the allowed regions. An empty list indicates a
	// misconfiguration at the project or organization level, which will result in
	// all Publish operations failing.
	AllowedPersistenceRegions []string
}
MessageStoragePolicy constrains how messages published to the topic may be stored. It is determined when the topic is created based on the policy configured at the project level.
type PublishResult ¶ added in v0.8.0
type PublishResult struct {
// contains filtered or unexported fields
}
A PublishResult holds the result from a call to Publish.
func (*PublishResult) Get ¶ added in v0.8.0
func (r *PublishResult) Get(ctx context.Context) (serverID string, err error)
Get returns the server-generated message ID and/or error result of a Publish call. Get blocks until the Publish call completes or the context is done.
func (*PublishResult) Ready ¶ added in v0.8.0
func (r *PublishResult) Ready() <-chan struct{}
Ready returns a channel that is closed when the result is ready. When the Ready channel is closed, Get is guaranteed not to block.
type PublishSettings ¶ added in v0.8.0
type PublishSettings struct {
	// Publish a non-empty batch after this delay has passed.
	DelayThreshold time.Duration

	// Publish a batch when it has this many messages. The maximum is
	// MaxPublishRequestCount.
	CountThreshold int

	// Publish a batch when its size in bytes reaches this value.
	ByteThreshold int

	// The number of goroutines that invoke the Publish RPC concurrently.
	// Defaults to a multiple of GOMAXPROCS.
	NumGoroutines int

	// The maximum time that the client will attempt to publish a bundle of messages.
	Timeout time.Duration
}
PublishSettings control the bundling of published messages.
type PushConfig ¶
type PushConfig struct {
	// A URL locating the endpoint to which messages should be pushed.
	Endpoint string

	// Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details.
	Attributes map[string]string
}
PushConfig contains configuration for subscriptions that operate in push mode.
type ReceiveSettings ¶ added in v0.8.0
type ReceiveSettings struct {
	// MaxExtension is the maximum period for which the Subscription should
	// automatically extend the ack deadline for each message.
	//
	// The Subscription will automatically extend the ack deadline of all
	// fetched Messages for the duration specified. Automatic deadline
	// extension may be disabled by specifying a duration less than 0.
	MaxExtension time.Duration

	// MaxOutstandingMessages is the maximum number of unprocessed messages
	// (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it
	// will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages.
	// If the value is negative, then there will be no limit on the number of
	// unprocessed messages.
	MaxOutstandingMessages int

	// MaxOutstandingBytes is the maximum size of unprocessed messages
	// (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will
	// be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If
	// the value is negative, then there will be no limit on the number of bytes
	// for unprocessed messages.
	MaxOutstandingBytes int

	// NumGoroutines is the number of goroutines Receive will spawn to pull
	// messages concurrently. If NumGoroutines is less than 1, it will be treated
	// as if it were DefaultReceiveSettings.NumGoroutines.
	//
	// NumGoroutines does not limit the number of messages that can be processed
	// concurrently. Even with one goroutine, many messages might be processed at
	// once, because that goroutine may continually receive messages and invoke the
	// function passed to Receive on them. To limit the number of messages being
	// processed concurrently, set MaxOutstandingMessages.
	NumGoroutines int
}
ReceiveSettings configure the Receive method. A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
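The zero and negative conventions in the field comments can be summarized as two small resolution rules. The helpers below are hypothetical and exist only to make the conventions concrete; the unlimited sentinel is an assumption of this sketch, not a value defined by the package.

```go
package main

import "fmt"

// unlimited is a sentinel used by this sketch to mean "no limit".
const unlimited = -1

// effectiveLimit resolves a MaxOutstanding-style setting: zero selects the
// default, a negative value means no limit, and a positive value is used as is.
func effectiveLimit(v, def int) int {
	switch {
	case v == 0:
		return def
	case v < 0:
		return unlimited
	default:
		return v
	}
}

// effectiveGoroutines resolves a NumGoroutines-style setting: any value below
// 1 selects the default.
func effectiveGoroutines(v, def int) int {
	if v < 1 {
		return def
	}
	return v
}

func main() {
	fmt.Println(effectiveLimit(0, 1000))     // zero selects the default
	fmt.Println(effectiveLimit(-5, 1000))    // negative means no limit
	fmt.Println(effectiveLimit(5, 1000))     // positive is used as is
	fmt.Println(effectiveGoroutines(0, 1))   // below 1 selects the default
}
```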
type Snapshot ¶ added in v0.18.0
type Snapshot struct {
// contains filtered or unexported fields
}
Snapshot is a reference to a PubSub snapshot.
func (*Snapshot) Delete ¶ added in v0.18.0
Delete deletes a snapshot.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	snap := client.Snapshot("snapshotName")
	if err := snap.Delete(ctx); err != nil {
		// TODO: Handle error.
	}
}
type SnapshotConfig ¶ added in v0.18.0
SnapshotConfig contains the details of a Snapshot.
type SnapshotConfigIterator ¶ added in v0.18.0
type SnapshotConfigIterator struct {
// contains filtered or unexported fields
}
SnapshotConfigIterator is an iterator that returns a series of snapshots.
func (*SnapshotConfigIterator) Next ¶ added in v0.18.0
func (snaps *SnapshotConfigIterator) Next() (*SnapshotConfig, error)
Next returns the next SnapshotConfig. Its second return value is iterator.Done if there are no more results. Once Next returns iterator.Done, all subsequent calls will return iterator.Done.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	// List all snapshots for the project.
	iter := client.Snapshots(ctx)
	for {
		snapConfig, err := iter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			// TODO: Handle error.
		}
		_ = snapConfig // TODO: use the SnapshotConfig.
	}
}
type Subscription ¶
type Subscription struct {
	// Settings for pulling messages. Configure these before calling Receive.
	ReceiveSettings ReceiveSettings
	// contains filtered or unexported fields
}
Subscription is a reference to a PubSub subscription.
func (*Subscription) Config ¶
func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error)
Config fetches the current configuration for the subscription.
Example ¶
package main

import (
	"fmt"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	sub := client.Subscription("subName")
	config, err := sub.Config(ctx)
	if err != nil {
		// TODO: Handle error.
	}
	fmt.Println(config)
}
func (*Subscription) CreateSnapshot ¶ added in v0.18.0
func (s *Subscription) CreateSnapshot(ctx context.Context, name string) (*SnapshotConfig, error)
CreateSnapshot creates a new snapshot from this subscription. The snapshot will be for the topic this subscription is subscribed to. If the name is empty string, a unique name is assigned.
The created snapshot is guaranteed to retain:
(a) The existing backlog on the subscription. More precisely, this is defined as the messages in the subscription's backlog that are unacknowledged when CreateSnapshot returns without error.
(b) Any messages published to the subscription's topic following CreateSnapshot returning without error.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	sub := client.Subscription("subName")
	snapConfig, err := sub.CreateSnapshot(ctx, "snapshotName")
	if err != nil {
		// TODO: Handle error.
	}
	_ = snapConfig // TODO: Use SnapshotConfig.
}
func (*Subscription) Delete ¶
func (s *Subscription) Delete(ctx context.Context) error
Delete deletes the subscription.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	sub := client.Subscription("subName")
	if err := sub.Delete(ctx); err != nil {
		// TODO: Handle error.
	}
}
func (*Subscription) Exists ¶
func (s *Subscription) Exists(ctx context.Context) (bool, error)
Exists reports whether the subscription exists on the server.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	sub := client.Subscription("subName")
	ok, err := sub.Exists(ctx)
	if err != nil {
		// TODO: Handle error.
	}
	if !ok {
		// Subscription doesn't exist.
	}
}
func (*Subscription) IAM ¶ added in v0.4.0
func (s *Subscription) IAM() *iam.Handle
func (*Subscription) ID ¶ added in v0.2.0
func (s *Subscription) ID() string
ID returns the unique identifier of the subscription within its project.
func (*Subscription) Receive ¶ added in v0.8.0
Receive calls f with the outstanding messages from the subscription. It blocks until ctx is done, or the service returns a non-retryable error.
The standard way to terminate a Receive is to cancel its context:
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, callback)
// Call cancel from callback, or another goroutine.
If the service returns a non-retryable error, Receive returns that error after all of the outstanding calls to f have returned. If ctx is done, Receive returns nil after all of the outstanding calls to f have returned and all messages have been acknowledged or have expired.
Receive calls f concurrently from multiple goroutines. It is encouraged to process messages synchronously in f, even if that processing is relatively time-consuming; Receive will spawn new goroutines for incoming messages, limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings.
The context passed to f will be canceled when ctx is Done or there is a fatal service error.
Receive will automatically extend the ack deadline of all fetched Messages up to the period specified by s.ReceiveSettings.MaxExtension.
Each Subscription may have only one invocation of Receive active at a time.
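The bounded concurrency described above (one goroutine per message, capped by an outstanding-message limit) is commonly built on a counting semaphore. The sketch below shows that general technique with the standard library only; processAll is a hypothetical function and makes no claim about how Receive is implemented.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// processAll runs handle for every message in its own goroutine while keeping
// at most maxOutstanding handlers running at once (maxOutstanding must be at
// least 1). It returns the highest number of handlers observed running at the
// same time.
func processAll(msgs []string, maxOutstanding int, handle func(string)) int64 {
	sem := make(chan struct{}, maxOutstanding)
	var wg sync.WaitGroup
	var running, peak int64
	for _, m := range msgs {
		sem <- struct{}{} // blocks while maxOutstanding handlers are in flight
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the handler returns
			n := atomic.AddInt64(&running, 1)
			// Record a new peak if this handler raised the running count.
			for {
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			handle(m)
			atomic.AddInt64(&running, -1)
		}(m)
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	msgs := make([]string, 20)
	for i := range msgs {
		msgs[i] = fmt.Sprintf("msg-%d", i)
	}
	peak := processAll(msgs, 5, func(m string) {
		time.Sleep(5 * time.Millisecond) // simulate work
	})
	fmt.Println("peak concurrency within limit:", peak <= 5)
}
```

Because the handler blocks its own goroutine rather than the dispatch loop, slow processing only delays new messages once the limit is reached, which matches the advice to process synchronously in f.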
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}
	sub := client.Subscription("subName")
	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		// TODO: Handle message.
		// NOTE: May be called concurrently; synchronize access to shared memory.
		m.Ack()
	})
	if err != context.Canceled {
		// TODO: Handle error.
	}
}
Example (MaxExtension) ¶
This example shows how to configure keepalive so that unacknowledged messages expire quickly, allowing other subscribers to take them.
package main import ( "time" "cloud.google.com/go/pubsub" "golang.org/x/net/context" ) func main() { ctx := context.Background() client, err := pubsub.NewClient(ctx, "project-id") if err != nil { // TODO: Handle error. } sub := client.Subscription("subName") // This program is expected to process and acknowledge messages in 30 seconds. If // not, the Pub/Sub API will assume the message is not acknowledged. sub.ReceiveSettings.MaxExtension = 30 * time.Second err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { // TODO: Handle message. m.Ack() }) if err != context.Canceled { // TODO: Handle error. } }
Example (MaxOutstanding) ¶
This example shows how to throttle Subscription.Receive, which aims for high throughput by default. By limiting the number of messages and/or bytes being processed at once, you can bound your program's resource consumption.
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	sub := client.Subscription("subName")
	sub.ReceiveSettings.MaxOutstandingMessages = 5
	sub.ReceiveSettings.MaxOutstandingBytes = 10e6
	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		// TODO: Handle message.
		m.Ack()
	})
	// Receive returns nil when ctx is done, so any non-nil error is a failure.
	if err != nil {
		// TODO: Handle error.
	}
}
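To make the two limits concrete, the following sketch shows one way a client could admit a message only while both the message count and the byte total stay within bounds. It is a simplified model under stated assumptions, not the library's flow controller: limiter, tryAcquire, and release are hypothetical names, and the sketch rejects instead of blocking.

```go
package main

import (
	"fmt"
	"sync"
)

// limiter is a hypothetical flow controller; it is not a type from the package.
type limiter struct {
	mu                sync.Mutex
	maxMsgs, maxBytes int
	msgs, bytes       int
}

// tryAcquire reserves room for one message of the given size. It reports
// false, and reserves nothing, if either limit would be exceeded.
// In this sketch a message larger than maxBytes is never admitted.
func (l *limiter) tryAcquire(size int) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.msgs+1 > l.maxMsgs || l.bytes+size > l.maxBytes {
		return false
	}
	l.msgs++
	l.bytes += size
	return true
}

// release returns the room held by a message once it has been processed.
func (l *limiter) release(size int) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.msgs--
	l.bytes -= size
}

func main() {
	l := &limiter{maxMsgs: 5, maxBytes: 10e6} // same limits as the example above
	fmt.Println(l.tryAcquire(6e6))            // true
	fmt.Println(l.tryAcquire(6e6))            // false: would exceed maxBytes
	l.release(6e6)
	fmt.Println(l.tryAcquire(6e6)) // true again
}
```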
func (*Subscription) SeekToSnapshot ¶ added in v0.18.0
func (s *Subscription) SeekToSnapshot(ctx context.Context, snap *Snapshot) error
SeekToSnapshot seeks the subscription to a snapshot.
The snapshot need not be created from this subscription, but it must be for the topic this subscription is subscribed to.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	sub := client.Subscription("subName")
	snap := client.Snapshot("snapshotName")
	if err := sub.SeekToSnapshot(ctx, snap); err != nil {
		// TODO: Handle error.
	}
}
func (*Subscription) SeekToTime ¶ added in v0.18.0
SeekToTime seeks the subscription to a point in time.
Messages retained in the subscription that were published before this time are marked as acknowledged, and messages retained in the subscription that were published after this time are marked as unacknowledged. Note that this operation affects only those messages retained in the subscription (configured by RetainAckedMessages and RetentionDuration in SubscriptionConfig). For example, if `time` corresponds to a point before the message retention window (or to a point before the system's notion of the subscription creation time), only retained messages will be marked as unacknowledged, and already-expunged messages will not be restored.
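The acknowledgement rule can be modeled locally as a pass over the retained messages. The sketch below is only an illustration of the rule, not of how the service works: the retained type and both functions are hypothetical, and treating a message published exactly at the seek time as unacknowledged is an assumption the text does not settle.

```go
package main

import (
	"fmt"
	"time"
)

// retained is a hypothetical record of one message held by a subscription.
type retained struct {
	ID        string
	Published time.Time
	Acked     bool
}

// seekToTime marks messages published before t as acknowledged and all
// others as unacknowledged. Messages that are not in msgs (already
// expunged) are untouched: nothing is restored.
func seekToTime(msgs []retained, t time.Time) {
	for i := range msgs {
		msgs[i].Acked = msgs[i].Published.Before(t)
	}
}

// ackedAfterSeek builds messages published at the given minute offsets from
// a fixed base time, seeks to seekMin, and returns each message's Acked flag.
func ackedAfterSeek(publishedMin []int, seekMin int) []bool {
	base := time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)
	msgs := make([]retained, len(publishedMin))
	for i, m := range publishedMin {
		msgs[i] = retained{
			ID:        fmt.Sprintf("m%d", i),
			Published: base.Add(time.Duration(m) * time.Minute),
			Acked:     true, // start fully acknowledged
		}
	}
	seekToTime(msgs, base.Add(time.Duration(seekMin)*time.Minute))
	out := make([]bool, len(msgs))
	for i, m := range msgs {
		out[i] = m.Acked
	}
	return out
}

func main() {
	// Messages published at +0, +30 and +90 minutes; seek to +60 minutes.
	fmt.Println(ackedAfterSeek([]int{0, 30, 90}, 60)) // prints "[true true false]"
}
```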
Example ¶
package main

import (
	"time"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	sub := client.Subscription("subName")
	if err := sub.SeekToTime(ctx, time.Now().Add(-time.Hour)); err != nil {
		// TODO: Handle error.
	}
}
func (*Subscription) String ¶ added in v0.2.0
func (s *Subscription) String() string
String returns the globally unique printable name of the subscription.
func (*Subscription) Update ¶ added in v0.10.0
func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error)
Update changes an existing subscription according to the fields set in cfg. It returns the new SubscriptionConfig.
Update returns an error if no fields were modified.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	sub := client.Subscription("subName")
	subConfig, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
		PushConfig: &pubsub.PushConfig{Endpoint: "https://example.com/push"},
	})
	if err != nil {
		// TODO: Handle error.
	}
	_ = subConfig // TODO: Use SubscriptionConfig.
}
type SubscriptionConfig ¶
type SubscriptionConfig struct {
	Topic      *Topic
	PushConfig PushConfig

	// The default maximum time after a subscriber receives a message before
	// the subscriber should acknowledge the message. Note: messages which are
	// obtained via Subscription.Receive need not be acknowledged within this
	// deadline, as the deadline will be automatically extended.
	AckDeadline time.Duration

	// Whether to retain acknowledged messages. If true, acknowledged messages
	// will not be expunged until they fall out of the RetentionDuration window.
	RetainAckedMessages bool

	// How long to retain messages in backlog, from the time of publish. If
	// RetainAckedMessages is true, this duration affects the retention of
	// acknowledged messages, otherwise only unacknowledged messages are retained.
	// Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes.
	RetentionDuration time.Duration

	// The set of labels for the subscription.
	Labels map[string]string
}
SubscriptionConfig describes the configuration of a subscription.
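The bounds on RetentionDuration can be checked on the client before a subscription is created. The helper below is a hypothetical sketch, not part of the package; it also assumes that a zero duration stands for the 7-day default, which the comment above implies but does not state.

```go
package main

import (
	"fmt"
	"time"
)

const (
	minRetention = 10 * time.Minute   // lower bound from the field comment
	maxRetention = 7 * 24 * time.Hour // upper bound and documented default
)

// effectiveRetention is a hypothetical helper. It returns the retention that
// would apply for d: the 7-day default when d is zero (an assumption of this
// sketch), d itself when it is within bounds, and an error otherwise.
func effectiveRetention(d time.Duration) (time.Duration, error) {
	if d == 0 {
		return maxRetention, nil
	}
	if d < minRetention || d > maxRetention {
		return 0, fmt.Errorf("retention %v outside [%v, %v]", d, minRetention, maxRetention)
	}
	return d, nil
}

func main() {
	for _, d := range []time.Duration{0, 5 * time.Minute, 24 * time.Hour} {
		got, err := effectiveRetention(d)
		fmt.Println(d, got, err)
	}
}
```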
type SubscriptionConfigToUpdate ¶ added in v0.10.0
type SubscriptionConfigToUpdate struct {
	// If non-nil, the push config is changed.
	PushConfig *PushConfig

	// If non-zero, the ack deadline is changed.
	AckDeadline time.Duration

	// If set, RetainAckedMessages is changed.
	RetainAckedMessages optional.Bool

	// If non-zero, RetentionDuration is changed.
	RetentionDuration time.Duration

	// If non-nil, the current set of labels is completely
	// replaced by the new set.
	// This field has beta status. It is not subject to the stability guarantee
	// and may change.
	Labels map[string]string
}
SubscriptionConfigToUpdate describes how to update a subscription.
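The "set means change" convention of these fields can be illustrated with a small local model. The types and the apply function below are hypothetical stand-ins (a *bool takes the place of optional.Bool), and the sketch only mirrors the documented rules: unset fields are left alone, non-nil Labels replace the whole set, and an update with nothing set is an error.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// config and update are simplified, hypothetical stand-ins for
// SubscriptionConfig and SubscriptionConfigToUpdate.
type config struct {
	AckDeadline time.Duration
	RetainAcked bool
	Labels      map[string]string
}

type update struct {
	AckDeadline time.Duration     // non-zero means change
	RetainAcked *bool             // non-nil means change
	Labels      map[string]string // non-nil means replace the whole set
}

var errNoFields = errors.New("update: no fields set")

// apply returns a copy of c with the fields set in u applied.
func apply(c config, u update) (config, error) {
	changed := false
	if u.AckDeadline != 0 {
		c.AckDeadline = u.AckDeadline
		changed = true
	}
	if u.RetainAcked != nil {
		c.RetainAcked = *u.RetainAcked
		changed = true
	}
	if u.Labels != nil {
		c.Labels = u.Labels // replaced, not merged
		changed = true
	}
	if !changed {
		return c, errNoFields
	}
	return c, nil
}

func main() {
	base := config{AckDeadline: 10 * time.Second, Labels: map[string]string{"a": "1", "b": "2"}}
	got, err := apply(base, update{Labels: map[string]string{"c": "3"}})
	fmt.Println(got.AckDeadline, got.Labels, err) // prints "10s map[c:3] <nil>"
	_, err = apply(base, update{})
	fmt.Println(err) // prints "update: no fields set"
}
```

In this model an empty but non-nil Labels map clears every label, whereas a nil map leaves the labels untouched.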
type SubscriptionIterator ¶
type SubscriptionIterator struct {
// contains filtered or unexported fields
}
SubscriptionIterator is an iterator that returns a series of subscriptions.
func (*SubscriptionIterator) Next ¶
func (subs *SubscriptionIterator) Next() (*Subscription, error)
Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned.
Example ¶
package main

import (
	"fmt"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	// List all subscriptions of the project.
	it := client.Subscriptions(ctx)
	for {
		sub, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			// TODO: Handle error.
		}
		fmt.Println(sub)
	}
}
type Topic ¶
type Topic struct {
	// Settings for publishing messages. All changes must be made before the
	// first call to Publish. The default is DefaultPublishSettings.
	PublishSettings PublishSettings
	// contains filtered or unexported fields
}
Topic is a reference to a PubSub topic.
The methods of Topic are safe for use by multiple goroutines.
func (*Topic) Config ¶ added in v0.26.0
func (t *Topic) Config(ctx context.Context) (TopicConfig, error)
Config returns the TopicConfig for the topic.
func (*Topic) Delete ¶
Delete deletes the topic.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	topic := client.Topic("topicName")
	if err := topic.Delete(ctx); err != nil {
		// TODO: Handle error.
	}
}
func (*Topic) Exists ¶
Exists reports whether the topic exists on the server.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	topic := client.Topic("topicName")
	ok, err := topic.Exists(ctx)
	if err != nil {
		// TODO: Handle error.
	}
	if !ok {
		// Topic doesn't exist.
	}
}
func (*Topic) ID ¶ added in v0.2.0
ID returns the unique identifier of the topic within its project.
func (*Topic) Publish ¶
func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult
Publish publishes msg to the topic asynchronously. Messages are batched and sent according to the topic's PublishSettings. Publish never blocks.
Publish returns a non-nil PublishResult which will be ready when the message has been sent (or has failed to be sent) to the server.
Publish creates goroutines for batching and sending messages. These goroutines need to be stopped by calling t.Stop(). Once stopped, future calls to Publish will immediately return a PublishResult with an error.
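The future-like behaviour of the returned result can be sketched with a channel that is closed once the outcome is known. This is a minimal model, not the library's PublishResult: result, set, publish, wait, and getCanceled are hypothetical names, and the fake publish simply derives an ID from the payload.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// result is a hypothetical future; it is not the library's PublishResult.
type result struct {
	ready chan struct{} // closed once id and err are set
	id    string
	err   error
}

func newResult() *result { return &result{ready: make(chan struct{})} }

// set records the outcome and unblocks every Get. It must be called once.
func (r *result) set(id string, err error) {
	r.id, r.err = id, err
	close(r.ready)
}

// Get blocks until the result is ready or ctx is done.
func (r *result) Get(ctx context.Context) (string, error) {
	select {
	case <-r.ready: // prefer a ready result even if ctx is also done
		return r.id, r.err
	default:
	}
	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case <-r.ready:
		return r.id, r.err
	}
}

// publish returns immediately and completes the result in the background.
func publish(data string) *result {
	r := newResult()
	go func() {
		if data == "" {
			r.set("", errors.New("empty message"))
			return
		}
		r.set("id-"+data, nil)
	}()
	return r
}

// wait blocks on r with a context that is never canceled.
func wait(r *result) (string, error) { return r.Get(context.Background()) }

// getCanceled calls Get with an already-canceled context.
func getCanceled(r *result) error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_, err := r.Get(ctx)
	return err
}

func main() {
	id, err := wait(publish("hello"))
	fmt.Println(id, err)                  // prints "id-hello <nil>"
	fmt.Println(getCanceled(newResult())) // prints "context canceled"
}
```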
Example ¶
package main

import (
	"fmt"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	topic := client.Topic("topicName")
	defer topic.Stop()
	var results []*pubsub.PublishResult
	r := topic.Publish(ctx, &pubsub.Message{
		Data: []byte("hello world"),
	})
	results = append(results, r)
	// Do other work ...
	for _, r := range results {
		id, err := r.Get(ctx)
		if err != nil {
			// TODO: Handle error.
		}
		fmt.Printf("Published a message with a message ID: %s\n", id)
	}
}
func (*Topic) Stop ¶ added in v0.8.0
func (t *Topic) Stop()
Stop sends all remaining published messages and stops the goroutines created for handling publishing. It returns once all outstanding messages have been sent or have failed to be sent.
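The flush-then-refuse behaviour of Stop can be pictured with a small synchronous batcher. This is a simplified sketch, not the library's bundler: batcher, add, and stop are hypothetical names, batches are cut on a count threshold only, and sending happens inline instead of on background goroutines.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errStopped = errors.New("batcher stopped")

// batcher is a hypothetical, synchronous stand-in for a topic's publish queue.
type batcher struct {
	mu        sync.Mutex
	threshold int                  // flush when this many messages are pending
	send      func(batch []string) // delivers one batch
	pending   []string
	stopped   bool
}

// add queues msg and flushes once threshold messages have accumulated.
// After stop, add fails immediately.
func (b *batcher) add(msg string) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.stopped {
		return errStopped
	}
	b.pending = append(b.pending, msg)
	if len(b.pending) >= b.threshold {
		b.flushLocked()
	}
	return nil
}

// flushLocked sends whatever is pending. The caller must hold b.mu.
func (b *batcher) flushLocked() {
	if len(b.pending) == 0 {
		return
	}
	b.send(b.pending)
	b.pending = nil
}

// stop sends the remaining messages and rejects all later adds.
func (b *batcher) stop() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.flushLocked()
	b.stopped = true
}

func main() {
	b := &batcher{threshold: 2, send: func(batch []string) { fmt.Println("send", batch) }}
	for _, m := range []string{"a", "b", "c"} {
		_ = b.add(m)
	}
	b.stop()                // flushes the remaining "c"
	fmt.Println(b.add("d")) // prints "batcher stopped"
}
```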
func (*Topic) String ¶ added in v0.2.0
String returns the printable globally unique name for the topic.
func (*Topic) Subscriptions ¶
func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator
Subscriptions returns an iterator which returns the subscriptions for this topic.
Some of the returned subscriptions may belong to a project other than t's.
Example ¶
package main

import (
	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	topic := client.Topic("topic-name")
	// List all subscriptions of the topic (maybe of multiple projects).
	for subs := topic.Subscriptions(ctx); ; {
		sub, err := subs.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			// TODO: Handle error.
		}
		_ = sub // TODO: use the subscription.
	}
}
func (*Topic) Update ¶ added in v0.26.0
func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error)
Update changes an existing topic according to the fields set in cfg. It returns the new TopicConfig.
Any call to Update (even with an empty TopicConfigToUpdate) will update the MessageStoragePolicy for the topic from the organization's settings.
type TopicConfig ¶ added in v0.26.0
type TopicConfig struct {
	// The set of labels for the topic.
	Labels map[string]string

	// The topic's message storage policy.
	MessageStoragePolicy MessageStoragePolicy
}
TopicConfig describes the configuration of a topic.
type TopicConfigToUpdate ¶ added in v0.26.0
type TopicConfigToUpdate struct {
	// If non-nil, the current set of labels is completely
	// replaced by the new set.
	// This field has beta status. It is not subject to the stability guarantee
	// and may change.
	Labels map[string]string
}
TopicConfigToUpdate describes how to update a topic.
type TopicIterator ¶
type TopicIterator struct {
// contains filtered or unexported fields
}
TopicIterator is an iterator that returns a series of topics.
func (*TopicIterator) Next ¶
func (tps *TopicIterator) Next() (*Topic, error)
Next returns the next topic. If there are no more topics, iterator.Done will be returned.
Example ¶
package main

import (
	"fmt"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "project-id")
	if err != nil {
		// TODO: Handle error.
	}

	// List all topics.
	it := client.Topics(ctx)
	for {
		t, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			// TODO: Handle error.
		}
		fmt.Println(t)
	}
}
Directories ¶
Path | Synopsis |
---|---|
 | Package pubsub is an auto-generated package for the Google Cloud Pub/Sub API. |
internal | |
 | Package loadtest implements load testing for pubsub, following the interface defined in https://github.com/GoogleCloudPlatform/pubsub/tree/master/load-test-framework/ . |
pb | Package google_pubsub_loadtest is a generated protocol buffer package. |
 | Package pstest provides a fake Cloud PubSub service for testing. |