drivers

package
v0.28.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 5, 2025 License: Apache-2.0 Imports: 39 Imported by: 0

Documentation

Overview

internal/drivers/chunked_transfer.go

internal/drivers/circuit_breaker.go

internal/drivers/compression.go

internal/drivers/fallback.go

internal/drivers/health.go

internal/drivers/lyve.go

internal/drivers/parallel_stream.go

internal/drivers/queue.go - Proper queue implementation

internal/drivers/resumable.go

internal/drivers/retry.go

internal/drivers/throttle.go

Index

Constants

View Source
const (
	DefaultChunkSize = 4 * 1024 * 1024 // 4MB
	MaxConcurrency   = 8
)
View Source
const (
	SEEK_DATA = 3 // Seek to next data
	SEEK_HOLE = 4 // Seek to next hole
)

Variables

View Source
var ErrCircuitOpen = errors.New("circuit breaker is open")

ErrCircuitOpen is returned when the circuit breaker is open

View Source
var ErrQueueClosed = errors.New("request queue is closed")

ErrQueueClosed is returned when submitting to a closed queue

View Source
var ErrQueueFull = errors.New("request queue is full")

ErrQueueFull is returned when the queue is at capacity

Functions

This section is empty.

Types

type BufferedWriter

type BufferedWriter struct {
	// contains filtered or unexported fields
}

BufferedWriter wraps a file with write buffering

func (*BufferedWriter) Close

func (bw *BufferedWriter) Close() error

Close flushes and closes the file

func (*BufferedWriter) Flush

func (bw *BufferedWriter) Flush() error

Flush forces buffer to disk

func (*BufferedWriter) Write

func (bw *BufferedWriter) Write(p []byte) (n int, err error)

Write implements io.Writer

type Capability

type Capability string

Capability represents what a driver can do

const (
	CapabilityStreaming   Capability = "streaming"
	CapabilityRangeRead   Capability = "range_read"
	CapabilityMultipart   Capability = "multipart"
	CapabilityVersioning  Capability = "versioning"
	CapabilityEncryption  Capability = "encryption"
	CapabilityReplication Capability = "replication"
	CapabilityWatch       Capability = "watch"
	CapabilityAtomic      Capability = "atomic"
)

type CapabilityChecker

type CapabilityChecker interface {
	Capabilities() []Capability
	HasCapability(cap Capability) bool
}

CapabilityChecker interface for drivers that report capabilities

type ChecksumAlgorithm

type ChecksumAlgorithm string

ChecksumAlgorithm represents a hashing algorithm

const (
	ChecksumSHA256 ChecksumAlgorithm = "sha256"
	ChecksumSHA512 ChecksumAlgorithm = "sha512"
)

type ChunkReader

type ChunkReader struct {
	// contains filtered or unexported fields
}

func NewChunkReader

func NewChunkReader(source io.ReaderAt, size int64) *ChunkReader

func (*ChunkReader) ReadParallel

func (c *ChunkReader) ReadParallel(ctx context.Context) ([]byte, error)

type ChunkedTransfer

type ChunkedTransfer struct {
	// contains filtered or unexported fields
}

func NewChunkedTransfer

func NewChunkedTransfer(chunkSize int, logger *zap.Logger) *ChunkedTransfer

func (*ChunkedTransfer) ChunkedWrite

func (c *ChunkedTransfer) ChunkedWrite(w io.Writer, r io.Reader) (int64, error)

ChunkedWrite writes data in chunks

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker protects against cascading failures

func NewCircuitBreaker

func NewCircuitBreaker(opts ...CircuitOption) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker

func (*CircuitBreaker) Execute

func (cb *CircuitBreaker) Execute(ctx context.Context, fn func() error) error

Execute runs a function with circuit breaker protection

func (*CircuitBreaker) Reset

func (cb *CircuitBreaker) Reset()

Reset manually resets the circuit breaker

func (*CircuitBreaker) State

func (cb *CircuitBreaker) State() State

State returns the current circuit breaker state

type CircuitOption

type CircuitOption func(*CircuitBreaker)

CircuitOption configures the circuit breaker

func WithCircuitLogger

func WithCircuitLogger(logger *zap.Logger) CircuitOption

WithCircuitLogger adds logging

func WithFailureThreshold

func WithFailureThreshold(n int) CircuitOption

WithFailureThreshold sets failures before opening

func WithResetTimeout

func WithResetTimeout(d time.Duration) CircuitOption

WithResetTimeout sets time before trying again

func WithSuccessThreshold

func WithSuccessThreshold(n int) CircuitOption

WithSuccessThreshold sets successes before closing

func WithTimeout

func WithTimeout(d time.Duration) CircuitOption

WithTimeout sets operation timeout

type CompletedPart

type CompletedPart struct {
	PartNumber int
	ETag       string
	Size       int64
}

CompletedPart represents a completed upload part

type CompressionDriver

type CompressionDriver struct {
	// contains filtered or unexported fields
}

