README

Pub/Sub Lite Go Reference

This library is in BETA. Backwards-incompatible changes may be made before stable v1.0.0 is released.

Example Usage

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

To publish messages to a topic:

// Create a PublisherClient for topic1 in zone us-central1-b.
// See https://cloud.google.com/pubsub/lite/docs/locations for available zones.
const topic = "projects/project-id/locations/us-central1-b/topics/topic1"
publisher, err := pscompat.NewPublisherClient(ctx, topic)
if err != nil {
	log.Fatal(err)
}

// Publish "hello world".
res := publisher.Publish(ctx, &pubsub.Message{
	Data: []byte("hello world"),
})
// The publish happens asynchronously.
// Later, you can get the result from res:
...
msgID, err := res.Get(ctx)
if err != nil {
	log.Fatal(err)
}

To receive messages for a subscription:

// Create a SubscriberClient for subscription1 in zone us-central1-b.
const subscription = "projects/project-id/locations/us-central1-b/subscriptions/subscription1"
subscriber, err := pscompat.NewSubscriberClient(ctx, subscription)
if err != nil {
	log.Fatal(err)
}

// Use a callback to receive messages.
// Call cancel() to stop receiving messages.
cctx, cancel := context.WithCancel(ctx)
err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
	fmt.Println(string(m.Data)) // Print the payload as text rather than as a byte slice.
	m.Ack() // Acknowledge that we've consumed the message.
})
if err != nil {
	log.Println(err)
}

Documentation

Overview

Package pubsublite provides an easy way to publish and receive messages using the Pub/Sub Lite service.

Google Pub/Sub services are designed to provide reliable, many-to-many, asynchronous messaging between applications. Publisher applications can send messages to a topic and other applications can subscribe to that topic to receive the messages. By decoupling senders and receivers, Google Pub/Sub allows developers to communicate between independently written applications.

Compared to Cloud Pub/Sub, Pub/Sub Lite provides partitioned zonal data storage with predefined throughput and storage capacity. Guidance on how to choose between Cloud Pub/Sub and Pub/Sub Lite is available at https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite.

More information about Pub/Sub Lite is available at https://cloud.google.com/pubsub/lite.

Introduction

See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts, connection pooling and similar aspects of this package.

The following imports are required for code snippets below:

import (
  "context"
  "log"

  "cloud.google.com/go/pubsub"
  "cloud.google.com/go/pubsublite"
  "cloud.google.com/go/pubsublite/pscompat"
)

More complete examples can be found at https://pkg.go.dev/cloud.google.com/go/pubsublite#pkg-examples and https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-examples.

Creating Topics

Messages are published to topics. Pub/Sub Lite topics may be created like so:

ctx := context.Background()
const topicPath = "projects/my-project/locations/us-central1-c/topics/my-topic"
topicConfig := pubsublite.TopicConfig{
  Name:                       topicPath,
  PartitionCount:             1,
  PublishCapacityMiBPerSec:   4,
  SubscribeCapacityMiBPerSec: 4,
  PerPartitionBytes:          30 * 1024 * 1024 * 1024,  // 30 GiB
  RetentionDuration:          pubsublite.InfiniteRetention,
}
adminClient, err := pubsublite.NewAdminClient(ctx, "us-central1")
if err != nil {
  // TODO: Handle error.
}
if _, err = adminClient.CreateTopic(ctx, topicConfig); err != nil {
  // TODO: Handle error.
}

See https://cloud.google.com/pubsub/lite/docs/topics for more information about how Pub/Sub Lite topics are configured.

See https://cloud.google.com/pubsub/lite/docs/locations for the list of zones where Pub/Sub Lite is available.

Publishing

The pubsublite/pscompat subpackage contains clients for publishing and receiving messages, which have similar interfaces to their pubsub.Topic and pubsub.Subscription counterparts in the Cloud Pub/Sub library: https://pkg.go.dev/cloud.google.com/go/pubsub.

Pub/Sub Lite uses gRPC streams extensively for high throughput. For more differences, see https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat.

To publish messages to a topic, first create a PublisherClient:

publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
if err != nil {
  // TODO: Handle error.
}

Then call Publish:

result := publisher.Publish(ctx, &pubsub.Message{Data: []byte("payload")})

Publish queues the message for publishing and returns immediately. When enough messages have accumulated, or enough time has elapsed, the batch of messages is sent to the Pub/Sub Lite service. Thresholds for batching can be configured in PublishSettings.

Publish returns a PublishResult, which behaves like a future; its Get method blocks until the message has been sent (or has failed to be sent) to the service:

id, err := result.Get(ctx)
if err != nil {
  // TODO: Handle error.
}

Once you've finished publishing all messages, call Stop to flush all messages to the service and close gRPC streams. The PublisherClient can no longer be used after it has been stopped or has terminated due to a permanent service error.

publisher.Stop()

See https://cloud.google.com/pubsub/lite/docs/publishing for more information about publishing.

Creating Subscriptions

To receive messages published to a topic, create a subscription to the topic. There may be more than one subscription per topic; each message that is published to the topic will be delivered to all of its subscriptions.

Pub/Sub Lite subscriptions may be created like so:

const subscriptionPath = "projects/my-project/locations/us-central1-c/subscriptions/my-subscription"
subscriptionConfig := pubsublite.SubscriptionConfig{
  Name:                subscriptionPath,
  Topic:               topicPath,
  DeliveryRequirement: pubsublite.DeliverImmediately,
}
if _, err = adminClient.CreateSubscription(ctx, subscriptionConfig); err != nil {
  // TODO: Handle error.
}

See https://cloud.google.com/pubsub/lite/docs/subscriptions for more information about how subscriptions are configured.

Receiving

To receive messages for a subscription, first create a SubscriberClient:

subscriber, err := pscompat.NewSubscriberClient(ctx, subscriptionPath)
if err != nil {
  // TODO: Handle error.
}

Messages are then consumed from a subscription via callback. The callback may be invoked concurrently by multiple goroutines (one per partition that the subscriber client is connected to).

cctx, cancel := context.WithCancel(ctx)
err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
  log.Printf("Got message: %s", m.Data)
  m.Ack()
})
if err != nil {
  // TODO: Handle error.
}
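
Because the callback may run on several goroutines at once, any state it shares needs synchronization. The sketch below simulates that situation without the library: deliver is a hypothetical stand-in that invokes a handler from one goroutine per partition, and counter guards its map with a mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is shared state that a receive callback might update.
type counter struct {
	mu     sync.Mutex
	counts map[int]int // partition -> number of messages seen
}

// record is safe to call from multiple goroutines.
func (c *counter) record(partition int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[partition]++
}

// total returns the number of messages seen across all partitions.
func (c *counter) total() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	n := 0
	for _, v := range c.counts {
		n += v
	}
	return n
}

// deliver is a hypothetical stand-in for a subscriber client: it starts one
// goroutine per partition and calls handler perPartition times from each.
func deliver(partitions, perPartition int, handler func(partition int)) {
	var wg sync.WaitGroup
	for p := 0; p < partitions; p++ {
		wg.Add(1)
		go func(p int) {
			defer wg.Done()
			for i := 0; i < perPartition; i++ {
				handler(p)
			}
		}(p)
	}
	wg.Wait()
}

func main() {
	c := &counter{counts: make(map[int]int)}
	deliver(4, 100, c.record)
	fmt.Println(c.total())
}
```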

Receive blocks until either the context is canceled or a fatal service error occurs. To terminate a call to Receive, cancel its context:

cancel()

Clients must call pubsub.Message.Ack() or pubsub.Message.Nack() for every message received. Pub/Sub Lite does not have ACK deadlines, and it has no concept of NACK either: by default, calling Nack terminates the SubscriberClient. This is because in Pub/Sub Lite only a single subscriber for a given subscription is connected to any partition at a time, so there is no other client that could handle the message.

See https://cloud.google.com/pubsub/lite/docs/subscribing for more information about receiving messages.

Constants

const InfiniteRetention = time.Duration(-1)

    InfiniteRetention is a sentinel used in topic configs to denote an infinite retention duration (i.e. retain messages as long as there is available storage).

    Types

    type AdminClient

    type AdminClient struct {
    	// contains filtered or unexported fields
    }

      AdminClient provides admin operations for Pub/Sub Lite resources within a Google Cloud region. The zone component of resource paths must be within this region. See https://cloud.google.com/pubsub/lite/docs/locations for the list of zones where Pub/Sub Lite is available.

      An AdminClient may be shared by multiple goroutines.
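
Because the client is created per region while resource paths name a zone, it can help to derive one from the other. The sketch below assumes zones are named by appending a hyphen and a suffix to the region (as in "us-central1-b"); regionFromZone is a hypothetical helper, not part of pubsublite.

```go
package main

import (
	"fmt"
	"strings"
)

// regionFromZone is a hypothetical helper that strips the trailing zone
// suffix, for example "us-central1-b" becomes "us-central1".
func regionFromZone(zone string) (string, error) {
	// Assumption: a zone such as "us-central1-b" has at least two hyphens.
	if strings.Count(zone, "-") < 2 {
		return "", fmt.Errorf("invalid zone %q", zone)
	}
	i := strings.LastIndex(zone, "-")
	if i == len(zone)-1 {
		return "", fmt.Errorf("invalid zone %q", zone)
	}
	return zone[:i], nil
}

func main() {
	region, err := regionFromZone("us-central1-b")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// The result could then be passed as the region argument of NewAdminClient.
	fmt.Println(region)
}
```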

      func NewAdminClient

      func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*AdminClient, error)

        NewAdminClient creates a new Pub/Sub Lite client to perform admin operations for resources within a given region.

        func (*AdminClient) Close

        func (ac *AdminClient) Close() error

          Close releases any resources held by the client when it is no longer required. If the client is available for the lifetime of the program, then Close need not be called at exit.

          func (*AdminClient) CreateSubscription

          func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig, opts ...CreateSubscriptionOption) (*SubscriptionConfig, error)

            CreateSubscription creates a new subscription from the given config. If the subscription already exists an error will be returned.

            By default, a new subscription will only receive messages published after the subscription was created. Use StartingOffset to override.

              func (*AdminClient) CreateTopic

              func (ac *AdminClient) CreateTopic(ctx context.Context, config TopicConfig) (*TopicConfig, error)

                CreateTopic creates a new topic from the given config. If the topic already exists an error will be returned.

                  func (*AdminClient) DeleteSubscription

                  func (ac *AdminClient) DeleteSubscription(ctx context.Context, subscription string) error

                    DeleteSubscription deletes a subscription. A valid subscription path has the format: "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID".

                    func (*AdminClient) DeleteTopic

                    func (ac *AdminClient) DeleteTopic(ctx context.Context, topic string) error

                      DeleteTopic deletes a topic. A valid topic path has the format: "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".
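
The path format above is shared by several methods, so a small parser can catch malformed paths before any call is made. The following is a sketch only: topicPath and parseTopicPath are hypothetical names introduced for illustration and are not part of pubsublite.

```go
package main

import (
	"fmt"
	"strings"
)

// topicPath holds the components of a topic path. It is a hypothetical type
// for this sketch.
type topicPath struct {
	Project string
	Zone    string
	TopicID string
}

// parseTopicPath splits a path of the form
// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID" into its components.
func parseTopicPath(s string) (topicPath, error) {
	parts := strings.Split(s, "/")
	if len(parts) != 6 || parts[0] != "projects" || parts[2] != "locations" || parts[4] != "topics" {
		return topicPath{}, fmt.Errorf("invalid topic path %q", s)
	}
	if parts[1] == "" || parts[3] == "" || parts[5] == "" {
		return topicPath{}, fmt.Errorf("invalid topic path %q: empty component", s)
	}
	return topicPath{Project: parts[1], Zone: parts[3], TopicID: parts[5]}, nil
}

// String reassembles the full path.
func (p topicPath) String() string {
	return fmt.Sprintf("projects/%s/locations/%s/topics/%s", p.Project, p.Zone, p.TopicID)
}

func main() {
	p, err := parseTopicPath("projects/my-project/locations/us-central1-c/topics/my-topic")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(p.Project, p.Zone, p.TopicID)
}
```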

                      func (*AdminClient) Subscription

                      func (ac *AdminClient) Subscription(ctx context.Context, subscription string) (*SubscriptionConfig, error)

                        Subscription retrieves the configuration of a subscription. A valid subscription name has the format: "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID".

                        func (*AdminClient) Subscriptions

                        func (ac *AdminClient) Subscriptions(ctx context.Context, parent string) *SubscriptionIterator

                          Subscriptions retrieves the list of subscription configs for a given project and zone. A valid parent path has the format: "projects/PROJECT_ID/locations/ZONE".

                          func (*AdminClient) Topic

                          func (ac *AdminClient) Topic(ctx context.Context, topic string) (*TopicConfig, error)

                            Topic retrieves the configuration of a topic. A valid topic path has the format: "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".

                            func (*AdminClient) TopicPartitionCount

                            func (ac *AdminClient) TopicPartitionCount(ctx context.Context, topic string) (int, error)

                              TopicPartitionCount returns the number of partitions for a topic. A valid topic path has the format: "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".

                              func (*AdminClient) TopicSubscriptions

                              func (ac *AdminClient) TopicSubscriptions(ctx context.Context, topic string) *SubscriptionPathIterator

                                TopicSubscriptions retrieves the list of subscription paths for a topic. A valid topic path has the format: "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".

                                func (*AdminClient) Topics

                                func (ac *AdminClient) Topics(ctx context.Context, parent string) *TopicIterator

                                  Topics retrieves the list of topic configs for a given project and zone. A valid parent path has the format: "projects/PROJECT_ID/locations/ZONE".

                                  func (*AdminClient) UpdateSubscription

                                  func (ac *AdminClient) UpdateSubscription(ctx context.Context, config SubscriptionConfigToUpdate) (*SubscriptionConfig, error)

                                    UpdateSubscription updates an existing subscription from the given config and returns the new subscription config. UpdateSubscription returns an error if no fields were modified.

                                    func (*AdminClient) UpdateTopic

                                    func (ac *AdminClient) UpdateTopic(ctx context.Context, config TopicConfigToUpdate) (*TopicConfig, error)

                                      UpdateTopic updates an existing topic from the given config and returns the new topic config. UpdateTopic returns an error if no fields were modified.

                                      type BacklogLocation

                                      type BacklogLocation int

                                        BacklogLocation refers to a location with respect to the message backlog.

                                        const (
                                        	// End refers to the location past all currently published messages. End
                                        	// skips the entire message backlog.
                                        	End BacklogLocation = iota + 1
                                        
                                        	// Beginning refers to the location of the oldest retained message.
                                        	Beginning
                                        )

                                        type CreateSubscriptionOption

                                        type CreateSubscriptionOption interface {
                                        	Apply(*createSubscriptionSettings)
                                        }

                                          CreateSubscriptionOption is an option for AdminClient.CreateSubscription.

                                          func StartingOffset

                                          func StartingOffset(location BacklogLocation) CreateSubscriptionOption

                                            StartingOffset specifies the offset at which a newly created subscription will start receiving messages.

                                            type DeliveryRequirement

                                            type DeliveryRequirement int

                                              DeliveryRequirement specifies when a subscription should send messages to subscribers relative to persistence in storage.

                                              const (
                                              	// UnspecifiedDeliveryRequirement represents an unset delivery requirement.
                                              	UnspecifiedDeliveryRequirement DeliveryRequirement = iota
                                              
                                              	// DeliverImmediately means the server will not wait for a published message
                                              	// to be successfully written to storage before delivering it to subscribers.
                                              	DeliverImmediately
                                              
                                              	// DeliverAfterStored means the server will not deliver a published message to
                                              	// subscribers until the message has been successfully written to storage.
                                              	// This will result in higher end-to-end latency, but consistent delivery.
                                              	DeliverAfterStored
                                              )

                                              type SubscriptionConfig

                                              type SubscriptionConfig struct {
                                              	// The full path of the subscription, in the format:
                                              	// "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID".
                                              	//
                                              	// - PROJECT_ID: The project ID (e.g. "my-project") or the project number
                                              	//   (e.g. "987654321") can be provided.
                                              	// - ZONE: The Google Cloud zone (e.g. "us-central1-a") of the corresponding
                                              	//   topic.
                                              	// - SUBSCRIPTION_ID: The ID of the subscription (e.g. "my-subscription"). See
                                              	//   https://cloud.google.com/pubsub/docs/admin#resource_names for information
                                              	//   about valid subscription IDs.
                                              	Name string
                                              
                                              	// The path of the topic that this subscription is attached to, in the format:
                                              	// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID". This cannot be
                                              	// changed after creation.
                                              	Topic string
                                              
                                              	// Whether a message should be delivered to subscribers immediately after it
                                              	// has been published or after it has been successfully written to storage.
                                              	DeliveryRequirement DeliveryRequirement
                                              }

                                                SubscriptionConfig describes the properties of a Pub/Sub Lite subscription, which is attached to a Pub/Sub Lite topic. See https://cloud.google.com/pubsub/lite/docs/subscriptions for more information about how subscriptions are configured.

                                                type SubscriptionConfigToUpdate

                                                type SubscriptionConfigToUpdate struct {
                                                	// The full path of the subscription to update, in the format:
                                                	// "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID".
                                                	// Required.
                                                	Name string
                                                
                                                	// If non-zero, updates the message delivery requirement.
                                                	DeliveryRequirement DeliveryRequirement
                                                }

                                                  SubscriptionConfigToUpdate specifies the properties to update for a subscription.

                                                  type SubscriptionIterator

                                                  type SubscriptionIterator struct {
                                                  	// contains filtered or unexported fields
                                                  }

                                                    SubscriptionIterator is an iterator that returns a list of subscription configs.

                                                    func (*SubscriptionIterator) Next

                                                    func (sc *SubscriptionIterator) Next() (*SubscriptionConfig, error)

                                                      Next returns the next subscription config. The second return value will be iterator.Done if there are no more subscription configs.

                                                      type SubscriptionPathIterator

                                                      type SubscriptionPathIterator struct {
                                                      	// contains filtered or unexported fields
                                                      }

                                                        SubscriptionPathIterator is an iterator that returns a list of subscription paths.

                                                        func (*SubscriptionPathIterator) Next

                                                        func (sp *SubscriptionPathIterator) Next() (string, error)

                                                          Next returns the next subscription path, which has format: "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID". The second return value will be iterator.Done if there are no more subscription paths.

                                                          type TopicConfig

                                                          type TopicConfig struct {
                                                          	// The full path of the topic, in the format:
                                                          	// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".
                                                          	//
                                                          	// - PROJECT_ID: The project ID (e.g. "my-project") or the project number
                                                          	//   (e.g. "987654321") can be provided.
                                                          	// - ZONE: The Google Cloud zone (e.g. "us-central1-a") where the topic is
                                                          	//   located. See https://cloud.google.com/pubsub/lite/docs/locations for the
                                                          	//   list of zones where Pub/Sub Lite is available.
                                                          	// - TOPIC_ID: The ID of the topic (e.g. "my-topic"). See
                                                          	//   https://cloud.google.com/pubsub/docs/admin#resource_names for information
                                                          	//   about valid topic IDs.
                                                          	Name string
                                                          
                                                          	// The number of partitions in the topic. Must be at least 1. Can be increased
                                                          	// after creation, but not decreased.
                                                          	PartitionCount int
                                                          
                                                          	// Publish throughput capacity per partition in MiB/s.
                                                          	// Must be >= 4 and <= 16.
                                                          	PublishCapacityMiBPerSec int
                                                          
                                                          	// Subscribe throughput capacity per partition in MiB/s.
                                                          	// Must be >= 4 and <= 32.
                                                          	SubscribeCapacityMiBPerSec int
                                                          
                                                          	// The provisioned storage, in bytes, per partition. If the number of bytes
                                                          	// stored in any of the topic's partitions grows beyond this value, older
                                                          	// messages will be dropped to make room for newer ones, regardless of the
                                                          	// value of `RetentionDuration`. Must be >= 30 GiB.
                                                          	PerPartitionBytes int64
                                                          
                                                          	// How long a published message is retained. If set to `InfiniteRetention`,
                                                          	// messages will be retained as long as the bytes retained for each partition
                                                          	// is below `PerPartitionBytes`. Otherwise, must be > 0.
                                                          	RetentionDuration time.Duration
                                                          }

                                                            TopicConfig describes the properties of a Pub/Sub Lite topic. See https://cloud.google.com/pubsub/lite/docs/topics for more information about how topics are configured.
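
The limits stated in the field comments can be checked on the client before a request is sent. The sketch below is one possible way to do that, under the assumption that the documented ranges are the only constraints; topicConfig is a local copy of the relevant fields and validate is a hypothetical helper, so neither is part of pubsublite.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const gib = 1 << 30 // bytes in one GiB

// infiniteRetention mirrors pubsublite.InfiniteRetention.
const infiniteRetention = time.Duration(-1)

// topicConfig copies the numeric fields of TopicConfig so that this sketch
// compiles without the library.
type topicConfig struct {
	PartitionCount             int
	PublishCapacityMiBPerSec   int
	SubscribeCapacityMiBPerSec int
	PerPartitionBytes          int64
	RetentionDuration          time.Duration
}

// validate is a hypothetical client-side check of the documented limits.
func validate(c topicConfig) error {
	switch {
	case c.PartitionCount < 1:
		return errors.New("PartitionCount must be at least 1")
	case c.PublishCapacityMiBPerSec < 4 || c.PublishCapacityMiBPerSec > 16:
		return errors.New("PublishCapacityMiBPerSec must be between 4 and 16")
	case c.SubscribeCapacityMiBPerSec < 4 || c.SubscribeCapacityMiBPerSec > 32:
		return errors.New("SubscribeCapacityMiBPerSec must be between 4 and 32")
	case c.PerPartitionBytes < 30*gib:
		return errors.New("PerPartitionBytes must be at least 30 GiB")
	case c.RetentionDuration != infiniteRetention && c.RetentionDuration <= 0:
		return errors.New("RetentionDuration must be positive or InfiniteRetention")
	}
	return nil
}

func main() {
	cfg := topicConfig{
		PartitionCount:             1,
		PublishCapacityMiBPerSec:   4,
		SubscribeCapacityMiBPerSec: 4,
		PerPartitionBytes:          30 * gib,
		RetentionDuration:          infiniteRetention,
	}
	fmt.Println(validate(cfg))
}
```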

                                                            type TopicConfigToUpdate

                                                            type TopicConfigToUpdate struct {
                                                            	// The full path of the topic to update, in the format:
                                                            	// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID". Required.
                                                            	Name string
                                                            
                                                            	// If non-zero, will update the number of partitions in the topic.
                                                            	// Set value must be >= 1. The number of partitions can only be increased, not
                                                            	// decreased.
                                                            	PartitionCount int
                                                            
                                                            	// If non-zero, will update the publish throughput capacity per partition.
                                                            	// Set value must be >= 4 and <= 16.
                                                            	PublishCapacityMiBPerSec int
                                                            
                                                            	// If non-zero, will update the subscribe throughput capacity per partition.
                                                            	// Set value must be >= 4 and <= 32.
                                                            	SubscribeCapacityMiBPerSec int
                                                            
                                                            	// If non-zero, will update the provisioned storage per partition.
                                                            	// Set value must be >= 30 GiB.
                                                            	PerPartitionBytes int64
                                                            
                                                            	// If specified, will update how long a published message is retained. To
                                                            	// clear a retention duration (i.e. retain messages as long as there is
                                                            	// available storage), set this to `InfiniteRetention`.
                                                            	RetentionDuration optional.Duration
                                                            }

                                                              TopicConfigToUpdate specifies the properties to update for a topic.
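
The "if non-zero, will update" convention can be illustrated with a small local model. This is a sketch of the semantics as described in the comments above, not the library's implementation: topicSettings, topicUpdate and applyUpdate are hypothetical, and only two of the fields are modeled.

```go
package main

import (
	"errors"
	"fmt"
)

// topicSettings is a hypothetical, reduced model of a topic's current config.
type topicSettings struct {
	PartitionCount           int
	PublishCapacityMiBPerSec int
}

// topicUpdate is a hypothetical, reduced model of an update request.
// A zero field means "leave unchanged".
type topicUpdate struct {
	PartitionCount           int
	PublishCapacityMiBPerSec int
}

// applyUpdate copies every non-zero field of upd onto cur. It returns an
// error if upd sets no fields or tries to decrease the partition count.
func applyUpdate(cur topicSettings, upd topicUpdate) (topicSettings, error) {
	changed := false
	if upd.PartitionCount != 0 {
		if upd.PartitionCount < cur.PartitionCount {
			return cur, errors.New("partition count can only be increased")
		}
		cur.PartitionCount = upd.PartitionCount
		changed = true
	}
	if upd.PublishCapacityMiBPerSec != 0 {
		cur.PublishCapacityMiBPerSec = upd.PublishCapacityMiBPerSec
		changed = true
	}
	if !changed {
		return cur, errors.New("no fields to update")
	}
	return cur, nil
}

func main() {
	cur := topicSettings{PartitionCount: 1, PublishCapacityMiBPerSec: 4}
	next, err := applyUpdate(cur, topicUpdate{PartitionCount: 2})
	fmt.Println(next, err)
}
```

One consequence of this convention is that a field cannot be set to its zero value through an update, which is presumably why RetentionDuration uses an optional type instead.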

                                                              type TopicIterator

                                                              type TopicIterator struct {
                                                              	// contains filtered or unexported fields
                                                              }

                                                                TopicIterator is an iterator that returns a list of topic configs.

                                                                func (*TopicIterator) Next

                                                                func (t *TopicIterator) Next() (*TopicConfig, error)

                                                                  Next returns the next topic config. The second return value will be iterator.Done if there are no more topic configs.
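
The usual way to consume an iterator of this shape is a loop that stops on the done sentinel and treats any other error as a failure. The sketch below shows that loop against a local stand-in: errDone plays the role of iterator.Done, and sliceIterator and collect are hypothetical names for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errDone is a local stand-in for iterator.Done.
var errDone = errors.New("no more items in iterator")

// sliceIterator is a hypothetical iterator over a fixed list of paths.
type sliceIterator struct {
	items []string
	pos   int
}

// Next returns the next item, or errDone when the list is exhausted.
func (it *sliceIterator) Next() (string, error) {
	if it.pos >= len(it.items) {
		return "", errDone
	}
	item := it.items[it.pos]
	it.pos++
	return item, nil
}

// collect drains the iterator, treating errDone as the normal end of the
// sequence and returning any other error to the caller.
func collect(it *sliceIterator) ([]string, error) {
	var out []string
	for {
		item, err := it.Next()
		if err == errDone {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, item)
	}
}

func main() {
	it := &sliceIterator{items: []string{"topic-a", "topic-b"}}
	paths, err := collect(it)
	fmt.Println(paths, err)
}
```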

                                                                  Directories

                                                                  Path Synopsis
                                                                  Use of Context The ctx passed to NewClient is used for authentication requests and for creating the underlying connection, but is not used for subsequent calls.
                                                                  Package pscompat contains clients for publishing and subscribing using the Pub/Sub Lite service.
                                                                  internal