Documentation ¶
Overview ¶
Package pubsublite abstracts the production and consumption of records to and from GCP PubSub Lite.
Index ¶
- func JoinTopicConsumer(topic apmqueue.Topic, consumer string) string
- func SplitTopicConsumer(subscriptionName string) (topic apmqueue.Topic, consumer string, err error)
- type CommonConfig
- type Consumer
- type ConsumerConfig
- type Manager
- func (m *Manager) Close() error
- func (m *Manager) CreateReservation(ctx context.Context, name string, throughputCapacity int) error
- func (m *Manager) CreateSubscription(ctx context.Context, name, topic string, deliverImmediately bool) error
- func (m *Manager) DeleteReservation(ctx context.Context, reservation string) error
- func (m *Manager) DeleteSubscription(ctx context.Context, subscription string) error
- func (m *Manager) DeleteTopic(ctx context.Context, topic string) error
- func (m *Manager) ListReservationTopics(ctx context.Context, reservation string) ([]string, error)
- func (m *Manager) ListReservations(ctx context.Context) ([]string, error)
- func (m *Manager) ListTopicSubscriptions(ctx context.Context, topic string) ([]string, error)
- func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (metric.Registration, error)
- func (m *Manager) NewTopicCreator(cfg TopicCreatorConfig) (*TopicCreator, error)
- type ManagerConfig
- type Producer
- type ProducerConfig
- type TopicCreator
- type TopicCreatorConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func JoinTopicConsumer ¶
JoinTopicConsumer returns a Pub/Sub Lite subscription name for the given topic and consumer name.
func SplitTopicConsumer ¶
SplitTopicConsumer does the opposite of JoinTopicConsumer by parsing topic and consumer out of a subscription name. Returns an error if subscription name is not in an expected format.
Types ¶
type CommonConfig ¶
type CommonConfig struct {
	// Project is the GCP project.
	//
	// NewManager, NewProducer, and NewConsumer will set Project from
	// $GOOGLE_APPLICATION_CREDENTIALS if it is not explicitly specified.
	Project string

	// Region is the GCP region.
	Region string

	// Namespace holds a namespace for Pub/Sub Lite resources.
	//
	// This is added as a prefix for reservation, topic, and
	// subscription names, and acts as a filter on resources
	// monitored or described by the manager.
	//
	// Namespace is always removed from resource names before
	// they are returned to callers. The only way Namespace
	// will surface is in telemetry (e.g. metrics), as an
	// independent dimension. This enables users to filter
	// metrics by namespace, while maintaining stable names
	// for resources (e.g. topics).
	Namespace string

	// ClientOptions holds arbitrary Google API client options.
	ClientOptions []option.ClientOption

	// Logger to use for any errors.
	Logger *zap.Logger

	// DisableTelemetry disables the OpenTelemetry hook.
	DisableTelemetry bool

	// TracerProvider allows specifying a custom otel tracer provider.
	// Defaults to the global one.
	TracerProvider trace.TracerProvider

	// MeterProvider allows specifying a custom otel meter provider.
	// Defaults to the global one.
	MeterProvider metric.MeterProvider
}
CommonConfig defines common configuration for Pub/Sub Lite consumers, producers, and managers.
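The Namespace behaviour described in the field comment (prefix on the way in, filter and strip on the way out) can be illustrated with a small sketch. The `namespaced` type, its methods, and the `.` separator are hypothetical and chosen only for the example; the package may join the namespace and resource name differently.

```go
package main

import (
	"fmt"
	"strings"
)

// namespaced is a hypothetical helper that prefixes resource names with a
// namespace and strips the prefix again before names are returned.
type namespaced struct {
	namespace string
}

// prefix returns the namespace prefix, using "." as an assumed separator.
func (n namespaced) prefix() string {
	if n.namespace == "" {
		return ""
	}
	return n.namespace + "."
}

// qualify adds the namespace prefix to a caller-supplied name.
func (n namespaced) qualify(name string) string {
	return n.prefix() + name
}

// filter keeps only names inside the namespace and removes the prefix,
// so callers always see stable, unprefixed names.
func (n namespaced) filter(names []string) []string {
	var out []string
	for _, name := range names {
		if strings.HasPrefix(name, n.prefix()) {
			out = append(out, strings.TrimPrefix(name, n.prefix()))
		}
	}
	return out
}

func main() {
	ns := namespaced{namespace: "staging"}
	fmt.Println(ns.qualify("orders"))
	fmt.Println(ns.filter([]string{"staging.orders", "prod.orders", "staging.events"}))
}
```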
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer receives PubSub Lite messages from one or more existing subscriptions. The underlying library processes messages concurrently per subscription and partition.
func NewConsumer ¶
func NewConsumer(ctx context.Context, cfg ConsumerConfig) (*Consumer, error)
NewConsumer creates a new consumer instance for a single subscription.
func (*Consumer) Close ¶
Close closes the consumer. Once the consumer is closed, it can't be re-used.
type ConsumerConfig ¶
type ConsumerConfig struct {
	CommonConfig

	// Topics holds the list of topics from which to consume.
	Topics []apmqueue.Topic

	// ConsumerName holds the consumer name. This will be combined with
	// the topic names to identify Pub/Sub Lite subscriptions, and must
	// be unique per consuming service.
	ConsumerName string

	// Processor that will be used to process each event individually.
	// Processor may be called from multiple goroutines and needs to be
	// safe for concurrent use.
	Processor apmqueue.Processor

	// Delivery mechanism to use to acknowledge the messages.
	// AtMostOnceDeliveryType and AtLeastOnceDeliveryType are supported.
	Delivery apmqueue.DeliveryType
}
ConsumerConfig defines the configuration for the PubSub Lite consumer.
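Because the Processor may be called from multiple goroutines, any state it keeps needs to be guarded. The sketch below shows one way to do that with a mutex. The `record` type, the `Process` signature, and the helper names are assumptions made for illustration; consult the apmqueue.Processor interface for the real method set.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// record is a hypothetical stand-in for the record type passed to a processor.
type record struct {
	Topic string
	Value []byte
}

// countingProcessor counts records per topic. The mutex makes it safe to
// call Process from multiple goroutines at once.
type countingProcessor struct {
	mu     sync.Mutex
	counts map[string]int
}

func newCountingProcessor() *countingProcessor {
	return &countingProcessor{counts: make(map[string]int)}
}

// Process has an assumed signature, used here only for the example.
func (p *countingProcessor) Process(ctx context.Context, r record) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	p.counts[r.Topic]++
	return nil
}

// count returns the number of records seen for a topic.
func (p *countingProcessor) count(topic string) int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.counts[topic]
}

// processConcurrently calls Process from n goroutines and waits for all of them.
func processConcurrently(p *countingProcessor, n int, topic string) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = p.Process(context.Background(), record{Topic: topic})
		}()
	}
	wg.Wait()
}

func main() {
	p := newCountingProcessor()
	processConcurrently(p, 100, "orders")
	fmt.Println(p.count("orders"))
}
```

Without the mutex, concurrent writes to the map would be a data race, which is the kind of bug the "safe for concurrent use" requirement is meant to rule out.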
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages GCP Pub/Sub Lite resources: reservations, topics, and subscriptions.
func NewManager ¶
func NewManager(cfg ManagerConfig) (*Manager, error)
NewManager returns a new Manager with the given config.
func (*Manager) CreateReservation ¶
CreateReservation creates a reservation with the given name and throughput capacity.
Reservations that already exist will be left unmodified.
func (*Manager) CreateSubscription ¶
func (m *Manager) CreateSubscription(ctx context.Context, name, topic string, deliverImmediately bool) error
CreateSubscription creates a subscription with the given name for the given topic.
Subscriptions that already exist will be left unmodified.
func (*Manager) DeleteReservation ¶
DeleteReservation deletes the given reservation.
No error is returned if the reservation does not exist.
func (*Manager) DeleteSubscription ¶
DeleteSubscription deletes the given subscription.
No error is returned if the subscription does not exist.
func (*Manager) DeleteTopic ¶
DeleteTopic deletes the given topic.
No error is returned if the topic does not exist.
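The "no error if the resource does not exist" behaviour shared by the Delete methods above amounts to treating a not-found result as success. A minimal sketch of that pattern follows. The `store` type, `errNotFound`, and `deleteIdempotent` are hypothetical names for illustration; a real client would more likely inspect the not-found status returned by the Google API.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is a hypothetical sentinel for a missing resource.
var errNotFound = errors.New("resource not found")

// store is a hypothetical in-memory stand-in for a resource admin API.
type store map[string]struct{}

// remove deletes a resource and reports errNotFound if it is missing.
func (s store) remove(name string) error {
	if _, ok := s[name]; !ok {
		return fmt.Errorf("delete %q: %w", name, errNotFound)
	}
	delete(s, name)
	return nil
}

// deleteIdempotent treats "not found" as success, so repeated calls are safe.
// Any other error is returned to the caller unchanged.
func deleteIdempotent(s store, name string) error {
	if err := s.remove(name); err != nil && !errors.Is(err, errNotFound) {
		return err
	}
	return nil
}

func main() {
	s := store{"orders": {}}
	fmt.Println(deleteIdempotent(s, "orders")) // resource exists and is removed
	fmt.Println(deleteIdempotent(s, "orders")) // resource is already gone
}
```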
func (*Manager) ListReservationTopics ¶
ListReservationTopics lists topics in the given reservation.
func (*Manager) ListReservations ¶
ListReservations lists reservations in the configured project and region.
func (*Manager) ListTopicSubscriptions ¶
ListTopicSubscriptions lists subscriptions for the given topic.
func (*Manager) MonitorConsumerLag ¶
func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (metric.Registration, error)
MonitorConsumerLag registers a callback with OpenTelemetry to measure consumer lag for the given topic consumers.
func (*Manager) NewTopicCreator ¶
func (m *Manager) NewTopicCreator(cfg TopicCreatorConfig) (*TopicCreator, error)
NewTopicCreator returns a new TopicCreator with the given config.
type ManagerConfig ¶
type ManagerConfig struct {
CommonConfig
}
ManagerConfig holds configuration for managing GCP Pub/Sub Lite resources.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer implements the apmqueue.Producer interface and sends each of the events in a batch to a PubSub Lite topic, which is determined by calling the configured TopicRouter.
func NewProducer ¶
func NewProducer(cfg ProducerConfig) (*Producer, error)
NewProducer creates a new PubSub Lite producer for a single project.
func (*Producer) Close ¶
Close stops the producer.
This call is blocking and will cause all the underlying clients to stop producing. If producing is asynchronous, it'll block until all messages have been produced. After Close() is called, Producer cannot be reused.
func (*Producer) Healthy ¶
Healthy is a noop at the moment. TODO(marclop) range over the producers, call .Error(), if any returns an error, return it.
func (*Producer) Produce ¶
Produce produces N records. If the Producer is synchronous, it waits until all records have been produced to the queue; otherwise, it returns as soon as the records are stored in the producer buffer. If the context has been enriched with metadata, each entry is added as a record header. Produce takes ownership of the records, and any modification after Produce is called may cause an unhandled exception.
type ProducerConfig ¶
type ProducerConfig struct {
	CommonConfig

	// Sync can be used to indicate whether production should be synchronous.
	// Due to the mechanics of PubSub Lite publishing, producing synchronously
	// will yield poor performance unless a single call to produce contains
	// enough records that are large enough to cause immediate flush.
	Sync bool
}
ProducerConfig for the PubSub Lite producer.
type TopicCreator ¶
type TopicCreator struct {
// contains filtered or unexported fields
}
TopicCreator creates GCP Pub/Sub Lite topics.
func (*TopicCreator) CreateTopics ¶
CreateTopics creates one or more topics.
Topics that already exist will be left unmodified.
type TopicCreatorConfig ¶
type TopicCreatorConfig struct {
	// Reservation holds the unqualified ID of the reservation with
	// which topics will be associated. This will be combined with the
	// project ID, region ID, and namespace to form the reservation path.
	Reservation string

	// PartitionCount is the number of partitions to assign to newly
	// created topics.
	//
	// Must be greater than zero.
	PartitionCount int

	// PublishCapacityMiBPerSec defines the publish throughput capacity
	// per partition in MiB/s. Must be >= 4 and <= 16.
	PublishCapacityMiBPerSec int

	// SubscribeCapacityMiBPerSec defines the subscribe throughput capacity
	// per partition in MiB/s. Must be >= 4 and <= 32.
	SubscribeCapacityMiBPerSec int

	// PerPartitionBytes holds the provisioned storage, in bytes, per partition.
	//
	// If the number of bytes stored in any of the topic's partitions grows beyond
	// this value, older messages will be dropped to make room for newer ones,
	// regardless of the value of `RetentionDuration`. Must be >= 30 GiB.
	PerPartitionBytes int64

	// RetentionDuration indicates how long messages are retained. Must be > 0.
	RetentionDuration time.Duration
}
TopicCreatorConfig holds configuration for managing GCP Pub/Sub Lite topics.
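The documented field constraints can be checked up front before any topic is created. The sketch below mirrors the fields in a local `topicConfig` type and validates them. The type, the `validate` method, and `exampleConfig` are hypothetical helpers written for this example, not part of the package; only the numeric limits come from the field comments.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// topicConfig mirrors the documented TopicCreatorConfig fields for illustration.
type topicConfig struct {
	Reservation                string
	PartitionCount             int
	PublishCapacityMiBPerSec   int
	SubscribeCapacityMiBPerSec int
	PerPartitionBytes          int64
	RetentionDuration          time.Duration
}

// minPerPartitionBytes is the documented 30 GiB storage minimum.
const minPerPartitionBytes = 30 << 30

// validate is a hypothetical helper that checks the documented constraints
// and returns all violations joined into a single error (nil if none).
func (c topicConfig) validate() error {
	var errs []error
	if c.PartitionCount <= 0 {
		errs = append(errs, errors.New("PartitionCount must be greater than zero"))
	}
	if c.PublishCapacityMiBPerSec < 4 || c.PublishCapacityMiBPerSec > 16 {
		errs = append(errs, errors.New("PublishCapacityMiBPerSec must be >= 4 and <= 16"))
	}
	if c.SubscribeCapacityMiBPerSec < 4 || c.SubscribeCapacityMiBPerSec > 32 {
		errs = append(errs, errors.New("SubscribeCapacityMiBPerSec must be >= 4 and <= 32"))
	}
	if c.PerPartitionBytes < minPerPartitionBytes {
		errs = append(errs, errors.New("PerPartitionBytes must be >= 30 GiB"))
	}
	if c.RetentionDuration <= 0 {
		errs = append(errs, errors.New("RetentionDuration must be > 0"))
	}
	return errors.Join(errs...)
}

// exampleConfig returns a configuration that satisfies every constraint.
func exampleConfig() topicConfig {
	return topicConfig{
		Reservation:                "default",
		PartitionCount:             2,
		PublishCapacityMiBPerSec:   4,
		SubscribeCapacityMiBPerSec: 8,
		PerPartitionBytes:          minPerPartitionBytes,
		RetentionDuration:          24 * time.Hour,
	}
}

func main() {
	cfg := exampleConfig()
	fmt.Println("valid config error:", cfg.validate())

	cfg.PublishCapacityMiBPerSec = 2 // below the documented minimum of 4
	cfg.RetentionDuration = 0        // must be positive
	fmt.Println(cfg.validate())
}
```

Collecting every violation, rather than returning on the first, lets a caller fix a configuration in one pass.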
Directories ¶
Path | Synopsis |
---|---|
internal | |
pubsubabs | Package pubsubabs provides an abstraction layer over the `pubsub` PublisherClient types to allow testing. |
telemetry | Package telemetry allows setting up telemetry for pubsublite consumers and producers. |