storage

package
v0.1.4 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2025 License: MIT Imports: 22 Imported by: 0

README ¶

Storage Extension - Production-Ready Implementation

A robust, production-ready object storage extension for Forge with comprehensive resilience, security, and observability features.

Features

🔒 Security
  • Path Validation: Comprehensive path traversal protection
  • Input Sanitization: Automatic key sanitization
  • Secure Presigned URLs: Cryptographically signed URLs with expiration
  • Content Type Validation: Strict content type checking
  • Metadata Validation: Size and format constraints
πŸ›‘οΈ Resilience
  • Circuit Breaker Pattern: Prevent cascade failures with automatic recovery
  • Exponential Backoff Retry: Configurable retry logic with backoff
  • Rate Limiting: Token bucket algorithm for rate limiting
  • Timeout Management: Configurable operation timeouts
  • Non-Retryable Error Detection: Smart error classification
⚡ Performance
  • Buffer Pooling: sync.Pool for zero-allocation I/O
  • Concurrent Operations: Thread-safe file operations with fine-grained locking
  • ETag Caching: Cached MD5 checksums
  • Chunked Uploads: Large file support with multipart uploads
  • Connection Pooling: Optimized for cloud backends
📊 Observability
  • Comprehensive Metrics: Upload/download counts, durations, error rates
  • Health Checks: Multi-backend health monitoring
  • Structured Logging: Contextual logging with trace IDs
  • Circuit Breaker State Tracking: Real-time resilience monitoring
πŸ—„οΈ Backend Support
  • Enhanced Local: Production-ready local filesystem with atomic operations
  • AWS S3: Full S3 support with presigned URLs
  • GCS: Google Cloud Storage (coming soon)
  • Azure Blob: Azure Blob Storage (coming soon)

Installation

Basic Installation
cd v2
go get -u github.com/xraph/forge/extensions/storage
With AWS S3 Support
go get -u github.com/aws/aws-sdk-go-v2/aws
go get -u github.com/aws/aws-sdk-go-v2/config
go get -u github.com/aws/aws-sdk-go-v2/service/s3
go get -u github.com/aws/aws-sdk-go-v2/feature/s3/manager

Configuration

YAML Configuration
extensions:
  storage:
    default: "local"
    use_enhanced_backend: true
    
    # Backends configuration
    backends:
      local:
        type: "local"
        config:
          root_dir: "./storage"
          base_url: "http://localhost:8080/files"
          secret: "your-secret-key-here"
          chunk_size: 5242880  # 5MB
          max_upload_size: 5368709120  # 5GB
      
      s3:
        type: "s3"
        config:
          region: "us-east-1"
          bucket: "my-bucket"
          prefix: "uploads"
          access_key_id: "${AWS_ACCESS_KEY_ID}"
          secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
    
    # Resilience configuration
    resilience:
      max_retries: 3
      initial_backoff: "100ms"
      max_backoff: "10s"
      backoff_multiplier: 2.0
      
      circuit_breaker_enabled: true
      circuit_breaker_threshold: 5
      circuit_breaker_timeout: "60s"
      circuit_breaker_half_open_max: 3
      
      rate_limit_enabled: true
      rate_limit_per_sec: 100
      rate_limit_burst: 200
      
      operation_timeout: "30s"
    
    # Features
    enable_presigned_urls: true
    presign_expiry: "15m"
    max_upload_size: 5368709120  # 5GB
    chunk_size: 5242880          # 5MB
    
    # CDN configuration
    enable_cdn: false
    cdn_base_url: ""
Programmatic Configuration
package main

import (
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/storage"
)

func main() {
    // Create app
    app := forge.New()

    // Configure storage
    config := storage.DefaultConfig()
    config.Default = "s3"
    config.UseEnhancedBackend = true
    config.Backends["s3"] = storage.BackendConfig{
        Type: "s3",
        Config: map[string]interface{}{
            "region": "us-east-1",
            "bucket": "my-bucket",
        },
    }
    
    // Customize resilience
    config.Resilience.MaxRetries = 5
    config.Resilience.CircuitBreakerThreshold = 10

    // Register extension
    app.Use(storage.NewExtension(config))

    app.Run()
}

Usage

Basic Operations
// Get storage manager from DI
storageManager := forge.Must[*storage.StorageManager](app.Container(), "storage")

ctx := context.Background()

// Upload file
file, _ := os.Open("document.pdf")
defer file.Close()

err := storageManager.Upload(ctx, "documents/report.pdf", file,
    storage.WithContentType("application/pdf"),
    storage.WithMetadata(map[string]string{
        "user_id": "12345",
        "uploaded_by": "john@example.com",
    }),
)

// Download file
reader, err := storageManager.Download(ctx, "documents/report.pdf")
if err != nil {
    log.Fatal(err)
}
defer reader.Close()

// Copy data
io.Copy(os.Stdout, reader)

// Check if exists
exists, err := storageManager.Exists(ctx, "documents/report.pdf")

// Get metadata
metadata, err := storageManager.Metadata(ctx, "documents/report.pdf")
fmt.Printf("Size: %d, Last Modified: %v\n", metadata.Size, metadata.LastModified)