func NewCompressionDriver

func NewCompressionDriver(backend engine.Driver, algorithm string, logger *zap.Logger) *CompressionDriver

func (*CompressionDriver) Delete

func (c *CompressionDriver) Delete(ctx context.Context, container, artifact string) error

Delegate other methods

func (*CompressionDriver) Get

func (c *CompressionDriver) Get(ctx context.Context, container, artifact string) (io.ReadCloser, error)

func (*CompressionDriver) HealthCheck

func (c *CompressionDriver) HealthCheck(ctx context.Context) error

func (*CompressionDriver) List

func (c *CompressionDriver) List(ctx context.Context, container, prefix string) ([]string, error)

func (*CompressionDriver) Name

func (c *CompressionDriver) Name() string

func (*CompressionDriver) Put

func (c *CompressionDriver) Put(ctx context.Context, container, artifact string,
	data io.Reader, opts ...engine.PutOption) error

type ConflictDetector

type ConflictDetector struct{}

func NewConflictDetector

func NewConflictDetector() *ConflictDetector

func (*ConflictDetector) CheckConflict

func (c *ConflictDetector) CheckConflict(v1, v2 Version) bool

type ConflictResolver

type ConflictResolver struct{}

func NewConflictResolver

func NewConflictResolver() *ConflictResolver

func (*ConflictResolver) Resolve

func (c *ConflictResolver) Resolve(base, versionA, versionB []byte) ([]byte, error)

type DirectoryDiff

type DirectoryDiff struct {
	Added    []string
	Modified []string
	Deleted  []string
}

DirectoryDiff represents differences between two directories

type DirectoryIndex

type DirectoryIndex struct {
	FileCount int
	DirCount  int
	TotalSize int64
	Files     []string
	Dirs      []string
}

DirectoryIndex represents indexed directory information

type Driver

type Driver interface {
	Get(ctx context.Context, container, artifact string) (io.ReadCloser, error)
	Put(ctx context.Context, container, artifact string, data io.Reader, opts ...engine.PutOption) error
	Delete(ctx context.Context, container, artifact string) error
	List(ctx context.Context, container string, prefix string) ([]string, error)
	Exists(ctx context.Context, container, artifact string) (bool, error)
}

Driver is the common interface all storage drivers must implement

type DriverStats

type DriverStats struct {
	Reads  int64
	Writes int64
	Errors int64
}

DriverStats tracks driver statistics

type FallbackDriver

type FallbackDriver struct {
	// contains filtered or unexported fields
}

FallbackDriver tries primary driver first, falls back to secondary on failure

func NewFallbackDriver

func NewFallbackDriver(primary, secondary Driver, logger *zap.Logger) *FallbackDriver

NewFallbackDriver creates a driver with fallback capability

func (*FallbackDriver) Delete

func (f *FallbackDriver) Delete(ctx context.Context, container, artifact string) error

Delete from both backends

func (*FallbackDriver) Exists

func (f *FallbackDriver) Exists(ctx context.Context, container, artifact string) (bool, error)

Exists checks primary first, then secondary

func (*FallbackDriver) Get

func (f *FallbackDriver) Get(ctx context.Context, container, artifact string) (io.ReadCloser, error)

Get tries primary first, then secondary

func (*FallbackDriver) List

func (f *FallbackDriver) List(ctx context.Context, container, prefix string) ([]string, error)

List from primary, fallback to secondary

func (*FallbackDriver) Put

func (f *FallbackDriver) Put(ctx context.Context, container, artifact string,
	data io.Reader, opts ...engine.PutOption) error

Put writes to primary, optionally replicates to secondary

type FileInfo

type FileInfo struct {
	Name          string
	Size          int64
	IsDir         bool
	IsSymlink     bool
	SymlinkTarget string
	Mode          os.FileMode
	ModTime       time.Time
}

FileInfo extends basic file information

type FileInfoExtended

type FileInfoExtended struct {
	Size            int64
	BlocksAllocated int64
}

FileInfoExtended includes block allocation info

type FileLock

type FileLock struct {
	// contains filtered or unexported fields
}

FileLock represents a file lock

func (*FileLock) Unlock

func (fl *FileLock) Unlock() error

type GetOperation

type GetOperation struct {
	Container string
	Artifact  string
}

GetOperation describes a parallel get

type GetOptions

type GetOptions struct {
	FollowSymlinks bool
}

GetOptions configures Get behavior

type GetResult

type GetResult struct {
	Reader io.ReadCloser
	Error  error
}

GetResult contains the result of a parallel get

type HealthCheck

type HealthCheck func(ctx context.Context) error

HealthCheck is a function that checks a component's health

type HealthChecker

type HealthChecker struct {
	// contains filtered or unexported fields
}

HealthChecker manages health checks

func NewHealthChecker

func NewHealthChecker(logger *zap.Logger, opts ...HealthOption) *HealthChecker

