Documentation
¶
Index ¶
- Constants
- Variables
- func CopyObject(ctx context.Context, sourceURI, destURI string, options ...PutOption) error
- func CopyObjectAt(ctx context.Context, registry Registry, sourceURI, destURI string, ...) error
- func DeleteObject(ctx context.Context, objectURI string) error
- func DeleteObjectAt(ctx context.Context, registry Registry, objectURI string) error
- func DeleteObjects(ctx context.Context, objects iter.Seq2[string, error]) iter.Seq2[string, error]
- func DeleteObjectsAt(ctx context.Context, registry Registry, objects iter.Seq2[string, error]) iter.Seq2[string, error]
- func FS(ctx context.Context, reg Registry) fs.FS
- func Install(adapters ...Adapter)
- func ListObjects(ctx context.Context, prefixURI string, options ...ListOption) iter.Seq2[Object, error]
- func ListObjectsAt(ctx context.Context, registry Registry, prefixURI string, ...) iter.Seq2[Object, error]
- func Location(location, path string) string
- func NewOptions[Options any, Option ~func(*Options)](options iter.Seq[Option]) *Options
- func PutObjectAtWriter(ctx context.Context, registry Registry, objectURI string, options ...PutOption) io.WriteCloser
- func PutObjectWriter(ctx context.Context, objectURI string, options ...PutOption) io.WriteCloser
- func Register(typ string, reg Registry)
- func ValidObjectKey(key string) error
- func ValidObjectRange(key string, start, end int64) error
- func WatchObjects(ctx context.Context, bucket Bucket, options ...ListOption) iter.Seq2[Object, error]
- type Adapter
- type AdapterFunc
- type Bucket
- func AdaptBucket(bucket Bucket, adapters ...Adapter) Bucket
- func EmptyBucket() Bucket
- func InstrumentedBucket(bucket Bucket) Bucket
- func LoadBucket(ctx context.Context, bucketURI string) (Bucket, error)
- func LoggedBucket(bucket Bucket, logger *slog.Logger) Bucket
- func Merge(buckets ...Bucket) Bucket
- func Mount(base Bucket, prefix string, mount Bucket) Bucket
- func Overlay(writeLayer, readLayer Bucket) Bucket
- func Prefix(bucket Bucket, prefix string) Bucket
- func ReadOnlyBucket(bucket Bucket) Bucket
- type Cache
- type CacheOption
- type CacheStat
- type File
- type GetOption
- type GetOptions
- type ListOption
- type ListOptions
- type Object
- type ObjectInfo
- func GetObject(ctx context.Context, objectURI string, options ...GetOption) (io.ReadCloser, ObjectInfo, error)
- func GetObjectAt(ctx context.Context, registry Registry, objectURI string, options ...GetOption) (io.ReadCloser, ObjectInfo, error)
- func HeadObject(ctx context.Context, objectURI string) (ObjectInfo, error)
- func HeadObjectAt(ctx context.Context, registry Registry, objectURI string) (ObjectInfo, error)
- func PutObject(ctx context.Context, objectURI string, object io.Reader, options ...PutOption) (ObjectInfo, error)
- func PutObjectAt(ctx context.Context, registry Registry, objectURI string, object io.Reader, ...) (ObjectInfo, error)
- type PutOption
- func CacheControl(cc string) PutOption
- func ChecksumSHA256(sum [sha256.Size]byte) PutOption
- func ContentEncoding(ce string) PutOption
- func ContentLength(length int64) PutOption
- func ContentType(ct string) PutOption
- func IfMatch(etag string) PutOption
- func IfNoneMatch(etag string) PutOption
- func Metadata(key, value string) PutOption
- type PutOptions
- func (put *PutOptions) CacheControl() string
- func (put *PutOptions) ChecksumSHA256() ([sha256.Size]byte, bool)
- func (put *PutOptions) ContentEncoding() string
- func (put *PutOptions) ContentLength(r io.Reader) (int64, error)
- func (put *PutOptions) ContentType() string
- func (put *PutOptions) IfMatch() string
- func (put *PutOptions) IfNoneMatch() string
- func (put *PutOptions) Metadata() map[string]string
- type Registry
Constants ¶
const ( DefaultCachePageSize = 256 * 1024 // 256 KiB DefaultObjectInfoCacheSize = 512 * 1024 // 512 KiB DefaultObjectPageCacheSize = 64 * 1024 * 1024 // 64 MiB DefaultObjectCacheSize = 64 * 1024 * 1024 // 64 MiB DefaultCacheTTL = time.Minute )
const ( ContentTypeJSON = "application/json" ContentTypeAvro = "application/avro" ContentTypeParquet = "application/vnd.apache.parquet" CacheControlImmutable = "public, max-age=31536000, immutable" )
Variables ¶
var ( ErrBucketExist = errors.New("bucket exist") ErrBucketNotFound = errors.New("bucket not found") ErrBucketReadOnly = errors.New("read-only bucket") ErrChecksumMismatch = makeExpected(errors.New("checksum mismatch")) ErrObjectNotFound = makeExpected(errors.New("object not found")) ErrObjectNotMatch = makeTemporary(errors.New("object mismatch")) ErrInvalidObjectKey = errors.New("invalid object key") ErrInvalidObjectTag = errors.New("invalid object tag") ErrInvalidRange = errors.New("offset out of range") ErrPresignNotSupported = errors.New("presigned URLs not supported") ErrPresignRedirect = makeExpected(errors.New("redirect to presigned URL")) ErrTooManyRequests = makeTemporary(errors.New("too many requests")) )
Functions ¶
func CopyObject ¶
func CopyObjectAt ¶
func DeleteObjectAt ¶
func DeleteObjects ¶
func DeleteObjectsAt ¶
func FS ¶
FS constructs a fs.FS using the given registry to load objects from buckets based on their URI location.
The context is used for all operations on the registry and the buckets that it loads; cancelling it aborts all in-flight I/O on the filesystem.
func ListObjects ¶
func ListObjectsAt ¶
func NewOptions ¶
func PutObjectAtWriter ¶
func PutObjectAtWriter(ctx context.Context, registry Registry, objectURI string, options ...PutOption) io.WriteCloser
PutObjectAtWriter wraps a storage.PutObjectAt call in a WriteCloser. Errors returned by PutObjectAt are passed through the WriteCloser methods.
func PutObjectWriter ¶
func ValidObjectKey ¶
ValidObjectKey reports whether key is a legal object key. The rules mirror io/fs.ValidPath (UTF-8, no leading slash, no empty / "." / ".." segments) with one deliberate relaxation: a single trailing "/" is allowed, used by object stores (S3, GCS, ...) as a directory-marker convention and by the fuse package to persist per-directory permissions.
The success path is zero-allocation; allocations only happen when building the wrapped error on rejection. See BenchmarkValidObjectKey.
func ValidObjectRange ¶
ValidObjectRange validates a byte range as passed to BytesRange. The range is inclusive on both ends. An end of -1 means "to the end of the object"; other negative end values are invalid. A negative start is always invalid, as is a non-negative end that precedes the start.
func WatchObjects ¶
func WatchObjects(ctx context.Context, bucket Bucket, options ...ListOption) iter.Seq2[Object, error]
WatchObjects provides a generic implementation of watching for changes to objects in a bucket using ListObjects with exponential backoff. It tracks object state and yields objects that have been added, modified, or deleted.
Objects that have been deleted are yielded with a Size of -1 as a deletion marker. The function uses exponential backoff to poll for changes, resetting the backoff delay whenever changes are detected.
This implementation is similar to storage/http.Bucket.WatchObjects and can be used by bucket implementations that don't have native watch capabilities.
Example usage:
for object, err := range storage.WatchObjects(ctx, bucket, storage.KeyPrefix("logs/")) {
if err != nil {
log.Printf("watch error: %v", err)
continue
}
if object.Size == -1 {
log.Printf("deleted: %s", object.Key)
} else {
log.Printf("added/updated: %s (size: %d)", object.Key, object.Size)
}
}
Types ¶
type Adapter ¶
func WithInstrumentation ¶
func WithInstrumentation() Adapter
WithInstrumentation returns an adapter that wraps buckets with OpenTelemetry tracing. Each storage operation is recorded as a span with relevant attributes.
func WithLogger ¶
WithLogger returns an adapter that wraps buckets with structured logging of all operations using the provided slog.Logger.
func WithMount ¶
WithMount returns an adapter that mounts a bucket at a specific prefix. When the mount prefix matches, operations are delegated to the mounted bucket. Location, Create, and Access operations are unchanged and passed to the underlying bucket.
func WithOverlay ¶
WithOverlay returns an adapter that overlays a read layer under the write layer. Read operations fall back to the read layer when objects are not found in the write layer. Write operations go to the write layer only.
func WithPrefix ¶
func WithReadOnly ¶
func WithReadOnly() Adapter
WithReadOnly returns an adapter that wraps buckets to reject all write operations with ErrBucketReadOnly.
func WithScheme ¶
type AdapterFunc ¶
func (AdapterFunc) AdaptBucket ¶
func (a AdapterFunc) AdaptBucket(b Bucket) Bucket
type Bucket ¶
type Bucket interface {
// Location returns a URI for the bucket. It always includes a
// scheme component, and may include a path component.
//
// Some example location values:
//
// s3://some-bucket
// gcs://another-one/with-prefix
//
// As a special exception, the "memory" bucket implementation
// does _not_ contain a scheme prefix, and instead has the
// special hostname of ":memory:", for historical reasons.
Location() string
// Access verifies that the bucket is accessible. It returns
// nil error only if the bucket can be reached. This can be
// used to test bucket existence and authentication.
Access(ctx context.Context) error
// Create instantiates a new bucket at Location().
Create(ctx context.Context) error
// HeadObject retrieves metadata about the object stored at
// key.
HeadObject(ctx context.Context, key string) (ObjectInfo, error)
// GetObject retrieves the contents of the object stored at
// key, as well as its metadata.
GetObject(ctx context.Context, key string, options ...GetOption) (io.ReadCloser, ObjectInfo, error)
// PutObject stores bytes at key.
PutObject(ctx context.Context, key string, value io.Reader, options ...PutOption) (ObjectInfo, error)
// DeleteObject removes whatever is found at key. It returns
// an error if there is nothing there.
DeleteObject(ctx context.Context, key string) error
// DeleteObjects deletes multiple objects. It consumes the input sequence
// of object keys and yields results for each deletion. The output sequence
// yields (key, nil) for successful deletions and (key, error) for failures.
// Input errors are propagated immediately. The stream must be consumed to
// drive the deletion process.
DeleteObjects(ctx context.Context, objects iter.Seq2[string, error]) iter.Seq2[string, error]
// CopyObject copies an object from one key to another within the same bucket.
// Source object metadata is preserved by default; any PutOptions provided will
// override specific fields (merge semantics).
CopyObject(ctx context.Context, from, to string, options ...PutOption) error
// ListObjects gathers a list of abbreviated metadata for all
// objects in a bucket, or under a key prefix (set through a
// ListOption).
ListObjects(ctx context.Context, options ...ListOption) iter.Seq2[Object, error]
// WatchObjects is like ListObjects but the sequence doesn't end.
// After listing the objects, it watches for any changes to the
// prefix and yields new objects as they are added, modified, or
// removed. The removal of objects is indicated by yielding an
// Object with a negative Size.
WatchObjects(ctx context.Context, options ...ListOption) iter.Seq2[Object, error]
// PresignGetObject generates a presigned URL for getting an object.
// The expiration parameter specifies how long the presigned URL will remain valid.
PresignGetObject(ctx context.Context, key string, expiration time.Duration, options ...GetOption) (string, error)
// PresignPutObject generates a presigned URL for putting an object.
// The expiration parameter specifies how long the presigned URL will remain valid.
PresignPutObject(ctx context.Context, key string, expiration time.Duration, options ...PutOption) (string, error)
// PresignHeadObject generates a presigned URL for getting object metadata.
// The expiration parameter specifies how long the presigned URL will remain valid.
PresignHeadObject(ctx context.Context, key string, expiration time.Duration) (string, error)
// PresignDeleteObject generates a presigned URL for deleting an object.
// The expiration parameter specifies how long the presigned URL will remain valid.
PresignDeleteObject(ctx context.Context, key string, expiration time.Duration) (string, error)
}
Bucket is an interface describing an object storage bucket. It is modeled after the S3 object storage API. While it has many implementations, it uses S3's interface as the common denominator, because that is the industry standard for object storage.
func AdaptBucket ¶
func EmptyBucket ¶
func EmptyBucket() Bucket
EmptyBucket returns a read-only bucket that contains no objects. All read operations (HeadObject, GetObject, ListObjects) will behave as if the bucket exists but is empty. All write operations will return ErrBucketReadOnly.
func InstrumentedBucket ¶
InstrumentedBucket wraps a bucket with OpenTelemetry tracing spans for all storage operations. Span attributes include bucket location, object keys, content metadata, and error information.
func LoadBucket ¶
LoadBucket loads a bucket using the default caching registry. Bucket instances are cached by normalized URI to avoid recreating adapters on each call.
func LoggedBucket ¶
LoggedBucket wraps a bucket with structured logging. All operations are logged with relevant attributes such as key, size, duration, and error details.
func Merge ¶
Merge creates a bucket that merges multiple buckets according to specific rules:
- Location returns the location of the first bucket
- Access/Create iterates over all buckets
- HeadObject, GetObject iterates over all buckets, returns the first that doesn't return ErrObjectNotFound
- PutObject puts the object in the first bucket
- DeleteObject/DeleteObjects iterates over all buckets
- ListObjects uses kway-go to combine the lists of all buckets
- WatchObjects uses kway-go to combine WatchObjects on the first bucket with ListObjects on the others
- Presign* methods delegate to the first bucket
func Mount ¶
Mount creates a new bucket that mounts another bucket at the specified prefix. Operations on keys matching the prefix are delegated to the mounted bucket, while other operations are handled by the underlying bucket.
func Overlay ¶
Overlay creates a bucket that combines a write layer and a read layer. Objects are read from the write layer first, falling back to the read layer if not found. All writes go to the write layer. Listings merge objects from both layers.
func ReadOnlyBucket ¶
ReadOnlyBucket wraps a bucket to reject all write operations with ErrBucketReadOnly. Read operations are passed through to the underlying bucket.
type Cache ¶
type Cache struct {
// contains filtered or unexported fields
}
Cache is an in-memory cache for objects read from a Bucket.
func NewCache ¶
func NewCache(options ...CacheOption) *Cache
NewCache constructs a new Cache instance configured with the options passed as arguments.
By default, the page and object caches are 64MiB each, and the object info cache is 512KiB. The page size for byte ranges is set to 256KiB. The default TTL is 1 minute.
func (*Cache) AdaptBucket ¶
AdaptBucket returns a Bucket that caches the results of calls to HeadObject and GetObject.
type CacheOption ¶
type CacheOption func(*Cache)
CacheOption is a function type that can be used to configure new Cache instances created by calling NewCache.
func CacheMeterProvider ¶
func CacheMeterProvider(provider metric.MeterProvider) CacheOption
CacheMeterProvider enables cache metrics using the provided MeterProvider.
func CachePageSize ¶
func CachePageSize(size int64) CacheOption
CachePageSize sets the size of each page in the cache. This is used when fetching byte ranges from objects. If the page size is zero or negative, no caching is done for byte ranges.
func CacheTTL ¶
func CacheTTL(d time.Duration) CacheOption
CacheTTL sets the time-to-live for cached entries. After the TTL expires, entries will be re-fetched on the next access. A TTL of 0 disables expiration. The default TTL is 1 minute.
func ObjectCacheSize ¶
func ObjectCacheSize(size int64) CacheOption
ObjectCacheSize sets the maximum size of the cache for full objects.
func ObjectInfoCacheSize ¶
func ObjectInfoCacheSize(size int64) CacheOption
ObjectInfoCacheSize sets the maximum size of the cache for ObjectInfo values (e.g., from HeadObject calls).
func ObjectPageCacheSize ¶
func ObjectPageCacheSize(size int64) CacheOption
ObjectPageCacheSize sets the maximum size of the cache for object pages stored from calls to GetObject with a byte range.
type CacheStat ¶
type CacheStat struct {
Limit int64 // Maximum size of the cache in bytes.
Entries int64 // Current number of cached entries.
Size int64 // Current size of the cache in bytes.
Hits int64 // Total number of cache hits.
Misses int64 // Total number of cache misses.
Evictions int64 // Total number of evictions from the cache.
}
CacheStat contains statistics about the cache configuration and utilization.
type GetOption ¶
type GetOption func(*GetOptions)
func BytesRange ¶
BytesRange requests a specific byte range of an object, inclusive on both ends. Passing -1 for end means "to the end of the object" and maps onto native open-ended range support in HTTP, S3, and GCS. A read at start == object size (or past it) with end == -1 is not an error — the returned reader yields zero bytes and EOF. Invalid ranges (start < 0, end < -1, or end < start with end >= 0) are rejected with ErrInvalidRange.
type GetOptions ¶
type GetOptions struct {
// contains filtered or unexported fields
}
func NewGetOptions ¶
func NewGetOptions(options ...GetOption) *GetOptions
func (*GetOptions) BytesRange ¶
func (get *GetOptions) BytesRange() (start, end int64, ok bool)
type ListOption ¶
type ListOption func(*ListOptions)
func KeyDelimiter ¶
func KeyDelimiter(delimiter string) ListOption
func KeyPrefix ¶
func KeyPrefix(prefix string) ListOption
func MaxKeys ¶
func MaxKeys(n int) ListOption
func StartAfter ¶
func StartAfter(key string) ListOption
type ListOptions ¶
type ListOptions struct {
// contains filtered or unexported fields
}
func NewListOptions ¶
func NewListOptions(options ...ListOption) *ListOptions
func (*ListOptions) KeyDelimiter ¶
func (list *ListOptions) KeyDelimiter() string
func (*ListOptions) KeyPrefix ¶
func (list *ListOptions) KeyPrefix() string
func (*ListOptions) MaxKeys ¶
func (list *ListOptions) MaxKeys() int
func (*ListOptions) StartAfter ¶
func (list *ListOptions) StartAfter() string
type Object ¶
type Object struct {
Key string `json:"key"`
Size int64 `json:"size"`
LastModified time.Time `json:"last-modified,omitzero"`
}
Object is the type of values returned by the ListObjects method.
This type contains the minimal set of information available about each object key when iterating through a prefix of the object store.
type ObjectInfo ¶
type ObjectInfo struct {
CacheControl string `json:"cache-control,omitempty"`
ContentType string `json:"content-type,omitempty"`
ContentEncoding string `json:"content-encoding,omitempty"`
ETag string `json:"etag,omitempty"`
Size int64 `json:"size"`
LastModified time.Time `json:"last-modified,omitzero"`
Metadata map[string]string `json:"metadata,omitempty"`
}
ObjectInfo represents detailed metadata about an object.
This type differs from Object by not including the key, which is always known to the application when obtaining an ObjectInfo, and by including more metadata that are not available when iterating through a prefix of the object store.
func GetObject ¶
func GetObject(ctx context.Context, objectURI string, options ...GetOption) (io.ReadCloser, ObjectInfo, error)
func GetObjectAt ¶
func GetObjectAt(ctx context.Context, registry Registry, objectURI string, options ...GetOption) (io.ReadCloser, ObjectInfo, error)
func HeadObject ¶
func HeadObject(ctx context.Context, objectURI string) (ObjectInfo, error)
func HeadObjectAt ¶
type PutOption ¶
type PutOption func(*PutOptions)
func CacheControl ¶
func ChecksumSHA256 ¶
ChecksumSHA256 declares the SHA-256 the body must hash to. Backends either let their object store verify natively (S3, S3-compatible HTTP) or stream-verify on the client (memory, GCS, file). On mismatch they return an error wrapping ErrChecksumMismatch and do not durably store the body.
func ContentEncoding ¶
func ContentLength ¶
func ContentType ¶
func IfNoneMatch ¶
type PutOptions ¶
type PutOptions struct {
// contains filtered or unexported fields
}
func NewPutOptions ¶
func NewPutOptions(options ...PutOption) *PutOptions
func (*PutOptions) CacheControl ¶
func (put *PutOptions) CacheControl() string
func (*PutOptions) ChecksumSHA256 ¶
func (put *PutOptions) ChecksumSHA256() ([sha256.Size]byte, bool)
ChecksumSHA256 returns the configured SHA-256 the body must hash to, and a bool indicating whether one was set. The all-zero value is the "unset" sentinel — collision with a real SHA-256 is 1 in 2^256.
func (*PutOptions) ContentEncoding ¶
func (put *PutOptions) ContentEncoding() string
func (*PutOptions) ContentLength ¶
func (put *PutOptions) ContentLength(r io.Reader) (int64, error)
ContentLength returns the configured value if one was set; otherwise it tries to determine the content length from the io.Reader. If the length cannot be determined, it returns -1.
The following interfaces are probed in order of precedence:
- ContentLength() int64 — explicit 64-bit length (takes precedence over Len)
- Len() int — implemented by bytes.Buffer, bytes.Reader, strings.Reader
- *os.File — uses Stat to obtain the file size
- io.Seeker — seeks to end and back to measure the remaining bytes
func (*PutOptions) ContentType ¶
func (put *PutOptions) ContentType() string
func (*PutOptions) IfMatch ¶
func (put *PutOptions) IfMatch() string
func (*PutOptions) IfNoneMatch ¶
func (put *PutOptions) IfNoneMatch() string
func (*PutOptions) Metadata ¶
func (put *PutOptions) Metadata() map[string]string
type Registry ¶
func DefaultRegistry ¶
func DefaultRegistry() Registry
DefaultRegistry returns the default caching registry that loads buckets from the global registry and applies global adapters.
func RegistryFunc ¶
RegistryFunc creates a Registry that caches bucket instances by URI. The load function is called to create buckets on cache miss. Bucket URIs are normalized before lookup to ensure consistent caching.
func SingleBucketRegistry ¶
func WithAdapters ¶
WithAdapters returns a Registry that applies the given adapters to all buckets loaded from the wrapped registry. This allows custom registries to have adapters applied, similar to how Install() works for the global registry.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| fuse | Package fuse provides a FUSE filesystem backed by a storage.Bucket. |
| r2 | Package r2 provides a Cloudflare R2 storage backend. |