// List files
objects, err := storageManager.List(ctx, "documents/",
    storage.WithRecursive(true),
    storage.WithLimit(100),
)

// Delete file
err = storageManager.Delete(ctx, "documents/report.pdf")
Advanced Operations
// Copy file
err := storageManager.Copy(ctx, "source.pdf", "destination.pdf")

// Move file
err = storageManager.Move(ctx, "old-location.pdf", "new-location.pdf")

// Generate presigned upload URL
uploadURL, err := storageManager.PresignUpload(ctx, "uploads/file.pdf", 15*time.Minute)
// Share uploadURL with client for direct upload

// Generate presigned download URL
downloadURL, err := storageManager.PresignDownload(ctx, "downloads/file.pdf", 1*time.Hour)
// Share downloadURL with client for direct download
Health Checks
// Basic health check
err := storageManager.Health(ctx)
if err != nil {
    log.Printf("Storage unhealthy: %v", err)
}

// Detailed health check
health, err := storageManager.HealthDetailed(ctx, true) // Check all backends
fmt.Printf("Healthy: %v, %d/%d backends healthy\n",
    health.Healthy, health.HealthyCount, health.BackendCount)

for name, backend := range health.Backends {
    fmt.Printf("Backend %s: %v (response time: %v)\n",
        name, backend.Healthy, backend.ResponseTime)
}

// Check specific backend
backendHealth, err := storageManager.BackendHealth(ctx, "s3")
Using Specific Backend
// Get specific backend
s3Backend := storageManager.Backend("s3")
if s3Backend != nil {
    err := s3Backend.Upload(ctx, "key", data)
}

Architecture

Component Overview
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    Storage Extension                     β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                    Storage Manager                       β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚  β”‚              Health Checker                        β”‚ β”‚
β”‚  β”‚  β€’ Multi-backend health monitoring                 β”‚ β”‚
β”‚  β”‚  β€’ Write/List-based checks                         β”‚ β”‚
β”‚  β”‚  β€’ Concurrent health checks                        β”‚ β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                            β”‚
                            β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                  Resilience Layer                        β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚ Circuit Breaker β”‚ Rate Limiter β”‚ Retry w/ Backoff β”‚  β”‚
β”‚  β”‚ β€’ Open/Closed   β”‚ β€’ Token Bucketβ”‚ β€’ Exponential   β”‚  β”‚
β”‚  β”‚ β€’ Half-Open     β”‚ β€’ Burst Supportβ”‚ β€’ Timeout      β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                            β”‚
                            β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    Storage Backends                      β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”‚
β”‚  β”‚ Local Enhancedβ”‚    AWS S3    β”‚  GCS / Azure (Soon) β”‚β”‚
β”‚  β”‚ β€’ Atomic ops  β”‚ β€’ Presigned  β”‚  β€’ Coming soon      β”‚β”‚
β”‚  β”‚ β€’ File lockingβ”‚ β€’ Multipart  β”‚                     β”‚β”‚
β”‚  β”‚ β€’ Buffer pool β”‚ β€’ Pagination β”‚                     β”‚β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                            β”‚
                            β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                Cross-Cutting Concerns                    β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚ Path Validator β”‚ Buffer Pool  β”‚ Security         β”‚  β”‚
β”‚  β”‚ β€’ Traversal    β”‚ β€’ sync.Pool  β”‚ β€’ Validation     β”‚  β”‚
β”‚  β”‚ β€’ Sanitization β”‚ β€’ Zero-alloc β”‚ β€’ Encryption     β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
Resilience Patterns
Circuit Breaker States
    ┌─────────┐
    │ Closed  │──────► Failures ≥ Threshold
    └────┬────┘                 │
         │                      ▼
    Success             ┌───────────┐
         │              │   Open    │
         │              └─────┬─────┘
         │                    │
    ┌────▼─────┐              │ Timeout
    │Half-Open │◄─────────────┘
    └──────────┘

Metrics

The extension exposes the following metrics:

Operation Metrics
  • storage_uploads - Total upload count
  • storage_downloads - Total download count
  • storage_deletes - Total delete count
  • storage_upload_bytes - Total bytes uploaded
  • storage_upload_duration - Upload duration histogram
  • storage_download_duration - Download duration histogram
Error Metrics
  • storage_upload_errors - Upload error count
  • storage_download_errors - Download error count
  • storage_delete_errors - Delete error count
Resilience Metrics
  • storage_retries - Retry attempt count
  • storage_max_retries_exceeded - Max retries exceeded count
  • storage_circuit_breaker_open - Circuit breaker open events
  • storage_circuit_breaker_state - Current circuit breaker state (0=closed, 1=open, 2=half-open)
  • storage_rate_limit_allowed - Rate limit allowed requests
  • storage_rate_limit_rejected - Rate limit rejected requests
Health Metrics
  • storage_backend_health - Backend health status (1=healthy, 0=unhealthy)
  • storage_health_check_duration - Health check duration histogram

Testing

Run All Tests
cd v2/extensions/storage
go test -v
Run Integration Tests
go test -v -run 'TestStorageManager_.*'
Run Short Tests (Skip Integration)
go test -v -short
Run Benchmarks
go test -v -bench=. -benchmem
Coverage
go test -cover -coverprofile=coverage.out
go tool cover -html=coverage.out

