storage

package
v0.26.0
Warning

This package is not in the latest version of its module.

Published: May 6, 2026 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation


Constants

const (
	DefaultCachePageSize       = 256 * 1024       // 256 KiB
	DefaultObjectInfoCacheSize = 512 * 1024       // 512 KiB
	DefaultObjectPageCacheSize = 64 * 1024 * 1024 // 64 MiB
	DefaultObjectCacheSize     = 64 * 1024 * 1024 // 64 MiB
	DefaultCacheTTL            = time.Minute
)
const (
	ContentTypeJSON       = "application/json"
	ContentTypeAvro       = "application/avro"
	ContentTypeParquet    = "application/vnd.apache.parquet"
	CacheControlImmutable = "public, max-age=31536000, immutable"
)

Variables

var (
	ErrBucketExist         = errors.New("bucket exist")
	ErrBucketNotFound      = errors.New("bucket not found")
	ErrBucketReadOnly      = errors.New("read-only bucket")
	ErrChecksumMismatch    = makeExpected(errors.New("checksum mismatch"))
	ErrObjectNotFound      = makeExpected(errors.New("object not found"))
	ErrObjectNotMatch      = makeTemporary(errors.New("object mismatch"))
	ErrInvalidObjectKey    = errors.New("invalid object key")
	ErrInvalidObjectTag    = errors.New("invalid object tag")
	ErrInvalidRange        = errors.New("offset out of range")
	ErrPresignNotSupported = errors.New("presigned URLs not supported")
	ErrPresignRedirect     = makeExpected(errors.New("redirect to presigned URL"))
	ErrTooManyRequests     = makeTemporary(errors.New("too many requests"))
)

Functions

func CopyObject

func CopyObject(ctx context.Context, sourceURI, destURI string, options ...PutOption) error

func CopyObjectAt

func CopyObjectAt(ctx context.Context, registry Registry, sourceURI, destURI string, options ...PutOption) error

func DeleteObject

func DeleteObject(ctx context.Context, objectURI string) error

func DeleteObjectAt

func DeleteObjectAt(ctx context.Context, registry Registry, objectURI string) error

func DeleteObjects

func DeleteObjects(ctx context.Context, objects iter.Seq2[string, error]) iter.Seq2[string, error]

func DeleteObjectsAt

func DeleteObjectsAt(ctx context.Context, registry Registry, objects iter.Seq2[string, error]) iter.Seq2[string, error]

func FS

func FS(ctx context.Context, reg Registry) fs.FS

FS constructs an fs.FS using the given registry to load objects from buckets based on their URI location.

The context is used for all operations on the registry and on the buckets that it loads; cancelling it aborts all in-flight I/O on the filesystem.

func Install

func Install(adapters ...Adapter)

func ListObjects

func ListObjects(ctx context.Context, prefixURI string, options ...ListOption) iter.Seq2[Object, error]

func ListObjectsAt

func ListObjectsAt(ctx context.Context, registry Registry, prefixURI string, options ...ListOption) iter.Seq2[Object, error]

func Location

func Location(location, path string) string

func NewOptions

func NewOptions[Options any, Option ~func(*Options)](options iter.Seq[Option]) *Options

func PutObjectAtWriter

func PutObjectAtWriter(ctx context.Context, registry Registry, objectURI string, options ...PutOption) io.WriteCloser

PutObjectAtWriter wraps a storage.PutObjectAt call in a WriteCloser. Errors returned by PutObjectAt are passed through the WriteCloser methods.

func PutObjectWriter

func PutObjectWriter(ctx context.Context, objectURI string, options ...PutOption) io.WriteCloser

func Register

func Register(typ string, reg Registry)

func ValidObjectKey

func ValidObjectKey(key string) error

ValidObjectKey reports whether key is a legal object key. The rules mirror io/fs.ValidPath (UTF-8, no leading slash, no empty / "." / ".." segments) with one deliberate relaxation: a single trailing "/" is allowed, used by object stores (S3, GCS, ...) as a directory-marker convention and by the fuse package to persist per-directory permissions.

The success path is zero-allocation; allocations only happen when building the wrapped error on rejection. See BenchmarkValidObjectKey.
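
The rules above can be approximated with the standard library. The sketch below is an illustration of the described behaviour, not the package's implementation; validKeySketch is a hypothetical name, and it differs from ValidObjectKey in at least one respect: fs.ValidPath accepts the root name ".", which the description above suggests is rejected.

```go
package main

import (
	"io/fs"
	"strings"
)

// validKeySketch strips at most one trailing "/" (the directory-marker
// relaxation) and defers every other rule to fs.ValidPath.
// Caveat: fs.ValidPath(".") is true, so "." and "./" pass here even though
// the documented rules reject "." segments.
func validKeySketch(key string) bool {
	return fs.ValidPath(strings.TrimSuffix(key, "/"))
}
```

With this sketch, "logs/2024/" is accepted because only one trailing slash is removed before validation, while "a//" is still rejected because the remaining "a/" ends in a slash.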

func ValidObjectRange

func ValidObjectRange(key string, start, end int64) error

ValidObjectRange validates a byte range as passed to BytesRange. The range is inclusive on both ends. An end of -1 means "to the end of the object"; other negative end values are invalid. A negative start is always invalid, as is a non-negative end that precedes the start.
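
The three rejection rules can be written as a single switch. This is a minimal sketch of the documented conditions; checkRange and errInvalidRange are local stand-ins, and the error message format is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalidRange is a local stand-in for storage.ErrInvalidRange.
var errInvalidRange = errors.New("offset out of range")

// checkRange applies the documented rules to an inclusive byte range:
// start must be non-negative, end must be -1 (open-ended) or non-negative,
// and a non-negative end must not precede start.
func checkRange(key string, start, end int64) error {
	switch {
	case start < 0:
	case end < -1:
	case end >= 0 && end < start:
	default:
		return nil
	}
	// The message format is hypothetical; only the wrapped sentinel matters.
	return fmt.Errorf("%s: range %d..%d: %w", key, start, end, errInvalidRange)
}
```

Because the range is inclusive, start == end is valid and selects exactly one byte.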

func WatchObjects

func WatchObjects(ctx context.Context, bucket Bucket, options ...ListOption) iter.Seq2[Object, error]

WatchObjects provides a generic implementation of watching for changes to objects in a bucket using ListObjects with exponential backoff. It tracks object state and yields objects that have been added, modified, or deleted.

Objects that have been deleted are yielded with a Size of -1 as a deletion marker. The function uses exponential backoff to poll for changes, resetting the backoff delay whenever changes are detected.

This implementation is similar to storage/http.Bucket.WatchObjects and can be used by bucket implementations that don't have native watch capabilities.

Example usage:

for object, err := range storage.WatchObjects(ctx, bucket, storage.KeyPrefix("logs/")) {
	if err != nil {
		log.Printf("watch error: %v", err)
		continue
	}
	if object.Size == -1 {
		log.Printf("deleted: %s", object.Key)
	} else {
		log.Printf("added/updated: %s (size: %d)", object.Key, object.Size)
	}
}
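
The polling approach described above comes down to two pieces: diffing consecutive listings and adjusting the delay. The sketch below illustrates both under stated assumptions. The object type is a local stand-in for storage.Object, change detection by Size and LastModified is assumed, and the doubling factor is a guess; none of this is taken from the package's source.

```go
package main

import "time"

// object is a local stand-in for storage.Object.
type object struct {
	Key          string
	Size         int64
	LastModified time.Time
}

// diff compares two listings keyed by object key and returns what a watcher
// would yield: added or modified objects as listed, and deleted objects with
// Size set to -1 as the deletion marker.
func diff(prev, next map[string]object) []object {
	var changes []object
	for key, obj := range next {
		old, ok := prev[key]
		if !ok || old.Size != obj.Size || !old.LastModified.Equal(obj.LastModified) {
			changes = append(changes, obj)
		}
	}
	for key := range prev {
		if _, ok := next[key]; !ok {
			changes = append(changes, object{Key: key, Size: -1})
		}
	}
	return changes
}

// nextDelay resets the polling delay to lo when changes were seen and
// otherwise doubles it, capped at hi.
func nextDelay(delay, lo, hi time.Duration, changed bool) time.Duration {
	if changed {
		return lo
	}
	return min(delay*2, hi)
}
```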

Types

type Adapter

type Adapter interface {
	AdaptBucket(Bucket) Bucket
}

func WithInstrumentation

func WithInstrumentation() Adapter

WithInstrumentation returns an adapter that wraps buckets with OpenTelemetry tracing. Each storage operation is recorded as a span with relevant attributes.

func WithLogger

func WithLogger(logger *slog.Logger) Adapter

WithLogger returns an adapter that wraps buckets with structured logging of all operations using the provided slog.Logger.

func WithMount

func WithMount(prefix string, bucket Bucket) Adapter

WithMount returns an adapter that mounts a bucket at a specific prefix. When the mount prefix matches, operations are delegated to the mounted bucket. Location, Create, and Access operations are unchanged and passed to the underlying bucket.

func WithOverlay

func WithOverlay(readLayer Bucket) Adapter

WithOverlay returns an adapter that overlays a read layer under the write layer. Read operations fall back to the read layer when objects are not found in the write layer. Write operations go to the write layer only.

func WithPrefix

func WithPrefix(prefix string) Adapter

func WithReadOnly

func WithReadOnly() Adapter

WithReadOnly returns an adapter that wraps buckets to reject all write operations with ErrBucketReadOnly.

func WithScheme

func WithScheme(scheme string) Adapter

type AdapterFunc

type AdapterFunc func(Bucket) Bucket

func (AdapterFunc) AdaptBucket

func (a AdapterFunc) AdaptBucket(b Bucket) Bucket
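
AdapterFunc lets a plain function act as an Adapter, in the same way http.HandlerFunc lets a function act as an http.Handler. The sketch below shows the pattern with a one-method stand-in for Bucket so that it stays self-contained. The types bucket, adapter, named, and prefixed are hypothetical, and applying adapters in argument order is an assumption about AdaptBucket, not documented behaviour.

```go
package main

// bucket is a one-method stand-in for storage.Bucket.
type bucket interface {
	Location() string
}

// adapter is a stand-in for storage.Adapter.
type adapter interface {
	AdaptBucket(bucket) bucket
}

// adapterFunc lets a function value satisfy adapter.
type adapterFunc func(bucket) bucket

func (a adapterFunc) AdaptBucket(b bucket) bucket { return a(b) }

// named is a trivial bucket whose location is its own value.
type named string

func (n named) Location() string { return string(n) }

// prefixed wraps a bucket and appends a path segment to its location.
type prefixed struct {
	bucket
	prefix string
}

func (p prefixed) Location() string { return p.bucket.Location() + "/" + p.prefix }

// adaptAll applies each adapter to the result of the previous one.
// The application order is assumed for this sketch.
func adaptAll(b bucket, adapters ...adapter) bucket {
	for _, a := range adapters {
		b = a.AdaptBucket(b)
	}
	return b
}
```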

type Bucket

type Bucket interface {
	// Location returns a URI for the bucket. It always includes a
	// scheme component, and may include a path component.
	//
	// Some example location values:
	//
	//   s3://some-bucket
	//   gcs://another-one/with-prefix
	//
	// As a special exception, the "memory" bucket implementation
	// does _not_ contain a scheme prefix, and instead has the
	// special hostname of ":memory:", for historical reasons.
	Location() string

	// Access verifies that the bucket is accessible. It returns
	// nil error only if the bucket can be reached. This can be
	// used to test bucket existence and authentication.
	Access(ctx context.Context) error

	// Create instantiates a new bucket at Location().
	Create(ctx context.Context) error

	// HeadObject retrieves metadata about the object stored at
	// key.
	HeadObject(ctx context.Context, key string) (ObjectInfo, error)

	// GetObject retrieves the contents of the object stored at
	// key, as well as its metadata.
	GetObject(ctx context.Context, key string, options ...GetOption) (io.ReadCloser, ObjectInfo, error)

	// PutObject stores bytes at key.
	PutObject(ctx context.Context, key string, value io.Reader, options ...PutOption) (ObjectInfo, error)

	// DeleteObject removes whatever is found at key. It returns
	// an error if there is nothing there.
	DeleteObject(ctx context.Context, key string) error

	// DeleteObjects deletes multiple objects. It consumes the input sequence
	// of object keys and yields results for each deletion. The output sequence
	// yields (key, nil) for successful deletions and (key, error) for failures.
	// Input errors are propagated immediately. The stream must be consumed to
	// drive the deletion process.
	DeleteObjects(ctx context.Context, objects iter.Seq2[string, error]) iter.Seq2[string, error]

	// CopyObject copies an object from one key to another within the same bucket.
	// Source object metadata is preserved by default; any PutOptions provided will
	// override specific fields (merge semantics).
	CopyObject(ctx context.Context, from, to string, options ...PutOption) error

	// ListObjects gathers a list of abbreviated metadata for all
	// objects in a bucket, or under a key prefix (set through a
	// ListOption).
	ListObjects(ctx context.Context, options ...ListOption) iter.Seq2[Object, error]

	// WatchObjects is like ListObjects but the sequence doesn't end.
	// After listing the objects, it watches for any changes to the
	// prefix and yields new objects as they are added, modified, or
	// removed. The removal of objects is indicated by yielding an
	// Object with a negative Size.
	WatchObjects(ctx context.Context, options ...ListOption) iter.Seq2[Object, error]

	// PresignGetObject generates a presigned URL for getting an object.
	// The expiration parameter specifies how long the presigned URL will remain valid.
	PresignGetObject(ctx context.Context, key string, expiration time.Duration, options ...GetOption) (string, error)

	// PresignPutObject generates a presigned URL for putting an object.
	// The expiration parameter specifies how long the presigned URL will remain valid.
	PresignPutObject(ctx context.Context, key string, expiration time.Duration, options ...PutOption) (string, error)

	// PresignHeadObject generates a presigned URL for getting object metadata.
	// The expiration parameter specifies how long the presigned URL will remain valid.
	PresignHeadObject(ctx context.Context, key string, expiration time.Duration) (string, error)

	// PresignDeleteObject generates a presigned URL for deleting an object.
	// The expiration parameter specifies how long the presigned URL will remain valid.
	PresignDeleteObject(ctx context.Context, key string, expiration time.Duration) (string, error)
}

Bucket is an interface describing an object storage bucket. It is modeled on the S3 object storage API. While it has many implementations, it uses S3's interface as the common denominator, because that is the industry standard for object storage.

func AdaptBucket

func AdaptBucket(bucket Bucket, adapters ...Adapter) Bucket

func EmptyBucket

func EmptyBucket() Bucket

EmptyBucket returns a read-only bucket that contains no objects. All read operations (HeadObject, GetObject, ListObjects) will behave as if the bucket exists but is empty. All write operations will return ErrBucketReadOnly.

func InstrumentedBucket

func InstrumentedBucket(bucket Bucket) Bucket

InstrumentedBucket wraps a bucket with OpenTelemetry tracing spans for all storage operations. Span attributes include bucket location, object keys, content metadata, and error information.

func LoadBucket

func LoadBucket(ctx context.Context, bucketURI string) (Bucket, error)

LoadBucket loads a bucket using the default caching registry. Bucket instances are cached by normalized URI to avoid recreating adapters on each call.

func LoggedBucket

func LoggedBucket(bucket Bucket, logger *slog.Logger) Bucket

LoggedBucket wraps a bucket with structured logging. All operations are logged with relevant attributes such as key, size, duration, and error details.

func Merge

func Merge(buckets ...Bucket) Bucket

Merge creates a bucket that merges multiple buckets according to specific rules:

  - Location returns the location of the first bucket
  - Access/Create iterates over all buckets
  - HeadObject, GetObject iterates over all buckets, returns the first that doesn't return ErrObjectNotFound
  - PutObject puts the object in the first bucket
  - DeleteObject/DeleteObjects iterates over all buckets
  - ListObjects uses kway-go to combine the lists of all buckets
  - WatchObjects uses kway-go to combine WatchObjects on first bucket with ListObjects on others
  - Presign* methods delegate to the first bucket

func Mount

func Mount(base Bucket, prefix string, mount Bucket) Bucket

Mount creates a new bucket that mounts another bucket at the specified prefix. Operations on keys matching the prefix are delegated to the mounted bucket, while other operations are handled by the underlying bucket.

func Overlay

func Overlay(writeLayer, readLayer Bucket) Bucket

Overlay creates a bucket that combines a write layer and a read layer. Objects are read from the write layer first, falling back to the read layer if not found. All writes go to the write layer. Listings merge objects from both layers.
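
The read fall-through and write routing described above can be illustrated with two in-memory maps. This is a sketch of the semantics only; layer and overlay are hypothetical types, errObjectNotFound is a local stand-in for ErrObjectNotFound, and how the package resolves duplicate keys in listings is an assumption.

```go
package main

import (
	"errors"
	"maps"
	"slices"
)

// errObjectNotFound is a local stand-in for storage.ErrObjectNotFound.
var errObjectNotFound = errors.New("object not found")

// layer is a hypothetical in-memory bucket mapping object keys to contents.
type layer map[string][]byte

func (l layer) get(key string) ([]byte, error) {
	if value, ok := l[key]; ok {
		return value, nil
	}
	return nil, errObjectNotFound
}

// overlay pairs a write layer with a read layer.
type overlay struct{ write, read layer }

// get reads from the write layer first and falls back to the read layer
// only when the object is missing; other errors are returned unchanged.
func (o overlay) get(key string) ([]byte, error) {
	value, err := o.write.get(key)
	if errors.Is(err, errObjectNotFound) {
		return o.read.get(key)
	}
	return value, err
}

// put stores the object in the write layer only.
func (o overlay) put(key string, value []byte) { o.write[key] = value }

// list merges the keys of both layers, de-duplicated and sorted.
func (o overlay) list() []string {
	keys := make(map[string]struct{}, len(o.write)+len(o.read))
	for key := range o.read {
		keys[key] = struct{}{}
	}
	for key := range o.write {
		keys[key] = struct{}{}
	}
	return slices.Sorted(maps.Keys(keys))
}
```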

func Prefix

func Prefix(bucket Bucket, prefix string) Bucket

func ReadOnlyBucket

func ReadOnlyBucket(bucket Bucket) Bucket

ReadOnlyBucket wraps a bucket to reject all write operations with ErrBucketReadOnly. Read operations are passed through to the underlying bucket.

type Cache

type Cache struct {
	// contains filtered or unexported fields
}

Cache is an in-memory cache for objects read from a Bucket.

func NewCache

func NewCache(options ...CacheOption) *Cache

NewCache constructs a new Cache instance configured with the options passed as arguments.

By default, the page and object caches are 64MiB each, and the object info cache is 512KiB. The page size for byte ranges is set to 256KiB. The default TTL is 1 minute.

func (*Cache) AdaptBucket

func (c *Cache) AdaptBucket(bucket Bucket) Bucket

AdaptBucket returns a Bucket that caches the results of calls to HeadObject and GetObject.

func (*Cache) PageSize

func (c *Cache) PageSize() int64

PageSize returns the size of each page in the cache.

func (*Cache) Stat

func (c *Cache) Stat() (objects, infos, pages CacheStat)

Stat returns statistics about the object, object info, and page caches, in the order of its return values.

type CacheOption

type CacheOption func(*Cache)

CacheOption is a function type that can be used to configure new Cache instances created by calling NewCache.

func CacheMeterProvider

func CacheMeterProvider(provider metric.MeterProvider) CacheOption

CacheMeterProvider enables cache metrics using the provided MeterProvider.

func CachePageSize

func CachePageSize(size int64) CacheOption

CachePageSize sets the size of each page in the cache. This is used when fetching byte ranges from objects. If the page size is zero or negative, no caching is done for byte ranges.
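
To make the role of the page size concrete, the arithmetic below shows which fixed-size pages an inclusive byte range touches. It assumes pages are aligned to multiples of the page size starting at offset zero, which is a common layout but is not confirmed by this documentation; pageSpan and pageBounds are hypothetical helpers.

```go
package main

// pageSpan returns the first and last page index touched by the inclusive
// byte range [start, end] for the given page size.
// Assumes 0 <= start <= end and pageSize > 0.
func pageSpan(start, end, pageSize int64) (first, last int64) {
	return start / pageSize, end / pageSize
}

// pageBounds returns the inclusive byte range covered by a page index.
func pageBounds(page, pageSize int64) (start, end int64) {
	return page * pageSize, (page+1)*pageSize - 1
}
```

With the default 256 KiB page size (262144 bytes), a request for bytes 300000 through 600000 touches pages 1 and 2, and page 1 covers bytes 262144 through 524287.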

func CacheTTL

func CacheTTL(d time.Duration) CacheOption

CacheTTL sets the time-to-live for cached entries. After the TTL expires, entries will be re-fetched on the next access. A TTL of 0 disables expiration. The default TTL is 1 minute.

func ObjectCacheSize

func ObjectCacheSize(size int64) CacheOption

ObjectCacheSize sets the maximum size of the cache for full objects.

func ObjectInfoCacheSize

func ObjectInfoCacheSize(size int64) CacheOption

ObjectInfoCacheSize sets the maximum size of the cache for ObjectInfo values (e.g., from HeadObject calls).

func ObjectPageCacheSize

func ObjectPageCacheSize(size int64) CacheOption

ObjectPageCacheSize sets the maximum size of the cache for object pages stored from calls to GetObject with a byte range.

type CacheStat

type CacheStat struct {
	Limit     int64 // Maximum size of the cache in bytes.
	Entries   int64 // Current number of cached entries.
	Size      int64 // Current size of the cache in bytes.
	Hits      int64 // Total number of cache hits.
	Misses    int64 // Total number of cache misses.
	Evictions int64 // Total number of evictions from the cache.
}

CacheStat contains statistics about the cache configuration and utilization.

type File

type File struct {
	// contains filtered or unexported fields
}

func NewFile

func NewFile(ctx context.Context, bucket Bucket, key string, size int64) *File

func OpenFile

func OpenFile(ctx context.Context, store Registry, location string, size int64) (*File, error)

func (*File) Bucket

func (f *File) Bucket() Bucket

func (*File) Context

func (f *File) Context() context.Context

func (*File) Key

func (f *File) Key() string

func (*File) Name

func (f *File) Name() string

func (*File) ReadAt

func (f *File) ReadAt(b []byte, off int64) (int, error)

func (*File) Size

func (f *File) Size() int64

func (*File) WithContext

func (f *File) WithContext(ctx context.Context) *File

type GetOption

type GetOption func(*GetOptions)

func BytesRange

func BytesRange(start, end int64) GetOption

BytesRange requests a specific byte range of an object, inclusive on both ends. Passing -1 for end means "to the end of the object" and maps onto native open-ended range support in HTTP, S3, and GCS. A read at start == object size (or past it) with end == -1 is not an error — the returned reader yields zero bytes and EOF. Other negative values (start < 0, end < -1, or end < start with end >= 0) are rejected with ErrInvalidRange.
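
For HTTP-based backends, an inclusive range and the open-ended form correspond directly to the two shapes of the HTTP Range header value. The helper below is a sketch of that mapping; rangeHeader is a hypothetical name, and how each backend actually builds its request is not shown in this documentation.

```go
package main

import "fmt"

// rangeHeader formats an inclusive byte range as an HTTP Range header value.
// An end of -1 selects the open-ended form "bytes=start-".
// It assumes the range has already been validated.
func rangeHeader(start, end int64) string {
	if end < 0 {
		return fmt.Sprintf("bytes=%d-", start)
	}
	return fmt.Sprintf("bytes=%d-%d", start, end)
}
```

For example, rangeHeader(0, 99) selects the first 100 bytes, and rangeHeader(100, -1) selects everything from offset 100 onward.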

type GetOptions

type GetOptions struct {
	// contains filtered or unexported fields
}

func NewGetOptions

func NewGetOptions(options ...GetOption) *GetOptions

func (*GetOptions) BytesRange

func (get *GetOptions) BytesRange() (start, end int64, ok bool)

type ListOption

type ListOption func(*ListOptions)

func KeyDelimiter

func KeyDelimiter(delimiter string) ListOption

func KeyPrefix

func KeyPrefix(prefix string) ListOption

func MaxKeys

func MaxKeys(n int) ListOption

func StartAfter

func StartAfter(key string) ListOption

type ListOptions

type ListOptions struct {
	// contains filtered or unexported fields
}

func NewListOptions

func NewListOptions(options ...ListOption) *ListOptions

func (*ListOptions) KeyDelimiter

func (list *ListOptions) KeyDelimiter() string

func (*ListOptions) KeyPrefix

func (list *ListOptions) KeyPrefix() string

func (*ListOptions) MaxKeys

func (list *ListOptions) MaxKeys() int

func (*ListOptions) StartAfter

func (list *ListOptions) StartAfter() string

type Object

type Object struct {
	Key          string    `json:"key"`
	Size         int64     `json:"size"`
	LastModified time.Time `json:"last-modified,omitzero"`
}

Object is the type of values returned by the ListObjects method.

This type contains the minimal set of information available about each object key when iterating through a prefix of the object store.

type ObjectInfo

type ObjectInfo struct {
	CacheControl    string            `json:"cache-control,omitempty"`
	ContentType     string            `json:"content-type,omitempty"`
	ContentEncoding string            `json:"content-encoding,omitempty"`
	ETag            string            `json:"etag,omitempty"`
	Size            int64             `json:"size"`
	LastModified    time.Time         `json:"last-modified,omitzero"`
	Metadata        map[string]string `json:"metadata,omitempty"`
}

ObjectInfo represents detailed metadata about an object.

This type differs from Object by not including the key, which is always known to the application when obtaining an ObjectInfo, and by including more metadata that is not available when iterating through a prefix of the object store.

func GetObject

func GetObject(ctx context.Context, objectURI string, options ...GetOption) (io.ReadCloser, ObjectInfo, error)

func GetObjectAt

func GetObjectAt(ctx context.Context, registry Registry, objectURI string, options ...GetOption) (io.ReadCloser, ObjectInfo, error)

func HeadObject

func HeadObject(ctx context.Context, objectURI string) (ObjectInfo, error)

func HeadObjectAt

func HeadObjectAt(ctx context.Context, registry Registry, objectURI string) (ObjectInfo, error)

func PutObject

func PutObject(ctx context.Context, objectURI string, object io.Reader, options ...PutOption) (ObjectInfo, error)

func PutObjectAt

func PutObjectAt(ctx context.Context, registry Registry, objectURI string, object io.Reader, options ...PutOption) (ObjectInfo, error)

type PutOption

type PutOption func(*PutOptions)

func CacheControl

func CacheControl(cc string) PutOption

func ChecksumSHA256

func ChecksumSHA256(sum [sha256.Size]byte) PutOption

ChecksumSHA256 declares the SHA-256 the body must hash to. Backends either let their object store verify natively (S3, S3-compatible HTTP) or stream-verify on the client (memory, GCS, file). On mismatch they return an error wrapping ErrChecksumMismatch and do not durably store the body.

func ContentEncoding

func ContentEncoding(ce string) PutOption

func ContentLength

func ContentLength(length int64) PutOption

func ContentType

func ContentType(ct string) PutOption

func IfMatch

func IfMatch(etag string) PutOption

func IfNoneMatch

func IfNoneMatch(etag string) PutOption

func Metadata

func Metadata(key, value string) PutOption

type PutOptions

type PutOptions struct {
	// contains filtered or unexported fields
}

func NewPutOptions

func NewPutOptions(options ...PutOption) *PutOptions

func (*PutOptions) CacheControl

func (put *PutOptions) CacheControl() string

func (*PutOptions) ChecksumSHA256

func (put *PutOptions) ChecksumSHA256() ([sha256.Size]byte, bool)

ChecksumSHA256 returns the configured SHA-256 the body must hash to, and a bool indicating whether one was set. The all-zero value is the "unset" sentinel — collision with a real SHA-256 is 1 in 2^256.

func (*PutOptions) ContentEncoding

func (put *PutOptions) ContentEncoding() string

func (*PutOptions) ContentLength

func (put *PutOptions) ContentLength(r io.Reader) (int64, error)

ContentLength returns the configured value if one was set; otherwise it tries to determine the content length from the io.Reader. If the length cannot be determined, it returns -1.

The following interfaces are probed in order of precedence:

  1. ContentLength() int64 — explicit 64-bit length (takes precedence over Len)
  2. Len() int — implemented by bytes.Buffer, bytes.Reader, strings.Reader
  3. *os.File — uses Stat to obtain the file size
  4. io.Seeker — seeks to end and back to measure the remaining bytes

func (*PutOptions) ContentType

func (put *PutOptions) ContentType() string

func (*PutOptions) IfMatch

func (put *PutOptions) IfMatch() string

func (*PutOptions) IfNoneMatch

func (put *PutOptions) IfNoneMatch() string

func (*PutOptions) Metadata

func (put *PutOptions) Metadata() map[string]string

type Registry

type Registry interface {
	LoadBucket(ctx context.Context, bucketURI string) (Bucket, error)
}

func DefaultRegistry

func DefaultRegistry() Registry

DefaultRegistry returns the default caching registry that loads buckets from the global registry and applies global adapters.

func RegistryFunc

func RegistryFunc(load func(context.Context, string) (Bucket, error)) Registry

RegistryFunc creates a Registry that caches bucket instances by URI. The load function is called to create buckets on cache miss. Bucket URIs are normalized before lookup to ensure consistent caching.
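
The caching behaviour can be pictured as a mutex-guarded map in front of the load function. The sketch below is an assumption-heavy illustration: cachingLoader is a hypothetical type, and trimming a trailing slash stands in for URI normalization, whose actual rules are not documented here.

```go
package main

import (
	"strings"
	"sync"
)

// cachingLoader caches values by normalized URI and calls load on a miss.
// B stands in for the Bucket type.
type cachingLoader[B any] struct {
	mu    sync.Mutex
	cache map[string]B
	load  func(uri string) (B, error)
}

// get returns the cached value for uri, loading and storing it on a miss.
// Holding the lock while loading keeps the sketch simple at the cost of
// serializing concurrent loads.
func (c *cachingLoader[B]) get(uri string) (B, error) {
	key := strings.TrimSuffix(uri, "/") // assumed normalization rule
	c.mu.Lock()
	defer c.mu.Unlock()
	if b, ok := c.cache[key]; ok {
		return b, nil
	}
	b, err := c.load(key)
	if err != nil {
		var zero B
		return zero, err
	}
	if c.cache == nil {
		c.cache = make(map[string]B)
	}
	c.cache[key] = b
	return b, nil
}
```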

func SingleBucketRegistry

func SingleBucketRegistry(bucket Bucket) Registry

func WithAdapters

func WithAdapters(registry Registry, adapters ...Adapter) Registry

WithAdapters returns a Registry that applies the given adapters to all buckets loaded from the wrapped registry. This allows custom registries to have adapters applied, similar to how Install() works for the global registry.

Directories

Path	Synopsis
fuse	Package fuse provides a FUSE filesystem backed by a storage.Bucket.
gs
r2	Package r2 provides a Cloudflare R2 storage backend.
s3