NewHealthChecker creates a new health checker

func (*HealthChecker) Check

func (h *HealthChecker) Check(ctx context.Context) *HealthReport

Check runs all health checks

func (*HealthChecker) LivenessProbe

func (h *HealthChecker) LivenessProbe() error

LivenessProbe checks if the service is alive

func (*HealthChecker) ReadinessProbe

func (h *HealthChecker) ReadinessProbe() error

ReadinessProbe checks if the service is ready to serve traffic

func (*HealthChecker) RegisterCheck

func (h *HealthChecker) RegisterCheck(name string, check HealthCheck)

RegisterCheck adds a health check

type HealthOption

type HealthOption func(*HealthChecker)

HealthOption configures the health checker

func WithCheckTimeout

func WithCheckTimeout(d time.Duration) HealthOption

WithCheckTimeout sets the timeout for each check

type HealthReport

type HealthReport struct {
	Status    HealthStatus      `json:"status"`
	Checks    map[string]string `json:"checks"`
	Timestamp time.Time         `json:"timestamp"`
}

HealthReport contains the overall health status

type HealthStatus

type HealthStatus string

HealthStatus represents the overall system health

const (
	HealthStatusHealthy   HealthStatus = "healthy"
	HealthStatusUnhealthy HealthStatus = "unhealthy"
	HealthStatusDegraded  HealthStatus = "degraded"
)

type HoleInfo

type HoleInfo struct {
	Offset int64
	Length int64
}

HoleInfo represents a hole in a sparse file

type IAMPolicy

type IAMPolicy struct {
	Version   string      `json:"Version"`
	Statement []Statement `json:"Statement"`
}

func GenerateTenantPolicy

func GenerateTenantPolicy(bucket, tenantPrefix string) IAMPolicy

GenerateTenantPolicy creates a policy restricting access to a tenant's prefix

type LocalDriver

type LocalDriver struct {
	// contains filtered or unexported fields
}

LocalDriver implements the Driver interface for local filesystem

func NewLocalDriver

func NewLocalDriver(basePath string, logger *zap.Logger) *LocalDriver

NewLocalDriver creates a new local filesystem driver

func (*LocalDriver) AtomicDelete

func (d *LocalDriver) AtomicDelete(ctx context.Context, container, artifact string) error

AtomicDelete performs an atomic delete by moving to trash first

func (*LocalDriver) AtomicRename

func (d *LocalDriver) AtomicRename(ctx context.Context, container, oldName, newName string) error

AtomicRename performs an atomic rename operation

func (*LocalDriver) AtomicWrite

func (d *LocalDriver) AtomicWrite(ctx context.Context, container, artifact string, data io.Reader) error

AtomicWrite performs an atomic write operation using temp file + rename

func (*LocalDriver) BeginTransaction

func (d *LocalDriver) BeginTransaction(ctx context.Context) (*Transaction, error)

BeginTransaction starts a new transaction

func (*LocalDriver) Capabilities

func (d *LocalDriver) Capabilities() []Capability

Capabilities returns the capabilities of the LocalDriver

func (*LocalDriver) CompareDirectories

func (d *LocalDriver) CompareDirectories(ctx context.Context, sourceContainer, sourceDir, destContainer, destDir string) (*DirectoryDiff, error)

CompareDirectories compares two directories and returns differences

func (*LocalDriver) CompleteMultipartUpload

func (d *LocalDriver) CompleteMultipartUpload(ctx context.Context, upload *MultipartUpload, parts []CompletedPart) error

CompleteMultipartUpload assembles all parts into the final file

func (*LocalDriver) Copy

func (d *LocalDriver) Copy(ctx context.Context, srcContainer, srcArtifact, dstContainer, dstArtifact string) error

Copy copies a file preserving permissions

func (*LocalDriver) CreateDirectory

func (d *LocalDriver) CreateDirectory(ctx context.Context, container, dir string) error

CreateDirectory creates a directory within a container

func (*LocalDriver) CreateMultipartUpload

func (d *LocalDriver) CreateMultipartUpload(ctx context.Context, container, artifact string) (*MultipartUpload, error)

CreateMultipartUpload initiates a multipart upload CreateMultipartUpload initiates a multipart upload

func (*LocalDriver) CreateSparse

func (d *LocalDriver) CreateSparse(ctx context.Context, container, artifact string, size int64) error

CreateSparse creates a true sparse file using fallocate

func (*LocalDriver) Delete

func (d *LocalDriver) Delete(ctx context.Context, container, artifact string) error

Delete removes an artifact from a container

func (*LocalDriver) DirectoryExists

func (d *LocalDriver) DirectoryExists(ctx context.Context, container, dir string) (bool, error)

DirectoryExists checks if a directory exists

func (*LocalDriver) Exists

func (d *LocalDriver) Exists(ctx context.Context, container, artifact string) (bool, error)

Exists checks if an artifact exists

func (*LocalDriver) FindFilesByExtension

