wire

package
v1.4.1
Warning

This package is not in the latest version of its module.

Published: Oct 18, 2022 License: Apache-2.0 Imports: 34 Imported by: 0

README

Wire

This directory contains internal implementation details for Pub/Sub Lite.

Conventions

The following are general conventions used in this package:

  • Capitalized methods and fields of a struct denote its public interface. They are safe to call from outside the struct (e.g. they access immutable fields or are guarded by a mutex). All other methods are considered internal implementation details that should not be called from outside the struct.
  • unsafeFoo() methods indicate that the caller is expected to have already acquired the struct's mutex. Since Go does not support re-entrant locks, they do not acquire the mutex themselves. These are typically common utility methods that need to be atomic with other operations.
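
The two conventions above can be illustrated with a minimal sketch. The counter type and its methods are hypothetical and are not part of this package; they only show a capitalized method that takes the lock and unsafe helpers that rely on the caller holding it.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical struct used only to illustrate the conventions.
type counter struct {
	mu    sync.Mutex
	total int
}

// Add is part of the public interface: it acquires the mutex itself, so it
// is safe to call from outside the struct.
func (c *counter) Add(n int) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.unsafeAdd(n)
	return c.unsafeTotal()
}

// unsafeAdd assumes the caller already holds c.mu. It must not lock again,
// because sync.Mutex is not re-entrant and a second Lock would deadlock.
func (c *counter) unsafeAdd(n int) { c.total += n }

// unsafeTotal assumes the caller already holds c.mu.
func (c *counter) unsafeTotal() int { return c.total }

func main() {
	c := &counter{}
	c.Add(2)
	fmt.Println(c.Add(3)) // 5
}
```

Because Add holds the lock across both helper calls, the update and the read are atomic with respect to other callers of Add.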

Documentation

Constants

const (
	// MaxPublishRequestCount is the maximum number of messages that can be
	// batched in a single publish request.
	MaxPublishRequestCount = 1000

	// MaxPublishRequestBytes is the maximum allowed serialized size of a single
	// publish request (containing a batch of messages) in bytes. Must be lower
	// than the gRPC limit of 4 MiB.
	MaxPublishRequestBytes int = 3.5 * 1024 * 1024
)
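
As a worked check of the byte limit, the following sketch re-declares the constant locally (lowercase names are stand-ins, and grpcLimitBytes is an assumed name for the 4 MiB gRPC limit mentioned in the comment). It shows that the floating-point constant expression converts exactly to an int and leaves 0.5 MiB of headroom.

```go
package main

import "fmt"

const (
	// Local copy of the documented limit. The untyped constant expression
	// 3.5 * 1024 * 1024 is exactly representable as an int, so it compiles.
	maxPublishRequestBytes int = 3.5 * 1024 * 1024

	// Assumed name for the gRPC limit of 4 MiB referenced in the comment.
	grpcLimitBytes int = 4 * 1024 * 1024
)

// headroom returns the number of bytes left below the gRPC limit.
func headroom() int { return grpcLimitBytes - maxPublishRequestBytes }

func main() {
	fmt.Println(maxPublishRequestBytes) // 3670016
	fmt.Println(headroom())             // 524288
}
```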

Variables

var (
	// ErrOverflow indicates that the publish buffers have overflowed. See
	// comments for PublishSettings.BufferedByteLimit.
	ErrOverflow = errors.New("pubsublite: client-side publish buffers have overflowed")

	// ErrOversizedMessage indicates that the user published a message over the
	// allowed serialized byte size limit. It is wrapped in another error.
	ErrOversizedMessage = fmt.Errorf("maximum allowed message size is MaxPublishRequestBytes (%d)", MaxPublishRequestBytes)

	// ErrServiceUninitialized indicates that a service (e.g. publisher or
	// subscriber) cannot perform an operation because it is uninitialized.
	ErrServiceUninitialized = errors.New("pubsublite: service must be started")

	// ErrServiceStarting indicates that a service (e.g. publisher or subscriber)
	// cannot perform an operation because it is starting up.
	ErrServiceStarting = errors.New("pubsublite: service is starting up")

	// ErrServiceStopped indicates that a service (e.g. publisher or subscriber)
	// cannot perform an operation because it has stopped or is in the process of
	// stopping.
	ErrServiceStopped = errors.New("pubsublite: service has stopped or is stopping")

	// ErrBackendUnavailable indicates that the backend service has been
	// unavailable for a period of time. The timeout can be configured using
	// PublishSettings.Timeout or ReceiveSettings.Timeout.
	ErrBackendUnavailable = errors.New("pubsublite: backend service is unavailable")
)

Errors exported from this package.
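
Since ErrOversizedMessage is documented as being wrapped in another error, callers would likely compare with errors.Is rather than ==. The sketch below uses a local stand-in sentinel and a hypothetical checkSize helper, and it assumes %w-style wrapping; the actual wrapping used by the package is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// errOversizedMessage is a local stand-in for the package's sentinel error.
var errOversizedMessage = errors.New("maximum allowed message size exceeded")

// checkSize is a hypothetical helper that wraps the sentinel with context.
func checkSize(size, limit int) error {
	if size > limit {
		return fmt.Errorf("message of %d bytes rejected: %w", size, errOversizedMessage)
	}
	return nil
}

// isOversized reports whether err is, or wraps, the sentinel.
func isOversized(err error) bool {
	return errors.Is(err, errOversizedMessage)
}

func main() {
	err := checkSize(10, 5)
	fmt.Println(isOversized(err))           // true
	fmt.Println(err == errOversizedMessage) // false: the sentinel is wrapped
}
```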

var DefaultPublishSettings = PublishSettings{
	DelayThreshold: 10 * time.Millisecond,
	CountThreshold: 100,
	ByteThreshold:  1e6,
	Timeout:        7 * 24 * time.Hour,

	BufferedByteLimit: 1 << 30,
	ConfigPollPeriod:  10 * time.Minute,
}

DefaultPublishSettings holds the default values for PublishSettings.

var DefaultReceiveSettings = ReceiveSettings{
	MaxOutstandingMessages: 1000,
	MaxOutstandingBytes:    1e9,
	Timeout:                7 * 24 * time.Hour,
}

DefaultReceiveSettings holds the default values for ReceiveSettings.

Functions

func LocationToRegion added in v1.2.0

func LocationToRegion(location string) (string, error)

LocationToRegion returns the region that the given location is in.

func NewAdminClient

func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error)

NewAdminClient creates a new gapic AdminClient for a region.

func ValidateRegion

func ValidateRegion(input string) error

ValidateRegion verifies that the `input` string has the format of a valid Google Cloud region. An example region is "europe-west1". See https://cloud.google.com/compute/docs/regions-zones for more information.
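
The relationship between zones and regions used by LocationToRegion and ValidateRegion can be sketched as follows. This is a hypothetical re-implementation, not the package's code: it assumes a region has two hyphen-separated parts (e.g. "europe-west1") and a zone is a region plus a third part (e.g. "us-central1-a").

```go
package main

import (
	"fmt"
	"strings"
)

// locationToRegion is a hypothetical sketch. It assumes regions look like
// "europe-west1" and zones look like "us-central1-a".
func locationToRegion(location string) (string, error) {
	parts := strings.Split(location, "-")
	for _, p := range parts {
		if p == "" {
			return "", fmt.Errorf("invalid location %q", location)
		}
	}
	switch len(parts) {
	case 2:
		// Already a region.
		return location, nil
	case 3:
		// A zone: drop the trailing zone suffix.
		return strings.Join(parts[:2], "-"), nil
	default:
		return "", fmt.Errorf("invalid location %q", location)
	}
}

func main() {
	fmt.Println(locationToRegion("us-central1-a")) // us-central1 <nil>
	fmt.Println(locationToRegion("europe-west1"))  // europe-west1 <nil>
}
```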

Types

type AckConsumer

type AckConsumer interface {
	Ack()
}

AckConsumer is the interface exported from this package for acking messages.

type FrameworkType added in v0.5.0

type FrameworkType string

FrameworkType is the user-facing API for Cloud Pub/Sub Lite.

const FrameworkCloudPubSubShim FrameworkType = "CLOUD_PUBSUB_SHIM"

FrameworkCloudPubSubShim is the API that emulates Cloud Pub/Sub.

type LocationPath added in v0.6.0

type LocationPath struct {
	// A Google Cloud project. The project ID (e.g. "my-project") or the project
	// number (e.g. "987654321") can be provided.
	Project string

	// A Google Cloud zone (e.g. "us-central1-a") or region (e.g. "us-central1").
	Location string
}

LocationPath stores a path consisting of a project and zone/region.

func ParseLocationPath added in v0.6.0

func ParseLocationPath(input string) (LocationPath, error)

ParseLocationPath parses a project/location path.

func (LocationPath) String added in v0.6.0

func (l LocationPath) String() string

type MessageMetadata added in v0.7.0

type MessageMetadata struct {
	// The topic partition the message was published to.
	Partition int

	// The offset the message was assigned.
	Offset int64
}

MessageMetadata holds properties of a message published to the Pub/Sub Lite service.

NOTE: This is duplicated in the pscompat package in order to generate nicer docs and should be kept consistent.

func (*MessageMetadata) String added in v0.7.0

func (m *MessageMetadata) String() string

type MessageReceiverFunc added in v0.4.0

type MessageReceiverFunc func(*ReceivedMessage)

MessageReceiverFunc receives a Pub/Sub message from a topic partition.

type PartitionSet added in v1.2.0

type PartitionSet map[int]struct{}

PartitionSet is a set of partition numbers.

func NewPartitionSet added in v1.2.0

func NewPartitionSet(partitions []int) PartitionSet

NewPartitionSet creates a partition set initialized from the given partition numbers.

func (PartitionSet) Contains added in v1.2.0

func (ps PartitionSet) Contains(partition int) bool

Contains returns true if this set contains the specified partition.

func (PartitionSet) Ints added in v1.2.0

func (ps PartitionSet) Ints() (partitions []int)

Ints returns the partitions contained in this set as an unsorted slice.

func (PartitionSet) SortedInts added in v1.2.0

func (ps PartitionSet) SortedInts() (partitions []int)

SortedInts returns the partitions contained in this set as a sorted slice.
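
Because PartitionSet is declared as map[int]struct{}, its behavior can be reproduced in a few lines. The sketch below is a local re-implementation with lowercase names based only on the documented signatures; the package's own method bodies may differ.

```go
package main

import (
	"fmt"
	"sort"
)

// partitionSet mirrors the documented type: a set keyed by partition number.
type partitionSet map[int]struct{}

// newPartitionSet builds a set from a slice; duplicates collapse.
func newPartitionSet(partitions []int) partitionSet {
	ps := make(partitionSet, len(partitions))
	for _, p := range partitions {
		ps[p] = struct{}{}
	}
	return ps
}

// contains reports whether the partition is in the set.
func (ps partitionSet) contains(partition int) bool {
	_, ok := ps[partition]
	return ok
}

// sortedInts returns the members in ascending order. Map iteration order is
// unspecified in Go, which is why an explicit sort is needed.
func (ps partitionSet) sortedInts() []int {
	out := make([]int, 0, len(ps))
	for p := range ps {
		out = append(out, p)
	}
	sort.Ints(out)
	return out
}

func main() {
	ps := newPartitionSet([]int{3, 1, 3, 2})
	fmt.Println(ps.contains(2), ps.contains(7)) // true false
	fmt.Println(ps.sortedInts())                // [1 2 3]
}
```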

type PublishResultFunc added in v0.4.0

type PublishResultFunc func(*MessageMetadata, error)

PublishResultFunc receives the result of a publish.

type PublishSettings

type PublishSettings struct {
	// Publish a non-empty batch after this delay has passed. Must be > 0.
	DelayThreshold time.Duration

	// Publish a batch when it has this many messages. Must be > 0. The maximum is
	// MaxPublishRequestCount.
	CountThreshold int

	// Publish a batch when its size in bytes reaches this value. Must be > 0. The
	// maximum is MaxPublishRequestBytes.
	ByteThreshold int

	// The maximum time that the client will attempt to establish a publish stream
	// connection to the server. Must be > 0.
	//
	// If the timeout is exceeded, the publisher will terminate with the last error
	// that occurred while trying to reconnect. Note that if the timeout duration
	// is long, ErrOverflow may occur first.
	Timeout time.Duration

	// The maximum number of bytes that the publisher will keep in memory before
	// returning ErrOverflow. Must be > 0.
	//
	// Note that Pub/Sub Lite topics are provisioned a publishing throughput
	// capacity, per partition, shared by all publisher clients. Setting a large
	// buffer size can mitigate transient publish spikes. However, consistently
	// attempting to publish messages at a much higher rate than the publishing
	// throughput capacity can cause the buffers to overflow. For more
	// information, see https://cloud.google.com/pubsub/lite/docs/topics.
	BufferedByteLimit int

	// The polling interval to watch for topic partition count updates. Set to 0
	// to disable polling if the number of partitions will never update.
	ConfigPollPeriod time.Duration

	// The user-facing API type.
	Framework FrameworkType
}

PublishSettings control the batching of published messages. These settings apply per partition.
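
The count and byte thresholds can be read as a simple flush rule. The sketch below is an illustration under assumptions: batchSettings, validate, and shouldFlush are hypothetical names, the limits are local copies of the documented constants, and the time-based DelayThreshold is left out for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented limits.
const (
	maxPublishRequestCount     = 1000
	maxPublishRequestBytes int = 3.5 * 1024 * 1024
)

// batchSettings is a hypothetical subset of PublishSettings.
type batchSettings struct {
	CountThreshold int
	ByteThreshold  int
}

// validate applies the documented constraints: each threshold must be > 0
// and must not exceed its maximum.
func (s batchSettings) validate() error {
	if s.CountThreshold <= 0 || s.CountThreshold > maxPublishRequestCount {
		return errors.New("CountThreshold out of range")
	}
	if s.ByteThreshold <= 0 || s.ByteThreshold > maxPublishRequestBytes {
		return errors.New("ByteThreshold out of range")
	}
	return nil
}

// shouldFlush reports whether a batch with the given message count and byte
// size has reached either threshold.
func (s batchSettings) shouldFlush(count, bytes int) bool {
	return count >= s.CountThreshold || bytes >= s.ByteThreshold
}

func main() {
	// Values taken from DefaultPublishSettings.
	s := batchSettings{CountThreshold: 100, ByteThreshold: 1000000}
	fmt.Println(s.validate())                // <nil>
	fmt.Println(s.shouldFlush(99, 999999))   // false
	fmt.Println(s.shouldFlush(100, 2048))    // true
	fmt.Println(s.shouldFlush(10, 1000000))  // true
}
```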

type Publisher added in v0.4.0

type Publisher interface {
	Publish(*pb.PubSubMessage, PublishResultFunc)

	Start()
	WaitStarted() error
	Stop()
	WaitStopped() error
	Error() error
}

Publisher is the client interface exported from this package for publishing messages.

func NewPublisher added in v0.4.0

func NewPublisher(ctx context.Context, settings PublishSettings, region, topicPath string, opts ...option.ClientOption) (Publisher, error)

NewPublisher creates a new client for publishing messages.
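
The callback style of Publish can be sketched with an in-memory stand-in. Everything here is hypothetical: fakePublisher is not part of this package, it takes a string instead of *pb.PubSubMessage, and returning the uninitialized error before Start is an assumption based on the description of ErrServiceUninitialized.

```go
package main

import (
	"errors"
	"fmt"
)

// errServiceUninitialized is a local stand-in for the package's sentinel.
var errServiceUninitialized = errors.New("service must be started")

// messageMetadata mirrors the documented MessageMetadata fields.
type messageMetadata struct {
	Partition int
	Offset    int64
}

// publishResultFunc mirrors the documented PublishResultFunc signature.
type publishResultFunc func(*messageMetadata, error)

// fakePublisher is a hypothetical single-partition publisher that assigns
// increasing offsets and reports results through the callback.
type fakePublisher struct {
	started    bool
	nextOffset int64
}

func (p *fakePublisher) Start() { p.started = true }

func (p *fakePublisher) Publish(data string, onResult publishResultFunc) {
	if !p.started {
		onResult(nil, errServiceUninitialized)
		return
	}
	onResult(&messageMetadata{Partition: 0, Offset: p.nextOffset}, nil)
	p.nextOffset++
}

func main() {
	p := &fakePublisher{}
	report := func(m *messageMetadata, err error) {
		if err != nil {
			fmt.Println("publish failed:", err)
			return
		}
		fmt.Printf("published to partition %d at offset %d\n", m.Partition, m.Offset)
	}
	p.Publish("too early", report)
	p.Start()
	p.Publish("hello", report)
}
```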

type ReassignmentHandlerFunc added in v1.2.0

type ReassignmentHandlerFunc func(before, after PartitionSet) error

ReassignmentHandlerFunc receives a partition assignment change.
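
A handler with this signature would typically compare the two sets. The sketch below uses a local partitionSet type and hypothetical helpers (newSet, difference, handleReassignment) to compute which partitions were added and removed; what the package does with a returned error is not described here, so the handler simply returns nil.

```go
package main

import (
	"fmt"
	"sort"
)

// partitionSet mirrors the documented PartitionSet type.
type partitionSet map[int]struct{}

// newSet is a hypothetical convenience constructor.
func newSet(partitions ...int) partitionSet {
	ps := make(partitionSet, len(partitions))
	for _, p := range partitions {
		ps[p] = struct{}{}
	}
	return ps
}

// difference returns the members of a that are not in b, sorted ascending.
func difference(a, b partitionSet) []int {
	var out []int
	for p := range a {
		if _, ok := b[p]; !ok {
			out = append(out, p)
		}
	}
	sort.Ints(out)
	return out
}

// handleReassignment has the same shape as ReassignmentHandlerFunc.
func handleReassignment(before, after partitionSet) error {
	fmt.Println("added:", difference(after, before))
	fmt.Println("removed:", difference(before, after))
	return nil
}

func main() {
	// Partition 0 is dropped and partition 2 is picked up.
	_ = handleReassignment(newSet(0, 1), newSet(1, 2))
}
```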

type ReceiveSettings added in v0.4.0

type ReceiveSettings struct {
	// MaxOutstandingMessages is the maximum number of unacknowledged messages.
	// Must be > 0.
	MaxOutstandingMessages int

	// MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged
	// messages. Must be > 0.
	MaxOutstandingBytes int

	// The maximum time that the client will attempt to establish a subscribe
	// stream connection to the server. Must be > 0.
	//
	// If the timeout is exceeded, the subscriber will terminate with the last error
	// that occurred while trying to reconnect.
	Timeout time.Duration

	// The topic partition numbers (zero-indexed) to receive messages from.
	// Values must be less than the number of partitions for the topic. If not
	// specified, the client will use the partition assignment service to
	// determine which partitions it should connect to.
	Partitions []int

	// The user-facing API type.
	Framework FrameworkType
}

ReceiveSettings control the receiving of messages. These settings apply per partition.

type ReceivedMessage added in v0.4.0

type ReceivedMessage struct {
	Msg       *pb.SequencedMessage
	Ack       AckConsumer
	Partition int
}

ReceivedMessage stores a received Pub/Sub message and AckConsumer for acknowledging the message.
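
A receiver in the style of MessageReceiverFunc processes the message and then calls Ack on the attached AckConsumer. The following sketch uses local stand-ins: receivedMessage carries a plain string instead of *pb.SequencedMessage, and countingAcker is a hypothetical AckConsumer that only counts calls.

```go
package main

import "fmt"

// ackConsumer mirrors the documented AckConsumer interface.
type ackConsumer interface {
	Ack()
}

// receivedMessage is a simplified stand-in for ReceivedMessage.
type receivedMessage struct {
	Data      string // stand-in for the *pb.SequencedMessage payload
	Ack       ackConsumer
	Partition int
}

// countingAcker is a hypothetical AckConsumer that records how often it
// was acked.
type countingAcker struct{ acked int }

func (a *countingAcker) Ack() { a.acked++ }

// receive has the same shape as MessageReceiverFunc: handle the message,
// then acknowledge it.
func receive(msg *receivedMessage) {
	fmt.Printf("partition %d: %s\n", msg.Partition, msg.Data)
	msg.Ack.Ack()
}

func main() {
	acker := &countingAcker{}
	receive(&receivedMessage{Data: "hello", Ack: acker, Partition: 3})
	fmt.Println("acks:", acker.acked) // acks: 1
}
```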

type ReservationPath added in v1.0.0

type ReservationPath struct {
	// A Google Cloud project. The project ID (e.g. "my-project") or the project
	// number (e.g. "987654321") can be provided.
	Project string

	// A Google Cloud region. An example region is "us-central1".
	Region string

	// The ID of the Pub/Sub Lite reservation, for example "my-reservation-name".
	ReservationID string
}

ReservationPath stores the full path of a Pub/Sub Lite reservation.

func ParseReservationPath added in v1.0.0

func ParseReservationPath(input string) (ReservationPath, error)

ParseReservationPath parses the full path of a Pub/Sub Lite reservation.

func (ReservationPath) Location added in v1.0.0

func (r ReservationPath) Location() LocationPath

Location returns the reservation's location path.

func (ReservationPath) String added in v1.0.0

func (r ReservationPath) String() string

type Subscriber added in v0.4.0

type Subscriber interface {
	Start()
	WaitStarted() error
	Stop()
	WaitStopped() error
	Terminate()
	PartitionActive(int) bool
}

Subscriber is the client interface exported from this package for receiving messages.

func NewSubscriber added in v0.4.0

func NewSubscriber(ctx context.Context, settings ReceiveSettings, receiver MessageReceiverFunc, reassignmentHandler ReassignmentHandlerFunc,
	region, subscriptionPath string, opts ...option.ClientOption) (Subscriber, error)

NewSubscriber creates a new client for receiving messages.

type SubscriptionPath added in v0.6.0

type SubscriptionPath struct {
	// A Google Cloud project. The project ID (e.g. "my-project") or the project
	// number (e.g. "987654321") can be provided.
	Project string

	// A Google Cloud zone (e.g. "us-central1-a") or region (e.g. "us-central1").
	Location string

	// The ID of the Pub/Sub Lite subscription, for example
	// "my-subscription-name".
	SubscriptionID string
}

SubscriptionPath stores the full path of a Pub/Sub Lite subscription.

func ParseSubscriptionPath added in v0.6.0

func ParseSubscriptionPath(input string) (SubscriptionPath, error)

ParseSubscriptionPath parses the full path of a Pub/Sub Lite subscription.

func (SubscriptionPath) LocationPath added in v1.2.0

func (s SubscriptionPath) LocationPath() LocationPath

LocationPath returns the subscription's location path.

func (SubscriptionPath) String added in v0.6.0

func (s SubscriptionPath) String() string

type TopicPath added in v0.6.0

type TopicPath struct {
	// A Google Cloud project. The project ID (e.g. "my-project") or the project
	// number (e.g. "987654321") can be provided.
	Project string

	// A Google Cloud zone (e.g. "us-central1-a") or region (e.g. "us-central1").
	Location string

	// The ID of the Pub/Sub Lite topic, for example "my-topic-name".
	TopicID string
}

TopicPath stores the full path of a Pub/Sub Lite topic.

func ParseTopicPath added in v0.6.0

func ParseTopicPath(input string) (TopicPath, error)

ParseTopicPath parses the full path of a Pub/Sub Lite topic.

func (TopicPath) LocationPath added in v1.2.0

func (t TopicPath) LocationPath() LocationPath

LocationPath returns the topic's location path.

func (TopicPath) String added in v0.6.0

func (t TopicPath) String() string
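
The round trip between ParseTopicPath and String can be sketched as below. This is a hypothetical re-implementation: it assumes the path format "projects/PROJECT/locations/LOCATION/topics/TOPIC_ID", which is not spelled out in this documentation, and its validation may be stricter or looser than the package's.

```go
package main

import (
	"fmt"
	"strings"
)

// topicPath mirrors the documented TopicPath fields.
type topicPath struct {
	Project  string
	Location string
	TopicID  string
}

// parseTopicPath is a hypothetical parser for the assumed format
// "projects/PROJECT/locations/LOCATION/topics/TOPIC_ID".
func parseTopicPath(input string) (topicPath, error) {
	parts := strings.Split(input, "/")
	if len(parts) != 6 || parts[0] != "projects" || parts[2] != "locations" || parts[4] != "topics" {
		return topicPath{}, fmt.Errorf("invalid topic path %q", input)
	}
	for _, i := range []int{1, 3, 5} {
		if parts[i] == "" {
			return topicPath{}, fmt.Errorf("invalid topic path %q", input)
		}
	}
	return topicPath{Project: parts[1], Location: parts[3], TopicID: parts[5]}, nil
}

// String renders the path in the same assumed format.
func (t topicPath) String() string {
	return fmt.Sprintf("projects/%s/locations/%s/topics/%s", t.Project, t.Location, t.TopicID)
}

func main() {
	tp, err := parseTopicPath("projects/my-project/locations/us-central1-a/topics/my-topic-name")
	fmt.Println(tp.TopicID, err) // my-topic-name <nil>
	fmt.Println(tp)              // projects/my-project/locations/us-central1-a/topics/my-topic-name
}
```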