Production Considerations

Local Backend
  • Use fast SSD storage for best performance
  • Configure regular backups
  • Monitor disk space
  • Use proper file permissions (0750 for directories, 0640 for files)
  • Generate and securely store the secret key
S3 Backend
  • Use IAM roles instead of access keys when possible
  • Enable versioning for important buckets
  • Configure lifecycle policies for automatic archival
  • Use VPC endpoints for better performance
  • Enable server-side encryption
  • Set appropriate CORS policies for presigned URLs
Resilience Tuning
  • Circuit Breaker Threshold: Higher for stable backends, lower for unstable
  • Retry Count: 3-5 retries for most cases
  • Backoff: Start at 100ms, max 10s
  • Rate Limit: Based on backend capacity and costs
  • Timeouts: 30s for normal ops, 5min for large uploads
Monitoring
  • Track error rates and set up alerts
  • Monitor circuit breaker state changes
  • Watch for rate limit rejections
  • Track P95/P99 latencies
  • Set up health check alerts

Security Best Practices

  1. Never commit secrets: Use environment variables or secret managers
  2. Rotate secrets regularly: Especially presigned URL secrets
  3. Validate all inputs: The extension validates, but add application-level checks
  4. Use HTTPS: Always use HTTPS for presigned URLs
  5. Set short expiry: Use minimum necessary expiry for presigned URLs
  6. Implement access control: Add application-level authorization
  7. Audit file access: Log all storage operations
  8. Scan uploaded files: Implement virus scanning for user uploads

Performance Tips

  1. Use buffer pooling: Reuse buffers for multiple operations
  2. Batch operations: Group related operations
  3. Use streaming: For large files, stream instead of loading into memory
  4. Enable CDN: For public assets, use CDN with presigned URLs
  5. Optimize chunks: Tune chunk size based on file sizes
  6. Parallel uploads: Use goroutines for multiple file uploads
  7. Cache metadata: Cache frequently accessed metadata

Troubleshooting

Circuit Breaker Keeps Opening
  • Check backend health and connectivity
  • Review timeout configuration
  • Examine error logs for root cause
  • Consider increasing threshold or timeout
High Latency
  • Check network latency to backend
  • Review buffer pool configuration
  • Monitor disk I/O (local backend)
  • Check for lock contention
Rate Limit Exceeded
  • Increase rate limit configuration
  • Implement client-side queuing
  • Use multiple backends for load distribution
Path Validation Errors
  • Review path validation rules
  • Use SanitizeKey() to clean paths
  • Check for special characters

Contributing

See CONTRIBUTING.md for development guidelines.

License

See LICENSE for license information.

Documentation ¶

Overview ¶

Package storage provides unified object storage with support for multiple backends including local filesystem, S3, GCS, and Azure Blob Storage.

Index ¶

Constants ¶

This section is empty.

Variables ¶

View Source
var (
	// Configuration errors
	ErrNoBackendsConfigured   = errors.New("no storage backends configured")
	ErrNoDefaultBackend       = errors.New("no default backend specified")
	ErrDefaultBackendNotFound = errors.New("default backend not found in configuration")
	ErrInvalidBackendType     = errors.New("invalid backend type")
	ErrBackendNotFound        = errors.New("backend not found")

	// Operation errors
	ErrObjectNotFound        = errors.New("object not found")
	ErrObjectAlreadyExists   = errors.New("object already exists")
	ErrInvalidKey            = errors.New("invalid object key")
	ErrUploadFailed          = errors.New("upload failed")
	ErrDownloadFailed        = errors.New("download failed")
	ErrDeleteFailed          = errors.New("delete failed")
	ErrPresignNotSupported   = errors.New("presigned URLs not supported for this backend")
	ErrMultipartNotSupported = errors.New("multipart upload not supported for this backend")

	// Resilience errors
	ErrCircuitBreakerOpen = errors.New("circuit breaker is open")
	ErrRateLimitExceeded  = errors.New("rate limit exceeded")

	// Validation errors
	ErrFileTooLarge       = errors.New("file size exceeds maximum allowed size")
	ErrInvalidContentType = errors.New("invalid content type")
	ErrInvalidPath        = errors.New("invalid path: potential path traversal detected")
)

Functions ¶

func IsValidSize ΒΆ

func IsValidSize(size int64, maxSize int64) bool

IsValidSize checks if a file size is within limits

func NewExtension ΒΆ

func NewExtension(config Config) forge.Extension

NewExtension creates a new storage extension

func ValidateMetadata ΒΆ

func ValidateMetadata(metadata map[string]string) error

ValidateMetadata validates metadata keys and values

Types ¶

type BackendConfig ΒΆ

type BackendConfig struct {
	Type   string                 `yaml:"type" json:"type"` // local, s3, gcs, azure
	Config map[string]interface{} `yaml:"config" json:"config"`
}

BackendConfig is the configuration for a storage backend

type BackendHealth ΒΆ

type BackendHealth struct {
	Name         string        `json:"name"`
	Healthy      bool          `json:"healthy"`
	ResponseTime time.Duration `json:"response_time"`
	Error        string        `json:"error,omitempty"`
	LastChecked  time.Time     `json:"last_checked"`
	CheckType    string        `json:"check_type"`
}