func (d *LocalDriver) FindFilesByExtension(ctx context.Context, container, ext string) ([]string, error)

FindFilesByExtension finds all files with a specific extension

func (*LocalDriver) FindFilesByPattern

func (d *LocalDriver) FindFilesByPattern(ctx context.Context, container, pattern string) ([]string, error)

FindFilesByPattern finds files matching a glob pattern

func (*LocalDriver) Get

func (d *LocalDriver) Get(ctx context.Context, container, artifact string) (io.ReadCloser, error)

Get retrieves an artifact from a container

func (*LocalDriver) GetChecksum

func (d *LocalDriver) GetChecksum(ctx context.Context, container, artifact string, algorithm ChecksumAlgorithm) (string, error)

GetChecksum calculates the checksum of an artifact

func (*LocalDriver) GetDirectoryModTime

func (d *LocalDriver) GetDirectoryModTime(ctx context.Context, container, dir string) (time.Time, error)

GetDirectoryModTime returns the most recent modification time in a directory

func (*LocalDriver) GetDirectorySize

func (d *LocalDriver) GetDirectorySize(ctx context.Context, container, dir string) (int64, error)

GetDirectorySize calculates the total size of a directory

func (*LocalDriver) GetFileInfo

func (d *LocalDriver) GetFileInfo(ctx context.Context, container, artifact string) (*FileInfoExtended, error)

GetFileInfo returns detailed file information including actual block allocation

func (*LocalDriver) GetHoles

func (d *LocalDriver) GetHoles(ctx context.Context, container, artifact string) ([]HoleInfo, error)

GetHoles returns actual holes in a sparse file using SEEK_HOLE/SEEK_DATA

func (*LocalDriver) GetInfo

func (d *LocalDriver) GetInfo(ctx context.Context, container, artifact string) (*FileInfo, error)

GetInfo returns detailed information about an artifact

func (*LocalDriver) GetOwnership

func (d *LocalDriver) GetOwnership(ctx context.Context, container, artifact string) (uid, gid int, err error)

GetOwnership retrieves the owner and group for an artifact

func (*LocalDriver) GetPermissions

func (d *LocalDriver) GetPermissions(ctx context.Context, container, artifact string) (os.FileMode, error)

GetPermissions retrieves the file permissions for an artifact

func (*LocalDriver) GetPoolStats

func (d *LocalDriver) GetPoolStats() map[string]interface{}

GetPoolStats returns pool metrics

func (*LocalDriver) GetPooled

func (d *LocalDriver) GetPooled(ctx context.Context, container, artifact string) (io.ReadCloser, error)

GetPooled retrieves a file using pooled resources

func (*LocalDriver) GetWithOptions

func (d *LocalDriver) GetWithOptions(ctx context.Context, container, artifact string, opts GetOptions) (io.ReadCloser, error)

GetWithOptions retrieves an artifact with configurable options

func (*LocalDriver) GetWriteBufferStats

func (d *LocalDriver) GetWriteBufferStats() map[string]interface{}

GetWriteBufferStats returns buffer statistics

func (*LocalDriver) GetXAttr

func (d *LocalDriver) GetXAttr(ctx context.Context, container, artifact, name string) ([]byte, error)

GetXAttr retrieves an extended attribute from an artifact

func (*LocalDriver) HasCapability

func (d *LocalDriver) HasCapability(cap Capability) bool

HasCapability checks if the driver has a specific capability

func (*LocalDriver) HasDirectoryChanged

func (d *LocalDriver) HasDirectoryChanged(ctx context.Context, container, dir string, since time.Time) (bool, error)

HasDirectoryChanged checks if directory has changed since a given time

func (*LocalDriver) HealthCheck

func (d *LocalDriver) HealthCheck(ctx context.Context) error

HealthCheck verifies the driver is working

func (*LocalDriver) IndexDirectory

func (d *LocalDriver) IndexDirectory(ctx context.Context, container, dir string) (*DirectoryIndex, error)

IndexDirectory creates an index of a directory

func (*LocalDriver) List

func (d *LocalDriver) List(ctx context.Context, container, prefix string) ([]string, error)

List lists artifacts in a container

func (*LocalDriver) ListDirectory

func (d *LocalDriver) ListDirectory(ctx context.Context, container, dir string) ([]os.DirEntry, error)

ListDirectory lists entries in a directory

func (*LocalDriver) ListXAttrs

func (d *LocalDriver) ListXAttrs(ctx context.Context, container, artifact string) ([]string, error)

ListXAttrs lists all extended attributes on an artifact

func (*LocalDriver) LockFile

func (d *LocalDriver) LockFile(ctx context.Context, container, artifact string, lockType LockType) (*FileLock, error)

LockFile acquires a file lock using flock

func (*LocalDriver) Name

func (d *LocalDriver) Name() string

Name returns the driver name

func (*LocalDriver) Put

func (d *LocalDriver) Put(ctx context.Context, container, artifact string, data io.Reader, opts ...engine.PutOption) error

