gcppubsub

v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 21, 2026 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Overview

Package gcppubsub implements the nimbus invalidation bus over Google Cloud Pub/Sub (cloud.google.com/go/pubsub/v2).

Each instance publishes evictions to a shared topic and subscribes with its own auto-expiring subscription, so a broadcast fans out to every instance. The subscription is deleted when Subscribe returns (e.g. on Close, which a Cloud Run service should wire to SIGTERM); the expiration policy is the backstop for instances that are hard-killed.

This is the pull-based bus. For request-only-CPU Cloud Run services, see PushHandler, which lets a push subscription deliver invalidations inside a request (so CPU is allocated). Either way, L2 remains the source of truth: an instance that misses a broadcast still converges on its next L2 read.

Functions

func PushHandler

func PushHandler(handler func(invalidation.Event)) http.Handler

PushHandler returns an http.Handler that decodes Pub/Sub push deliveries and calls handler with the invalidation event. Mount it on your Cloud Run service so a push subscription can deliver invalidations inside a request, which is throttle-safe under request-only CPU allocation (unlike a streaming pull).

This handler does NOT verify the request itself: it relies on a network-level guard (the Cloud Run run.invoker IAM binding for the push service account). For in-process defense-in-depth, construct a PushBus with WithPushAuth, whose Handler() verifies the Pub/Sub OIDC token before dispatching; that path is recommended for production. PushHandler stays available unauthenticated for advanced users who terminate auth elsewhere.

For any well-formed envelope it returns 204, even when the payload cannot be decoded, so that Pub/Sub does not redeliver the message.
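The acknowledgement behaviour described above can be illustrated with a standard-library-only sketch. It is not the package's implementation: `event` is a hypothetical stand-in for invalidation.Event (whose fields and wire format are not shown here), the payload is assumed to be JSON, and returning 400 for a malformed envelope is an assumption, since the documentation only specifies the well-formed case. The envelope shape (`message.data` as base64, `message.messageId`, `subscription`) follows the Pub/Sub push delivery format.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// event is a hypothetical stand-in for invalidation.Event.
type event struct {
	Key string `json:"key"`
}

// pushMessage and pushEnvelope mirror the JSON body of a Pub/Sub push
// delivery. encoding/json base64-decodes "data" into the []byte field.
type pushMessage struct {
	Data      []byte `json:"data"`
	MessageID string `json:"messageId"`
}

type pushEnvelope struct {
	Message      pushMessage `json:"message"`
	Subscription string      `json:"subscription"`
}

// pushHandler is an illustrative stand-in for PushHandler.
func pushHandler(handle func(event)) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		var env pushEnvelope
		if err := json.NewDecoder(r.Body).Decode(&env); err != nil {
			// Assumption: a malformed envelope is rejected with 400.
			http.Error(w, "bad envelope", http.StatusBadRequest)
			return
		}
		var ev event
		if err := json.Unmarshal(env.Message.Data, &ev); err == nil {
			handle(ev)
		}
		// Acknowledge even an undecodable payload: redelivery cannot fix it.
		w.WriteHeader(http.StatusNoContent)
	})
}

// envelope wraps payload in a push envelope (data is base64-encoded by Marshal).
func envelope(payload []byte) string {
	b, _ := json.Marshal(pushEnvelope{Message: pushMessage{Data: payload, MessageID: "1"}})
	return string(b)
}

// deliver posts body to h and returns the response status code.
func deliver(h http.Handler, body string) int {
	req := httptest.NewRequest(http.MethodPost, "/push", strings.NewReader(body))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	h := pushHandler(func(ev event) { fmt.Println("evict", ev.Key) })
	fmt.Println(deliver(h, envelope([]byte(`{"key":"user:42"}`))))
	fmt.Println(deliver(h, envelope([]byte("not an event"))))
	fmt.Println(deliver(h, "{"))
}
```

Acknowledging an undecodable payload trades a lost invalidation for the absence of a redelivery loop, which is acceptable here because L2 remains the source of truth.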

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus is a Pub/Sub-backed invalidation bus. The *pubsub.Client is owned by the caller and is not closed by Bus.Close.

func New

func New(ctx context.Context, client *pubsub.Client, topicID string, opts ...Option) (*Bus, error)

New ensures the topic exists and returns a bus that publishes to it. topicID is the short topic name (not the full resource path).

func (*Bus) Close

func (b *Bus) Close() error

Close is a no-op; the caller owns the *pubsub.Client.

func (*Bus) Publish

func (b *Bus) Publish(ctx context.Context, ev invalidation.Event) error

Publish broadcasts an invalidation event to the topic.

func (*Bus) Subscribe

func (b *Bus) Subscribe(ctx context.Context, handler func(invalidation.Event)) error

Subscribe creates this instance's subscription and delivers events to handler until ctx is cancelled, then deletes the subscription.

type Option

type Option func(*config)

Option configures the bus.

func WithAckDeadline

func WithAckDeadline(d time.Duration) Option

WithAckDeadline sets the subscription ack deadline (default 10s).

func WithSubscriptionID

func WithSubscriptionID(id string) Option

WithSubscriptionID sets a fixed per-instance subscription ID instead of a random one. Each instance must use a distinct ID for broadcast fan-out.

func WithSubscriptionTTL

func WithSubscriptionTTL(d time.Duration) Option

WithSubscriptionTTL sets the subscription expiration policy (default 24h), the backstop that reclaims subscriptions from instances that never ran teardown. Real Pub/Sub enforces a 1-day minimum; 0 disables the policy.

type PushBus

type PushBus struct {
	// contains filtered or unexported fields
}

PushBus is the push-delivery variant: it publishes to the topic like Bus, but receives invalidations via an HTTP push subscription instead of a streaming pull. Mount Handler() on your service. This is throttle-safe under Cloud Run request-only CPU allocation, because the inbound push request allocates CPU (a streaming pull would stall between requests). Push delivery is load-balanced across instances, so each push reaches only one of them; an instance that does not receive a given push still converges on its next L2 read.

func NewPush

func NewPush(ctx context.Context, client *pubsub.Client, topicID string, opts ...PushOption) (*PushBus, error)

NewPush ensures the topic exists and returns a push bus.

func (*PushBus) Close

func (p *PushBus) Close() error

Close is a no-op; the caller owns the *pubsub.Client.

func (*PushBus) Handler

func (p *PushBus) Handler() http.Handler

Handler returns the http.Handler to mount for the push subscription endpoint. If the bus was built with WithPushAuth, the handler verifies the Pub/Sub OIDC token (audience + service-account allowlist) before dispatching; otherwise it is unauthenticated and relies on the Cloud Run run.invoker IAM binding alone (see examples/cloudrun).

func (*PushBus) Publish

func (p *PushBus) Publish(ctx context.Context, ev invalidation.Event) error

Publish broadcasts an invalidation event to the topic.

func (*PushBus) Subscribe

func (p *PushBus) Subscribe(ctx context.Context, handler func(invalidation.Event)) error

Subscribe registers handler; push deliveries to Handler invoke it. It blocks until ctx is cancelled, matching the Subscriber contract the cache expects.

type PushOption

type PushOption func(*PushBus)

PushOption configures a PushBus.

func WithPushAuth

func WithPushAuth(audience string, allowedServiceAccounts ...string) PushOption

WithPushAuth enables in-process verification of the Pub/Sub OIDC token on the handler returned by Handler(). It is opt-in defense-in-depth that complements (does not replace) the Cloud Run run.invoker IAM binding on the push service account.

audience is the expected "aud" claim: it must equal the audience configured on the subscription's oidc_token. That can be the push endpoint URL (Pub/Sub's default when audience is omitted) or any stable string set explicitly (the examples/cloudrun Terraform uses a fixed value to avoid a self-reference cycle on the auto-generated service URL). An empty audience here disables audience binding: a token for any audience minted by an allowlisted service account is accepted (signature, issuer, and verified email are still enforced), and a warning is logged at construction.

allowedServiceAccounts is the allowlist of service-account emails (the JWT "email" claim) permitted to push; supply the push subscription's service account.

With this option, Handler() returns 401 for a missing/malformed Authorization header or a token that fails signature, issuer, or verified-email validation, 403 for an audience mismatch or an email not in the allowlist, and 204 only after verification passes. Without it, Handler() is unauthenticated (see PushHandler).