BackendHealth represents health status of a backend

type BufferPool ΒΆ

type BufferPool struct {
	// contains filtered or unexported fields
}

BufferPool manages a pool of reusable buffers for efficient I/O operations

func NewBufferPool ΒΆ

func NewBufferPool(bufferSize int) *BufferPool

NewBufferPool creates a new buffer pool with the specified buffer size

func (*BufferPool) Get ΒΆ

func (bp *BufferPool) Get() []byte

Get retrieves a buffer from the pool

func (*BufferPool) Put ΒΆ

func (bp *BufferPool) Put(buf []byte)

Put returns a buffer to the pool

type BytesBufferPool ΒΆ

type BytesBufferPool struct {
	// contains filtered or unexported fields
}

BytesBufferPool manages a pool of bytes.Buffer for efficient operations

func NewBytesBufferPool ΒΆ

func NewBytesBufferPool() *BytesBufferPool

NewBytesBufferPool creates a new bytes buffer pool

func (*BytesBufferPool) Get ΒΆ

func (bbp *BytesBufferPool) Get() *bytes.Buffer

Get retrieves a buffer from the pool

func (*BytesBufferPool) Put ΒΆ

func (bbp *BytesBufferPool) Put(buf *bytes.Buffer)

Put returns a buffer to the pool

type CircuitBreaker ΒΆ

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements circuit breaker pattern

func NewCircuitBreaker ΒΆ

func NewCircuitBreaker(config ResilienceConfig, logger forge.Logger, metrics forge.Metrics) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker

func (*CircuitBreaker) CanAttempt ΒΆ

func (cb *CircuitBreaker) CanAttempt() bool

CanAttempt checks if a request can be attempted

func (*CircuitBreaker) Execute ΒΆ

func (cb *CircuitBreaker) Execute(ctx context.Context, name string, fn func() error) error

Execute executes a function with circuit breaker protection

func (*CircuitBreaker) GetState ΒΆ

func (cb *CircuitBreaker) GetState() CircuitState

GetState returns current circuit state

func (*CircuitBreaker) RecordFailure ΒΆ

func (cb *CircuitBreaker) RecordFailure()

RecordFailure records a failed execution

func (*CircuitBreaker) RecordSuccess ΒΆ

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess records a successful execution

func (*CircuitBreaker) Reset ΒΆ

func (cb *CircuitBreaker) Reset()

Reset resets the circuit breaker

type CircuitState ΒΆ

type CircuitState int

CircuitState represents circuit breaker state

const (
	CircuitClosed CircuitState = iota
	CircuitOpen
	CircuitHalfOpen
)

func (CircuitState) String ΒΆ

func (s CircuitState) String() string

type Config ΒΆ

type Config struct {
	// Default backend name
	Default string `yaml:"default" json:"default" default:"local"`

	// Backend configurations
	Backends map[string]BackendConfig `yaml:"backends" json:"backends"`

	// Features
	EnablePresignedURLs bool          `yaml:"enable_presigned_urls" json:"enable_presigned_urls" default:"true"`
	PresignExpiry       time.Duration `yaml:"presign_expiry" json:"presign_expiry" default:"15m"`
	MaxUploadSize       int64         `yaml:"max_upload_size" json:"max_upload_size" default:"5368709120"` // 5GB
	ChunkSize           int           `yaml:"chunk_size" json:"chunk_size" default:"5242880"`              // 5MB

	// CDN
	EnableCDN  bool   `yaml:"enable_cdn" json:"enable_cdn" default:"false"`
	CDNBaseURL string `yaml:"cdn_base_url" json:"cdn_base_url"`

	// Resilience configuration
	Resilience ResilienceConfig `yaml:"resilience" json:"resilience"`

	// Use enhanced backend (with locking, pooling, etc.)
	UseEnhancedBackend bool `yaml:"use_enhanced_backend" json:"use_enhanced_backend" default:"true"`
}

Config is the storage extension configuration

func DefaultConfig ΒΆ

func DefaultConfig() Config

DefaultConfig returns the default storage configuration

func (*Config) Validate ΒΆ

func (c *Config) Validate() error

Validate validates the configuration

type EnhancedLocalBackend ΒΆ

type EnhancedLocalBackend struct {
	// contains filtered or unexported fields
}

EnhancedLocalBackend implements enhanced local filesystem storage with proper locking and pooling

func NewEnhancedLocalBackend ΒΆ

func NewEnhancedLocalBackend(config map[string]interface{}, logger forge.Logger, metrics forge.Metrics) (*EnhancedLocalBackend, error)

NewEnhancedLocalBackend creates a new enhanced local filesystem backend

func (*EnhancedLocalBackend) Copy ΒΆ

func (b *EnhancedLocalBackend) Copy(ctx context.Context, srcKey, dstKey string) error

Copy copies a file with proper locking

func (*EnhancedLocalBackend) Delete ΒΆ

func (b *EnhancedLocalBackend) Delete(ctx context.Context, key string) error

Delete deletes a file with proper locking

func (*EnhancedLocalBackend) Download ΒΆ

func (b *EnhancedLocalBackend) Download(ctx context.Context, key string) (io.ReadCloser, error)

Download downloads a file with proper locking

func (*EnhancedLocalBackend) Exists ΒΆ

func (b *EnhancedLocalBackend) Exists(ctx context.Context, key string) (bool, error)

Exists checks if an object exists

func (*EnhancedLocalBackend) List ΒΆ

func (b *EnhancedLocalBackend) List(ctx context.Context, prefix string, opts ...ListOption) ([]Object, error)

List lists files with a prefix

func (*EnhancedLocalBackend) Metadata ΒΆ

func (b *EnhancedLocalBackend) Metadata(ctx context.Context, key string) (*ObjectMetadata, error)

Metadata retrieves object metadata

func (*EnhancedLocalBackend) Move ΒΆ

func (b *EnhancedLocalBackend) Move(ctx context.Context, srcKey, dstKey string) error

Move moves a file with proper locking

func (*EnhancedLocalBackend) PresignDownload ΒΆ

func (b *EnhancedLocalBackend) PresignDownload(ctx context.Context, key string, expiry time.Duration) (string, error)

PresignDownload generates a presigned URL for download

func (*EnhancedLocalBackend) PresignUpload ΒΆ

func (b *EnhancedLocalBackend) PresignUpload(ctx context.Context, key string, expiry time.Duration) (string, error)

PresignUpload generates a presigned URL for upload

func (*EnhancedLocalBackend) Upload ΒΆ

func (b *EnhancedLocalBackend) Upload(ctx context.Context, key string, data io.Reader, opts ...UploadOption) error

Upload uploads a file with proper locking and validation

type EnhancedLocalConfig ΒΆ

type EnhancedLocalConfig struct {
	RootDir       string
	BaseURL       string
	Secret        string
	ChunkSize     int64
	MaxUploadSize int64
}

EnhancedLocalConfig contains configuration for enhanced local backend

type Extension ΒΆ

type Extension struct {
	// contains filtered or unexported fields
}

Extension implements the storage extension

func (*Extension) Dependencies ΒΆ

func (e *Extension) Dependencies() []string

Dependencies returns the list of extension dependencies

func (*Extension) Description ΒΆ

func (e *Extension) Description() string

Description returns the extension description

func (*Extension) Health ΒΆ

func (e *Extension) Health(ctx context.Context) error

Health checks the extension health

func (*Extension) Name ΒΆ

func (e *Extension) Name() string

Name returns the extension name

func (*Extension) Register ΒΆ

func (e *Extension) Register(app forge.App) error

Register registers the extension with the application

func (*Extension) Start ΒΆ

func (e *Extension) Start(ctx context.Context) error

Start starts the extension

func (*Extension) Stop ΒΆ

func (e *Extension) Stop(ctx context.Context) error

Stop stops the extension

func (*Extension) Version ΒΆ

func (e *Extension) Version() string

Version returns the extension version

type HealthCheckConfig ΒΆ

type HealthCheckConfig struct {
	Timeout       time.Duration `yaml:"timeout" json:"timeout" default:"5s"`
	WriteTestFile bool          `yaml:"write_test_file" json:"write_test_file" default:"true"`
	TestKey       string        `yaml:"test_key" json:"test_key" default:".health_check"`
	CheckAll      bool          `yaml:"check_all" json:"check_all" default:"false"`
	EnableMetrics bool          `yaml:"enable_metrics" json:"enable_metrics" default:"true"`
}

HealthCheckConfig configures health check behavior

func DefaultHealthCheckConfig ΒΆ

func DefaultHealthCheckConfig() HealthCheckConfig

DefaultHealthCheckConfig returns default health check configuration

type HealthChecker ΒΆ

type HealthChecker struct {
	// contains filtered or unexported fields
}

HealthChecker performs comprehensive health checks on storage backends

func NewHealthChecker ΒΆ

func NewHealthChecker(backends map[string]Storage, logger forge.Logger, metrics forge.Metrics, config HealthCheckConfig) *HealthChecker

NewHealthChecker creates a new health checker

func (*HealthChecker) CheckHealth ΒΆ

func (hc *HealthChecker) CheckHealth(ctx context.Context, defaultBackend string, checkAll bool) (*OverallHealth, error)

CheckHealth performs health check on default backend or all backends

func (*HealthChecker) GetBackendHealth ΒΆ

func (hc *HealthChecker) GetBackendHealth(ctx context.Context, name string) (*BackendHealth, error)

GetBackendHealth gets health status of a specific backend

type ListOption ΒΆ

type ListOption func(*ListOptions)

ListOption is a functional option for listing

func WithLimit ΒΆ

func WithLimit(limit int) ListOption

WithLimit sets the limit

func WithMarker ΒΆ

func WithMarker(marker string) ListOption

WithMarker sets the marker

func WithRecursive ΒΆ

func WithRecursive(recursive bool) ListOption

WithRecursive sets recursive listing

type ListOptions ΒΆ

type ListOptions struct {
	Limit     int
	Marker    string
	Recursive bool
}

ListOptions contains list options

type LocalBackend ΒΆ

type LocalBackend struct {
	// contains filtered or unexported fields
}

LocalBackend implements storage using local filesystem

func NewLocalBackend ΒΆ