Put stores an artifact in a container

func (*LocalDriver) PutBuffered

func (d *LocalDriver) PutBuffered(ctx context.Context, container, artifact string) (io.WriteCloser, error)

PutBuffered creates a buffered writer for efficient small writes

func (*LocalDriver) RemoveDirectory

func (d *LocalDriver) RemoveDirectory(ctx context.Context, container, dir string) error

RemoveDirectory removes a directory from a container

func (*LocalDriver) ReturnPooledReader

func (d *LocalDriver) ReturnPooledReader(pr io.ReadCloser) error

ReturnPooledReader returns resources to pool

func (*LocalDriver) SetOwnership

func (d *LocalDriver) SetOwnership(ctx context.Context, container, artifact string, uid, gid int) error

SetOwnership sets the owner and group for an artifact

func (*LocalDriver) SetPermissions

func (d *LocalDriver) SetPermissions(ctx context.Context, container, artifact string, mode os.FileMode) error

SetPermissions sets the file permissions for an artifact

func (*LocalDriver) SetXAttr

func (d *LocalDriver) SetXAttr(ctx context.Context, container, artifact, name string, value []byte) error

SetXAttr sets an extended attribute on an artifact

func (d *LocalDriver) SupportsSymlinks() bool

SupportsSymlinks checks if the filesystem supports symlinks

func (*LocalDriver) SyncDirectory

func (d *LocalDriver) SyncDirectory(ctx context.Context, sourceContainer, sourceDir, destContainer, destDir string) error

SyncDirectory synchronizes files from source to destination

func (*LocalDriver) UploadPart

func (d *LocalDriver) UploadPart(ctx context.Context, upload *MultipartUpload, partNumber int, data io.Reader) (CompletedPart, error)

UploadPart uploads a single part of a multipart upload

func (*LocalDriver) VerifyChecksum

func (d *LocalDriver) VerifyChecksum(ctx context.Context, container, artifact string, expected string, algorithm ChecksumAlgorithm) error

VerifyChecksum verifies an artifact matches the expected checksum

func (*LocalDriver) WalkDirectory

func (d *LocalDriver) WalkDirectory(ctx context.Context, container, dir string, fn func(path string, entry os.DirEntry) error) error

WalkDirectory walks a directory tree and calls fn for each entry

func (*LocalDriver) Watch

func (d *LocalDriver) Watch(ctx context.Context, prefix string) (<-chan WatchEvent, <-chan error, error)

Watch implements the Watcher interface

func (*LocalDriver) WriteAt

func (d *LocalDriver) WriteAt(ctx context.Context, container, artifact string, data []byte, offset int64) error

WriteAt writes data at a specific offset

type LockType

type LockType int

LockType for file locking

const (
	LockShared LockType = iota
	LockExclusive
)

type LyveDriver

type LyveDriver struct {
	// contains filtered or unexported fields
}

func NewLyveDriver

func NewLyveDriver(accessKey, secretKey, tenantID, region string, logger *zap.Logger) (*LyveDriver, error)

func (*LyveDriver) Delete

func (d *LyveDriver) Delete(ctx context.Context, container, artifact string) error

func (*LyveDriver) Get

func (d *LyveDriver) Get(ctx context.Context, container, artifact string) (io.ReadCloser, error)

func (*LyveDriver) HealthCheck

func (d *LyveDriver) HealthCheck(ctx context.Context) error

func (*LyveDriver) List

func (d *LyveDriver) List(ctx context.Context, container string, prefix string) ([]string, error)

func (*LyveDriver) Name

func (d *LyveDriver) Name() string

func (*LyveDriver) Put

func (d *LyveDriver) Put(ctx context.Context, container, artifact string, data io.Reader, opts ...engine.PutOption) error

type MultipartUpload

type MultipartUpload struct {
	ID        string
	Container string
	Artifact  string
}

MultipartUpload represents an in-progress multipart upload MultipartUpload represents an in-progress multipart upload

type ParallelDriver

type ParallelDriver struct {
	// contains filtered or unexported fields
}

ParallelDriver wraps any driver with parallel execution

func NewParallelDriver

func NewParallelDriver(backend Driver, workers int, logger *zap.Logger) *ParallelDriver

NewParallelDriver creates a driver that executes operations in parallel

func (*ParallelDriver) ParallelGet

func (d *ParallelDriver) ParallelGet(ctx context.Context, operations []GetOperation) []GetResult

ParallelGet reads multiple files concurrently

func (*ParallelDriver) ParallelPut

func (d *ParallelDriver) ParallelPut(ctx context.Context, operations []PutOperation) []error

ParallelPut writes multiple files concurrently

type PolicyEvaluator

type PolicyEvaluator struct {
	// contains filtered or unexported fields
}

func NewPolicyEvaluator

func NewPolicyEvaluator() *PolicyEvaluator

func (*PolicyEvaluator) AddPolicy

func (p *PolicyEvaluator) AddPolicy(policy IAMPolicy)

func (*PolicyEvaluator) IsAllowed

func (p *PolicyEvaluator) IsAllowed(action, resource string, conditions map[string]string) bool

type PooledReader

type PooledReader struct {
	// contains filtered or unexported fields
}

PooledReader wraps a file with pooled buffer

func (*PooledReader) Close

func (pr *PooledReader) Close() error

Close returns resources to pool

func (*PooledReader) Read

func (pr *PooledReader) Read(p []byte) (n int, err error)

Read implements io.Reader

type PutOperation

type PutOperation struct {
	Container string
	Artifact  string
	Data      io.Reader
}

PutOperation describes a parallel put

type PutOption

type PutOption func(*putOptions)

PutOption is a function that configures Put operations

func WithContentType

func WithContentType(ct string) PutOption

WithContentType sets the content type

func WithUserMetadata

func WithUserMetadata(meta map[string]string) PutOption

WithUserMetadata sets user metadata

type QuotalessDriver

type QuotalessDriver struct {
	*S3Driver
	// contains filtered or unexported fields
}

QuotalessDriver wraps S3Driver with Quotaless-specific configuration

func NewQuotalessDriver

func NewQuotalessDriver(accessKey, secretKey string, logger *zap.Logger) (*QuotalessDriver, error)

NewQuotalessDriver creates a Quotaless-specific driver

func (*QuotalessDriver) Delete

func (d *QuotalessDriver) Delete(ctx context.Context, container, artifact string) error

Delete overrides to add root path prefix

func (*QuotalessDriver) Get

func (d *QuotalessDriver) Get(ctx context.Context, container, artifact string) (io.ReadCloser, error)

Get overrides to add root path prefix

func (*QuotalessDriver) HealthCheck

func (d *QuotalessDriver) HealthCheck(ctx context.Context) error

HealthCheck verifies the backend is accessible

func (*QuotalessDriver) List

func (d *QuotalessDriver) List(ctx context.Context, container string, prefix string) ([]string, error)

List overrides to handle Quotaless-specific listing

func (*QuotalessDriver) Name

func (d *QuotalessDriver) Name() string

Name returns the driver name

func (*QuotalessDriver) Put

func (d *QuotalessDriver) Put(ctx context.Context, container, artifact string, data io.Reader, opts ...engine.PutOption) error

Put overrides to add root path prefix

type ReaderPool

type ReaderPool struct {
	// contains filtered or unexported fields
}

func NewReaderPool

func NewReaderPool(size int, factory func() (io.ReadCloser, error)) *ReaderPool

func (*ReaderPool) Get

func (p *ReaderPool) Get(ctx context.Context) (io.ReadCloser, error)

func (*ReaderPool) Put

func (p *ReaderPool) Put(reader io.ReadCloser)

type RequestQueue

type RequestQueue struct {
	// contains filtered or unexported fields
}

RequestQueue manages request processing with a worker pool

func NewRequestQueue

func NewRequestQueue(capacity, workers int, logger *zap.Logger) *RequestQueue

NewRequestQueue creates a new request queue

func (*RequestQueue) Close

func (q *RequestQueue) Close()

Close shuts down the queue gracefully

func (*RequestQueue) Submit

func (q *RequestQueue) Submit(ctx context.Context, priority int, fn func() error) error

Submit adds a job to the queue

type ResumableUpload

type ResumableUpload struct {
	// contains filtered or unexported fields
}

func NewResumableUpload

func NewResumableUpload(backend engine.Driver, logger *zap.Logger) *ResumableUpload

func (*ResumableUpload) CompleteUpload

func (r *ResumableUpload) CompleteUpload(ctx context.Context, uploadID string) error

func (*ResumableUpload) GetUploadOffset

func (r *ResumableUpload) GetUploadOffset(ctx context.Context, uploadID string) (int64, error)

func (*ResumableUpload) StartUpload

func (r *ResumableUpload) StartUpload(ctx context.Context, uploadID, container, artifact string, totalSize int64) error

func (*ResumableUpload) UploadChunk

func (r *ResumableUpload) UploadChunk(ctx context.Context, uploadID string, offset int64, data io.Reader) error

type RetryOption

type RetryOption func(*RetryPolicy)

RetryOption configures retry behavior

func WithInitialDelay

func WithInitialDelay(d time.Duration) RetryOption

WithInitialDelay sets the initial retry delay

func WithJitter

func WithJitter(enabled bool) RetryOption

WithJitter enables jitter to prevent thundering herd

func WithLogger

func WithLogger(logger *zap.Logger) RetryOption

WithLogger adds logging to retry attempts

func WithMaxAttempts

func WithMaxAttempts(n int) RetryOption

WithMaxAttempts sets maximum retry attempts

func WithMaxDelay

func WithMaxDelay(d time.Duration) RetryOption

WithMaxDelay sets the maximum retry delay

type RetryPolicy

type RetryPolicy struct {
	// contains filtered or unexported fields
}

RetryPolicy defines how to retry failed operations

func NewRetryPolicy

func NewRetryPolicy(opts ...RetryOption) *RetryPolicy

NewRetryPolicy creates a new retry policy

func (*RetryPolicy) Execute

func (p *RetryPolicy) Execute(ctx context.Context, fn func() error) error

Execute runs a function with retry logic

type RetryableDriver

type RetryableDriver struct {
	// contains filtered or unexported fields
}

RetryableDriver wraps a driver with retry logic

func NewRetryableDriver

func NewRetryableDriver(driver Driver, policy *RetryPolicy) *RetryableDriver

NewRetryableDriver creates a driver with retry capability

func (*RetryableDriver) Delete

func (r *RetryableDriver) Delete(ctx context.Context, container, artifact string) error

Delete with retry (idempotent)

func (*RetryableDriver) Exists

func (r *RetryableDriver) Exists(ctx context.Context, container, artifact string) (bool, error)

Exists with retry

func (*RetryableDriver) Get

func (r *RetryableDriver) Get(ctx context.Context, container, artifact string) (io.ReadCloser, error)

Get with retry

func (*RetryableDriver) List

func (r *RetryableDriver) List(ctx context.Context, container, prefix string) ([]string, error)

List with retry

func (*RetryableDriver) Put

func (r *RetryableDriver) Put(ctx context.Context, container, artifact string,
	data io.Reader, opts ...engine.PutOption) error

Put with retry (be careful - may not be idempotent!)

type S3CompatDriver

type S3CompatDriver struct {
	// contains filtered or unexported fields
}

S3CompatDriver implements storage.Backend for S3-compatible S3-compatible API

func NewS3CompatDriver

func NewS3CompatDriver(accessKey, secretKey string, logger *zap.Logger) (*S3CompatDriver, error)

NewS3CompatDriver creates a new S3-compatible storage driver

func (*S3CompatDriver) Delete

func (d *S3CompatDriver) Delete(ctx context.Context, container, artifact string) error

Delete removes an artifact

func (*S3CompatDriver) Get

func (d *S3CompatDriver) Get(ctx context.Context, container, artifact string) (io.ReadCloser, error)

Get retrieves an artifact

func (*S3CompatDriver) HealthCheck

func (d *S3CompatDriver) HealthCheck(ctx context.Context) error

HealthCheck verifies connectivity

func (*S3CompatDriver) List

func (d *S3CompatDriver) List(ctx context.Context, container, prefix string) ([]string, error)

List lists artifacts in a container

func (*S3CompatDriver) Name

func (d *S3CompatDriver) Name() string

Name returns the driver name

func (*S3CompatDriver) Put

func (d *S3CompatDriver) Put(ctx context.Context, container, artifact string, data io.Reader, opts ...engine.PutOption) error

Put stores an artifact

type S3Driver

type S3Driver struct {
	// contains filtered or unexported fields
}

S3Driver implements storage.Backend for S3-compatible storage

func NewS3Driver

func NewS3Driver(endpoint, accessKey, secretKey, region string, logger *zap.Logger) (*S3Driver, error)

NewS3Driver creates a new S3 storage driver

func (*S3Driver) CompleteMultipartUpload

func (d *S3Driver) CompleteMultipartUpload(ctx context.Context, upload *MultipartUpload, parts []CompletedPart) error

CompleteMultipartUpload finishes the S3 multipart upload

func (*S3Driver) CreateMultipartUpload

func (d *S3Driver) CreateMultipartUpload(ctx context.Context, container, artifact string) (*MultipartUpload, error)

CreateMultipartUpload starts a new S3 multipart upload

func (*S3Driver) Delete

func (d *S3Driver) Delete(ctx context.Context, container, artifact string) error

Delete removes an object from S3

func (*S3Driver) Get

func (d *S3Driver) Get(ctx context.Context, container, artifact string) (io.ReadCloser, error)

Get retrieves data from S3

func (*S3Driver) List

func (d *S3Driver) List(ctx context.Context, container, prefix string) ([]string, error)

List returns objects in a container with optional prefix

func (*S3Driver) Put

func (d *S3Driver) Put(ctx context.Context, container, artifact string, data io.Reader) error

Put stores data in S3

func (*S3Driver) UploadPart

func (d *S3Driver) UploadPart(ctx context.Context, upload *MultipartUpload, partNumber int, data io.Reader) (CompletedPart, error)

UploadPart uploads a part in S3 multipart upload

type S3MultipartUpload

type S3MultipartUpload struct {
	Bucket   string
	Key      string
	UploadID string
}

S3MultipartUpload represents an S3 multipart upload

type S3Signer

type S3Signer struct {
	AccessKey string
	SecretKey string
	Region    string
	Service   string
}

func NewS3Signer

func NewS3Signer(accessKey, secretKey, region string) *S3Signer

func (*S3Signer) GeneratePresignedURL

func (s *S3Signer) GeneratePresignedURL(method, bucket, key string, expires time.Duration) (string, error)

GeneratePresignedURL creates a properly signed presigned URL

func (*S3Signer) SignV4

func (s *S3Signer) SignV4(req *http.Request, payload []byte) error

SignV4 implements AWS Signature Version 4

type STSToken

type STSToken struct {
	AccessKey    string
	SecretKey    string
	SessionToken string
	Expiration   time.Time
}

type State

type State int

State represents the circuit breaker state

const (
	StateClosed   State = iota // Normal operation
	StateOpen                  // Failing, requests blocked
	StateHalfOpen              // Testing if service recovered
)

type Statement

type Statement struct {
	Effect    string                       `json:"Effect"`
	Action    StringOrSlice                `json:"Action"`
	Resource  StringOrSlice                `json:"Resource"`
	Condition map[string]map[string]string `json:"Condition,omitempty"`
}

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func (*Stream) Release

func (s *Stream) Release()

type StreamManager

type StreamManager struct {
	// contains filtered or unexported fields
}

func NewStreamManager

func NewStreamManager(maxConcurrent int, logger *zap.Logger) *StreamManager

func (*StreamManager) AcquireStream

func (m *StreamManager) AcquireStream(ctx context.Context) (*Stream, error)

type StringOrSlice

type StringOrSlice []string

func (*StringOrSlice) UnmarshalJSON

func (s *StringOrSlice) UnmarshalJSON(data []byte) error

type ThrottledDriver

type ThrottledDriver struct {
	// contains filtered or unexported fields
}

func NewThrottledDriver

func NewThrottledDriver(backend engine.Driver, bytesPerSecond int, logger *zap.Logger) *ThrottledDriver

NewThrottledDriver creates a driver with bandwidth throttling

func (*ThrottledDriver) Delete

func (t *ThrottledDriver) Delete(ctx context.Context, container, artifact string) error

func (*ThrottledDriver) Get

func (t *ThrottledDriver) Get(ctx context.Context, container, artifact string) (io.ReadCloser, error)

Delegate other required methods

func (*ThrottledDriver) HealthCheck

func (t *ThrottledDriver) HealthCheck(ctx context.Context) error

func (*ThrottledDriver) List

func (t *ThrottledDriver) List(ctx context.Context, container, prefix string) ([]string, error)

func (*ThrottledDriver) Name

func (t *ThrottledDriver) Name() string

func (*ThrottledDriver) Put

func (t *ThrottledDriver) Put(ctx context.Context, container, artifact string,
	data io.Reader, opts ...engine.PutOption) error

type Transaction

type Transaction struct {
	// contains filtered or unexported fields
}

Transaction represents a batch of operations

func (*Transaction) Commit

func (t *Transaction) Commit() error

Commit executes all operations in the transaction

func (*Transaction) Put

func (t *Transaction) Put(ctx context.Context, container, artifact string, data io.Reader) error

Put adds a put operation to the transaction

func (*Transaction) Rollback

func (t *Transaction) Rollback() error

Rollback cancels the transaction

type UploadMetadata

type UploadMetadata struct {
	UploadID  string `json:"upload_id"`
	Container string `json:"container"`
	Artifact  string `json:"artifact"`
	TotalSize int64  `json:"total_size"`
	Uploaded  int64  `json:"uploaded"`
}

type Version

type Version struct {
	Path     string
	Modified time.Time
	Checksum string
}

type WASMPlugin

type WASMPlugin struct {
	// contains filtered or unexported fields
}

func LoadWASMPlugin

func LoadWASMPlugin(path string) (*WASMPlugin, error)

func (*WASMPlugin) Close

func (p *WASMPlugin) Close() error

func (*WASMPlugin) Transform

func (p *WASMPlugin) Transform(input []byte) ([]byte, error)

type WatchEvent

type WatchEvent struct {
	Type WatchEventType
	Path string // Relative path from watched root
}

WatchEvent represents a file system change event

type WatchEventType

type WatchEventType int

WatchEventType represents the type of file system event

const (
	WatchEventCreate WatchEventType = iota
	WatchEventModify
	WatchEventDelete
	WatchEventRename
)

type Watcher

type Watcher interface {
	Watch(ctx context.Context, prefix string) (<-chan WatchEvent, <-chan error, error)
}

Watcher interface for drivers that support file watching

type WebhookDispatcher

type WebhookDispatcher struct {
	// contains filtered or unexported fields
}

func NewWebhookDispatcher

func NewWebhookDispatcher() *WebhookDispatcher

func (*WebhookDispatcher) AddWebhook

func (w *WebhookDispatcher) AddWebhook(name, url string)

func (*WebhookDispatcher) Dispatch

func (w *WebhookDispatcher) Dispatch(ctx context.Context, event WatchEvent) error

Directories

Path Synopsis
plugins
transform command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL