gcs

package
v0.0.0-...-62ff037
Warning: This package is not in the latest version of its module.

Published: Feb 6, 2026 License: Apache-2.0 Imports: 19 Imported by: 0

README

GCS Workqueue Implementation

This package implements a Google Cloud Storage (GCS) backed workqueue that provides reliable, persistent task processing with state management.

Bucket Organization

The GCS workqueue uses object prefixes to organize tasks by their state within a single bucket:

Prefixes
  • queued/ - Tasks waiting to be processed
  • in-progress/ - Tasks currently being processed by a worker
  • dead-letter/ - Tasks that have failed after exceeding maximum retry attempts

State Transitions

queued/{key} → in-progress/{key} → [completed] (deleted)
in-progress/{key} → queued/{key} (on requeue, with backoff)
in-progress/{key} → dead-letter/{key} (on failure after exceeding the maximum retry attempts)

Object Metadata

Each object stores metadata to track task state:

  • priority - Zero-padded 8-digit priority for lexicographic ordering (higher = processed first)
  • attempts - Number of processing attempts
  • lease-expiration - When the current lease expires (for in-progress tasks)
  • not-before - Earliest time the task should be processed (RFC3339 format)
  • failed-time - When the task was moved to the dead-letter queue (RFC3339 format)
  • last-attempted - Unix timestamp of last processing attempt

Key Features

  • Priority-based processing - Higher-priority tasks are processed first
  • Lease-based ownership - In-progress tasks have renewable leases that prevent multiple workers from processing the same task
  • Automatic retry with backoff - Failed tasks are automatically requeued with exponential backoff
  • Dead letter handling - Tasks that exceed the retry limit are moved to the dead-letter queue
  • Orphan detection - Detects and handles tasks with expired leases
  • Deduplication - Duplicate queue requests update priority/timing instead of creating duplicates

Metrics

The implementation exports Prometheus metrics for:

  • Queue sizes (queued, in-progress, dead-lettered)
  • Processing latency and wait times
  • Retry attempts and completion rates
  • Task priorities and attempt distributions

Documentation

Constants

This section is empty.

Variables

var RefreshInterval = 5 * time.Minute

RefreshInterval is the interval at which we refresh the leases of owned objects. It is surfaced as a global so that it can be mutated by tests and exposed as a flag by binaries wrapping this library. However, binary authors should take care to pass consistent values to the key ingress and dispatchers, or they may see unexpected behavior. TODO(mattmoor): What's the right balance here?

var TrackWorkAttemptMinThreshold = 20

TrackWorkAttemptMinThreshold is the minimum number of attempts before a task's work attempts are tracked. This keeps the cardinality of the metric low.
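A sketch of the gating idea, assuming the attempts metric carries a per-task label: most tasks succeed within a few attempts and are never recorded individually, so only persistently failing tasks add label values. The `>=` comparison and the `shouldTrackAttempts` helper are assumptions for illustration; the variable below is a local copy of the package default.

```go
package main

import "fmt"

// TrackWorkAttemptMinThreshold mirrors the package variable's default value.
var TrackWorkAttemptMinThreshold = 20

// shouldTrackAttempts reports whether a task has been attempted often enough
// to be recorded in the attempts metric. The ">=" comparison is an assumption.
func shouldTrackAttempts(attempts int) bool {
	return attempts >= TrackWorkAttemptMinThreshold
}

func main() {
	for _, a := range []int{1, 19, 20, 35} {
		fmt.Println(a, shouldTrackAttempts(a))
	}
}
```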

Functions

func NewWorkQueue

func NewWorkQueue(client ClientInterface, limit int) workqueue.Interface

NewWorkQueue creates a new GCS-backed workqueue.

Types

type ClientInterface

type ClientInterface interface {
	Object(name string) *storage.ObjectHandle
	Objects(ctx context.Context, q *storage.Query) *storage.ObjectIterator
}

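ClientInterface is an interface that abstracts the GCS client. Its method set matches that of *storage.BucketHandle, so a bucket handle (for example, one returned by client.Bucket(name)) satisfies it.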