func NewLocalBackend(config map[string]interface{}, logger forge.Logger, metrics forge.Metrics) (*LocalBackend, error)

NewLocalBackend creates a new local filesystem backend

func (*LocalBackend) Copy ΒΆ

func (b *LocalBackend) Copy(ctx context.Context, srcKey, dstKey string) error

Copy copies a file

func (*LocalBackend) Delete ΒΆ

func (b *LocalBackend) Delete(ctx context.Context, key string) error

Delete deletes a file

func (*LocalBackend) Download ΒΆ

func (b *LocalBackend) Download(ctx context.Context, key string) (io.ReadCloser, error)

Download downloads a file

func (*LocalBackend) Exists ΒΆ

func (b *LocalBackend) Exists(ctx context.Context, key string) (bool, error)

Exists checks if an object exists

func (*LocalBackend) List ΒΆ

func (b *LocalBackend) List(ctx context.Context, prefix string, opts ...ListOption) ([]Object, error)

List lists files with a prefix

func (*LocalBackend) Metadata ΒΆ

func (b *LocalBackend) Metadata(ctx context.Context, key string) (*ObjectMetadata, error)

Metadata retrieves object metadata

func (*LocalBackend) Move ΒΆ

func (b *LocalBackend) Move(ctx context.Context, srcKey, dstKey string) error

Move moves a file

func (*LocalBackend) PresignDownload ΒΆ

func (b *LocalBackend) PresignDownload(ctx context.Context, key string, expiry time.Duration) (string, error)

PresignDownload generates a presigned URL for download

func (*LocalBackend) PresignUpload ΒΆ

func (b *LocalBackend) PresignUpload(ctx context.Context, key string, expiry time.Duration) (string, error)

PresignUpload generates a presigned URL for upload

func (*LocalBackend) Upload ΒΆ

func (b *LocalBackend) Upload(ctx context.Context, key string, data io.Reader, opts ...UploadOption) error

Upload uploads a file

type Object ΒΆ

type Object struct {
	Key          string            `json:"key"`
	Size         int64             `json:"size"`
	LastModified time.Time         `json:"last_modified"`
	ETag         string            `json:"etag"`
	ContentType  string            `json:"content_type"`
	Metadata     map[string]string `json:"metadata"`
}

Object represents a storage object

type ObjectMetadata ΒΆ

type ObjectMetadata struct {
	Key          string            `json:"key"`
	Size         int64             `json:"size"`
	LastModified time.Time         `json:"last_modified"`
	ETag         string            `json:"etag"`
	ContentType  string            `json:"content_type"`
	Metadata     map[string]string `json:"metadata"`
}

ObjectMetadata represents object metadata

type OverallHealth ΒΆ

type OverallHealth struct {
	Healthy        bool                     `json:"healthy"`
	BackendCount   int                      `json:"backend_count"`
	HealthyCount   int                      `json:"healthy_count"`
	UnhealthyCount int                      `json:"unhealthy_count"`
	Backends       map[string]BackendHealth `json:"backends"`
	CheckedAt      time.Time                `json:"checked_at"`
}

OverallHealth represents overall storage health

type PathValidator ΒΆ

type PathValidator struct {
	// contains filtered or unexported fields
}

PathValidator validates storage paths for security

func NewPathValidator ΒΆ

func NewPathValidator() *PathValidator

NewPathValidator creates a new path validator with default rules

func (*PathValidator) SanitizeKey ΒΆ

func (pv *PathValidator) SanitizeKey(key string) string

SanitizeKey sanitizes a key to make it safe

func (*PathValidator) ValidateContentType ΒΆ

func (pv *PathValidator) ValidateContentType(contentType string) error

ValidateContentType validates content type

func (*PathValidator) ValidateKey ΒΆ

func (pv *PathValidator) ValidateKey(key string) error

ValidateKey validates a storage key for security issues

type RateLimiter ΒΆ

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter implements token bucket rate limiting

func NewRateLimiter ¶

func NewRateLimiter(config ResilienceConfig, metrics forge.Metrics) *RateLimiter

NewRateLimiter creates a new rate limiter

func (*RateLimiter) Allow ¶

func (rl *RateLimiter) Allow() bool

Allow checks if a request is allowed

type ResilienceConfig ¶

type ResilienceConfig struct {
	// Retry configuration
	MaxRetries        int           `yaml:"max_retries" json:"max_retries" default:"3"`
	InitialBackoff    time.Duration `yaml:"initial_backoff" json:"initial_backoff" default:"100ms"`
	MaxBackoff        time.Duration `yaml:"max_backoff" json:"max_backoff" default:"10s"`
	BackoffMultiplier float64       `yaml:"backoff_multiplier" json:"backoff_multiplier" default:"2.0"`

	// Circuit breaker configuration
	CircuitBreakerEnabled     bool          `yaml:"circuit_breaker_enabled" json:"circuit_breaker_enabled" default:"true"`
	CircuitBreakerThreshold   uint32        `yaml:"circuit_breaker_threshold" json:"circuit_breaker_threshold" default:"5"`
	CircuitBreakerTimeout     time.Duration `yaml:"circuit_breaker_timeout" json:"circuit_breaker_timeout" default:"60s"`
	CircuitBreakerHalfOpenMax uint32        `yaml:"circuit_breaker_half_open_max" json:"circuit_breaker_half_open_max" default:"3"`

	// Rate limiting
	RateLimitEnabled bool `yaml:"rate_limit_enabled" json:"rate_limit_enabled" default:"true"`
	RateLimitPerSec  int  `yaml:"rate_limit_per_sec" json:"rate_limit_per_sec" default:"100"`
	RateLimitBurst   int  `yaml:"rate_limit_burst" json:"rate_limit_burst" default:"200"`

	// Timeout configuration
	OperationTimeout time.Duration `yaml:"operation_timeout" json:"operation_timeout" default:"30s"`
}

ResilienceConfig configures resilience features

func DefaultResilienceConfig ¶

func DefaultResilienceConfig() ResilienceConfig

DefaultResilienceConfig returns default resilience configuration

type ResilientStorage ¶

type ResilientStorage struct {
	// contains filtered or unexported fields
}

ResilientStorage wraps a storage backend with resilience features

func NewResilientStorage ¶

func NewResilientStorage(backend Storage, config ResilienceConfig, logger forge.Logger, metrics forge.Metrics) *ResilientStorage

NewResilientStorage creates a resilient storage wrapper

func (*ResilientStorage) Copy ¶

func (rs *ResilientStorage) Copy(ctx context.Context, srcKey, dstKey string) error

Copy copies with resilience

func (*ResilientStorage) Delete ¶

func (rs *ResilientStorage) Delete(ctx context.Context, key string) error

Delete deletes with resilience

func (*ResilientStorage) Download ¶

func (rs *ResilientStorage) Download(ctx context.Context, key string) (io.ReadCloser, error)

Download downloads with resilience

func (*ResilientStorage) Exists ¶

func (rs *ResilientStorage) Exists(ctx context.Context, key string) (bool, error)

Exists checks existence with resilience

func (*ResilientStorage) GetCircuitBreakerState ¶

func (rs *ResilientStorage) GetCircuitBreakerState() CircuitState

GetCircuitBreakerState returns circuit breaker state

func (*ResilientStorage) List ¶

func (rs *ResilientStorage) List(ctx context.Context, prefix string, opts ...ListOption) ([]Object, error)

List lists with resilience

func (*ResilientStorage) Metadata ¶

func (rs *ResilientStorage) Metadata(ctx context.Context, key string) (*ObjectMetadata, error)

Metadata gets metadata with resilience

func (*ResilientStorage) Move ¶

func (rs *ResilientStorage) Move(ctx context.Context, srcKey, dstKey string) error

Move moves with resilience

func (*ResilientStorage) PresignDownload ¶

func (rs *ResilientStorage) PresignDownload(ctx context.Context, key string, expiry time.Duration) (string, error)

PresignDownload presigns download URL with resilience

func (*ResilientStorage) PresignUpload ¶

func (rs *ResilientStorage) PresignUpload(ctx context.Context, key string, expiry time.Duration) (string, error)

PresignUpload presigns upload URL with resilience

func (*ResilientStorage) ResetCircuitBreaker ¶

func (rs *ResilientStorage) ResetCircuitBreaker()

ResetCircuitBreaker resets the circuit breaker

func (*ResilientStorage) Upload ¶

func (rs *ResilientStorage) Upload(ctx context.Context, key string, data io.Reader, opts ...UploadOption) error

Upload uploads with resilience

type S3Backend ¶

type S3Backend struct {
	// contains filtered or unexported fields
}

S3Backend implements storage using AWS S3

func NewS3Backend ¶

func NewS3Backend(configMap map[string]interface{}, logger forge.Logger, metrics forge.Metrics) (*S3Backend, error)

NewS3Backend creates a new S3 storage backend

func (*S3Backend) Copy ¶

func (b *S3Backend) Copy(ctx context.Context, srcKey, dstKey string) error

Copy copies a file within S3

func (*S3Backend) Delete ¶

func (b *S3Backend) Delete(ctx context.Context, key string) error

Delete deletes a file from S3

func (*S3Backend) Download ¶

func (b *S3Backend) Download(ctx context.Context, key string) (io.ReadCloser, error)

Download downloads a file from S3

func (*S3Backend) Exists ¶

func (b *S3Backend) Exists(ctx context.Context, key string) (bool, error)

Exists checks if an object exists

func (*S3Backend) List ¶

func (b *S3Backend) List(ctx context.Context, prefix string, opts ...ListOption) ([]Object, error)

List lists files with a prefix

func (*S3Backend) Metadata ¶

func (b *S3Backend) Metadata(ctx context.Context, key string) (*ObjectMetadata, error)

Metadata retrieves object metadata

func (*S3Backend) Move ¶

func (b *S3Backend) Move(ctx context.Context, srcKey, dstKey string) error

Move moves a file within S3 (copy + delete)

func (*S3Backend) PresignDownload ¶

func (b *S3Backend) PresignDownload(ctx context.Context, key string, expiry time.Duration) (string, error)

PresignDownload generates a presigned URL for download

func (*S3Backend) PresignUpload ¶

func (b *S3Backend) PresignUpload(ctx context.Context, key string, expiry time.Duration) (string, error)

PresignUpload generates a presigned URL for upload

func (*S3Backend) Upload ¶

func (b *S3Backend) Upload(ctx context.Context, key string, data io.Reader, opts ...UploadOption) error

Upload uploads a file to S3

type S3Config ¶

type S3Config struct {
	Region          string
	Bucket          string
	Prefix          string
	AccessKeyID     string
	SecretAccessKey string
	SessionToken    string
	Endpoint        string // For S3-compatible services
	UsePathStyle    bool   // For S3-compatible services
}

S3Config contains S3 configuration

type Storage ¶

type Storage interface {
	// Upload uploads an object
	Upload(ctx context.Context, key string, data io.Reader, opts ...UploadOption) error

	// Download downloads an object
	Download(ctx context.Context, key string) (io.ReadCloser, error)

	// Delete deletes an object
	Delete(ctx context.Context, key string) error

	// List lists objects with a prefix
	List(ctx context.Context, prefix string, opts ...ListOption) ([]Object, error)

	// Metadata retrieves object metadata
	Metadata(ctx context.Context, key string) (*ObjectMetadata, error)

	// Exists checks if an object exists
	Exists(ctx context.Context, key string) (bool, error)

	// Copy copies an object
	Copy(ctx context.Context, srcKey, dstKey string) error

	// Move moves an object
	Move(ctx context.Context, srcKey, dstKey string) error

	// PresignUpload generates a presigned URL for upload
	PresignUpload(ctx context.Context, key string, expiry time.Duration) (string, error)

	// PresignDownload generates a presigned URL for download
	PresignDownload(ctx context.Context, key string, expiry time.Duration) (string, error)
}

Storage defines the unified storage interface

type StorageManager ¶

type StorageManager struct {
	// contains filtered or unexported fields
}

StorageManager manages multiple storage backends

func NewStorageManager ¶

func NewStorageManager(config Config, logger forge.Logger, metrics forge.Metrics) *StorageManager

NewStorageManager creates a new storage manager

func (*StorageManager) Backend ¶

func (m *StorageManager) Backend(name string) Storage

Backend returns a specific backend

func (*StorageManager) BackendHealth ¶

func (m *StorageManager) BackendHealth(ctx context.Context, name string) (*BackendHealth, error)

BackendHealth returns health of a specific backend

func (*StorageManager) Copy ¶

func (m *StorageManager) Copy(ctx context.Context, srcKey, dstKey string) error

Copy copies in the default backend

func (*StorageManager) Delete ¶

func (m *StorageManager) Delete(ctx context.Context, key string) error

Delete deletes from the default backend

func (*StorageManager) Download ¶

func (m *StorageManager) Download(ctx context.Context, key string) (io.ReadCloser, error)

Download downloads from the default backend

func (*StorageManager) Exists ¶

func (m *StorageManager) Exists(ctx context.Context, key string) (bool, error)

Exists checks existence in the default backend

func (*StorageManager) GetURL ¶

func (m *StorageManager) GetURL(ctx context.Context, key string) string

GetURL returns the URL for an object (CDN or direct)

func (*StorageManager) Health ¶

func (m *StorageManager) Health(ctx context.Context) error

Health checks the health of all backends

func (*StorageManager) HealthDetailed ¶

func (m *StorageManager) HealthDetailed(ctx context.Context, checkAll bool) (*OverallHealth, error)

HealthDetailed returns detailed health information

func (*StorageManager) List ¶

func (m *StorageManager) List(ctx context.Context, prefix string, opts ...ListOption) ([]Object, error)

List lists from the default backend

func (*StorageManager) Metadata ¶

func (m *StorageManager) Metadata(ctx context.Context, key string) (*ObjectMetadata, error)

Metadata gets metadata from the default backend

func (*StorageManager) Move ¶

func (m *StorageManager) Move(ctx context.Context, srcKey, dstKey string) error

Move moves in the default backend

func (*StorageManager) PresignDownload ¶

func (m *StorageManager) PresignDownload(ctx context.Context, key string, expiry time.Duration) (string, error)

PresignDownload generates a presigned download URL for the default backend

func (*StorageManager) PresignUpload ¶

func (m *StorageManager) PresignUpload(ctx context.Context, key string, expiry time.Duration) (string, error)

PresignUpload generates a presigned upload URL for the default backend

func (*StorageManager) Start ¶

func (m *StorageManager) Start(ctx context.Context) error

Start initializes all storage backends

func (*StorageManager) Stop ¶

func (m *StorageManager) Stop(ctx context.Context) error

Stop closes all storage backends

func (*StorageManager) Upload ¶

func (m *StorageManager) Upload(ctx context.Context, key string, data io.Reader, opts ...UploadOption) error

Upload uploads to the default backend

type UploadOption ¶

type UploadOption func(*UploadOptions)

UploadOption is a functional option for uploads

func WithACL ¶

func WithACL(acl string) UploadOption

WithACL sets ACL

func WithContentType ¶

func WithContentType(contentType string) UploadOption

WithContentType sets the content type

func WithMetadata ¶

func WithMetadata(metadata map[string]string) UploadOption

WithMetadata sets metadata

type UploadOptions ¶

type UploadOptions struct {
	ContentType string
	Metadata    map[string]string
	ACL         string
}

UploadOptions contains upload options

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL