gothrottle

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 3, 2025 License: MIT Imports: 8 Imported by: 0

README

GoThrottle

GoThrottle Logo

Go Version Build Status Go Report Card Coverage Status GoDoc

A Go package for request throttling and rate limiting, heavily inspired by the Node.js bottleneck package.

Features

  • Local and Distributed Rate Limiting: Supports both in-memory (LocalStore) and Redis-based (RedisStore) backends
  • Configurable Limits: Set maximum concurrent jobs and minimum time between jobs
  • Priority Queue: Jobs are executed based on priority
  • Atomic Operations: Redis operations use Lua scripts to prevent race conditions
  • Easy Integration: Simple API for wrapping existing functions

Installation

go get github.com/AFZidan/gothrottle

Quick Start

Local Rate Limiting
package main

import (
    "fmt"
    "time"
    "github.com/AFZidan/gothrottle"
)

func main() {
    // Create a limiter with local storage
    limiter, err := gothrottle.NewLimiter(gothrottle.Options{
        MaxConcurrent: 2,                    // Max 2 concurrent jobs
        MinTime:       100 * time.Millisecond, // 100ms between jobs
    })
    if err != nil {
        panic(err)
    }
    defer limiter.Stop()

    // Schedule a job
    result, err := limiter.Schedule(func() (interface{}, error) {
        // Your work here
        return "Hello, World!", nil
    })
    
    fmt.Println(result) // "Hello, World!"
}
Distributed Rate Limiting with Redis
package main

import (
    "time"
    "github.com/AFZidan/gothrottle"
    "github.com/go-redis/redis/v8"
)

func main() {
    // Create Redis client
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })

    // Create Redis store
    store, err := gothrottle.NewRedisStore(rdb)
    if err != nil {
        panic(err)
    }

    // Create limiter with Redis backend
    limiter, err := gothrottle.NewLimiter(gothrottle.Options{
        ID:            "my-distributed-limiter", // Required for Redis
        MaxConcurrent: 5,
        MinTime:       200 * time.Millisecond,
        Datastore:     store,
    })
    if err != nil {
        panic(err)
    }
    defer limiter.Stop()

    // This limiter will now coordinate with other instances
    // using the same Redis store and limiter ID
}

API Reference

Options
type Options struct {
    ID            string        // Unique ID for the limiter (required for Redis)
    MaxConcurrent int           // Maximum concurrent jobs (0 = unlimited)
    MinTime       time.Duration // Minimum time between jobs
    Datastore     Datastore     // Storage backend (nil = LocalStore)
}
Limiter Methods
NewLimiter(opts Options) (*Limiter, error)

Creates a new limiter instance.

Schedule(task func() (interface{}, error)) (interface{}, error)

Schedules a job with default priority (5) and weight (1). Blocks until completion.

ScheduleWithOptions(task func() (interface{}, error), priority, weight int) (interface{}, error)

Schedules a job with custom priority and weight. Higher priority jobs run first.

Wrap(fn func() (interface{}, error)) func() (interface{}, error)

Returns a wrapped version of the function that applies rate limiting.

Stop() error

Stops the limiter and cleans up resources.

Storage Backends
LocalStore

In-memory storage for single-instance applications. This is the default when no Datastore is specified.

store := gothrottle.NewLocalStore()
RedisStore

Redis-based storage for distributed rate limiting across multiple application instances.

rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
store, err := gothrottle.NewRedisStore(rdb)

Architecture

The package is built around a Datastore interface that allows pluggable storage backends:

type Datastore interface {
    Request(limiterID string, weight int, opts Options) (canRun bool, waitTime time.Duration, err error)
    RegisterDone(limiterID string, weight int) error
    Disconnect() error
}
  • LocalStore: Uses Go mutexes and in-memory state
  • RedisStore: Uses atomic Lua scripts for race-condition-free distributed coordination

Project Structure

gothrottle/
├── datastore.go         # Datastore interface definition
├── options.go          # Configuration options
├── job.go             # Job struct and priority queue
├── local_store.go     # In-memory storage implementation
├── redis_store.go     # Redis-based storage implementation
├── limiter.go         # Main Limiter struct and logic
├── errors.go          # Common error definitions
├── assets/            # Visual assets and branding
│   ├── logo.svg                 # Vector logo
│   ├── logo-*.png              # PNG logos (64px, 128px, 256px, 512px)
│   ├── social-preview.svg       # Social media preview (vector)
│   ├── social-preview-1280x640.png # GitHub social preview (PNG)
│   └── README.md               # Asset documentation
├── tests/             # Test files
│   ├── examples_test.go         # Basic usage examples
│   ├── limiter_test.go          # Core limiter unit tests
│   ├── integration_test.go      # Integration tests and benchmarks
│   ├── database_test.go         # Database throttling tests
│   └── advanced_database_test.go # Advanced DB operations with weights
├── .github/           # GitHub workflows and templates
│   ├── workflows/
│   │   ├── ci.yml                # CI/CD pipeline
│   │   ├── release.yml           # Release automation
│   │   └── codeql.yml           # Security analysis
│   ├── ISSUE_TEMPLATE/
│   │   ├── bug_report.md
│   │   ├── feature_request.md
│   │   └── documentation.md
│   └── pull_request_template.md
├── Makefile           # Development commands and workflows
├── go.mod             # Go module definition
├── go.sum             # Go module checksums
├── docker-compose.test.yml # Docker testing environment
├── Dockerfile.test    # Docker test container
├── README.md          # This file
├── CONTRIBUTING.md    # Contribution guidelines
├── CHANGELOG.md       # Version history
├── SECURITY.md        # Security policy
└── LICENSE            # MIT License

Examples

See tests/examples_test.go for more detailed examples of usage patterns.

Development

GoThrottle includes a comprehensive Makefile that provides all the common development commands. The Makefile offers a consistent and easy way to build, test, lint, and manage the project.

Getting Started
# Show all available commands
make help

# Install development tools (golangci-lint, gosec)
make install-tools

# Quick development workflow (format, vet, test)
make dev
Common Commands
# Build and Test
make build                 # Build the project
make test                  # Run tests
make test-race            # Run tests with race detector
make test-cover           # Run tests with coverage
make test-bench           # Run benchmarks
make test-all             # Run all tests (race, coverage, benchmarks)

# Code Quality
make fmt                  # Format code
make fmt-check           # Check if code is formatted
make vet                 # Run go vet
make lint                # Run golangci-lint
make security            # Run gosec security scan
make quality             # Run all quality checks

# Coverage
make coverage-html       # Generate HTML coverage report
make coverage-check      # Check coverage meets minimum threshold (60%)

# Dependencies
make deps                # Download dependencies
make verify              # Verify dependencies
make mod-tidy            # Tidy up go.mod and go.sum
make mod-update          # Update dependencies to latest versions

# Cross-platform builds
make cross-build         # Build for multiple platforms (Linux, macOS, Windows)

# CI Simulation
make ci                  # Simulate full CI pipeline locally
make release-check       # Full release readiness check
Quick Development Workflows
# Quick test cycle during development
make quick-test          # Format → Vet → Test

# Quick build cycle
make quick-build         # Format → Vet → Build

# Full quality gate (before committing)
make quality             # Format check → Vet → Lint → Security scan

# Full CI simulation (before pushing)
make ci                  # Dependencies → Quality → All tests → Cross-build
Coverage Requirements

The project maintains a minimum code coverage of 60%. You can check if your changes meet this requirement:

make coverage-check

This will run the tests with coverage and verify that the total coverage meets the minimum threshold.

Docker Testing

For testing with Redis in an isolated environment:

make docker-test         # Run tests in Docker with Redis
Watch Mode

For continuous testing during development (requires entr):

make watch-test          # Automatically run tests when files change
Manual Testing Commands

If you prefer to run commands manually without the Makefile:

# Run all tests
go test ./tests/... -v

# Run benchmarks
go test ./tests/... -bench=. -benchmem

# Test with coverage
go test -v -race -coverprofile=coverage.out -coverpkg=./... ./tests/...

# Test a specific function
go test ./tests/... -run TestLimiter_MaxConcurrent -v

License

MIT License - see LICENSE file for details.

Database Query Throttling

GoThrottle is excellent for throttling database operations to prevent overwhelming your database with too many concurrent queries. This is especially useful for:

  • Rate limiting API database calls
  • Batch processing large datasets
  • Preventing database connection pool exhaustion
  • Distributed rate limiting across multiple application instances
Basic Database Throttling
package main

import (
    "database/sql"
    "gothrottle"
    _ "github.com/lib/pq" // PostgreSQL driver
)

// DatabaseThrottler wraps database operations with rate limiting
type DatabaseThrottler struct {
    db      *sql.DB
    limiter *gothrottle.Limiter
}

func NewDatabaseThrottler(db *sql.DB, opts gothrottle.Options) (*DatabaseThrottler, error) {
    limiter, err := gothrottle.NewLimiter(opts)
    if err != nil {
        return nil, err
    }
    
    return &DatabaseThrottler{
        db:      db,
        limiter: limiter,
    }, nil
}

// Query executes a throttled database query
func (dt *DatabaseThrottler) Query(query string, args ...interface{}) (*sql.Rows, error) {
    result, err := dt.limiter.Schedule(func() (interface{}, error) {
        return dt.db.Query(query, args...)
    })
    
    if err != nil {
        return nil, err
    }
    
    return result.(*sql.Rows), nil
}

func main() {
    db, _ := sql.Open("postgres", "connection_string")
    
    // Limit to 5 concurrent queries with 10ms between query starts
    throttledDB, _ := NewDatabaseThrottler(db, gothrottle.Options{
        MaxConcurrent: 5,
        MinTime:       10 * time.Millisecond,
    })
    defer throttledDB.Close()
    
    // Now all queries through throttledDB will be rate limited
    rows, err := throttledDB.Query("SELECT * FROM users WHERE active = ?", true)
    // ... handle results
}
Weighted Database Operations

Different database operations can have different resource costs. You can assign weights:

// Light SELECT queries (weight 1)
rows, err := limiter.ScheduleWithOptions(func() (interface{}, error) {
    return db.Query("SELECT id FROM users")
}, 5, 1) // Priority 5, Weight 1

// Heavy analytical queries (weight 5)  
rows, err := limiter.ScheduleWithOptions(func() (interface{}, error) {
    return db.Query("SELECT COUNT(*) FROM large_table GROUP BY complex_column")
}, 10, 5) // Priority 10, Weight 5

// With MaxConcurrent: 10, you can run either:
// - 10 light queries simultaneously, OR  
// - 2 heavy queries simultaneously, OR
// - Some combination that doesn't exceed 10 total weight
Distributed Database Rate Limiting

For applications with multiple instances sharing the same database:

// Use Redis for distributed coordination
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
store, _ := gothrottle.NewRedisStore(rdb)

// All app instances using this same ID will share the rate limits
throttledDB, _ := NewDatabaseThrottler(db, gothrottle.Options{
    ID:            "shared-db-limiter",
    MaxConcurrent: 20, // Total across ALL instances
    MinTime:       5 * time.Millisecond,
    Datastore:     store,
})

Real-World Use Cases & Examples

1. API Rate Limiting Middleware

Protect your API endpoints from being overwhelmed:

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"
    
    "github.com/AFZidan/gothrottle"
    "github.com/go-redis/redis/v8"
)

// APIThrottler wraps HTTP handlers with rate limiting
type APIThrottler struct {
    limiter *gothrottle.Limiter
}

func NewAPIThrottler(opts gothrottle.Options) (*APIThrottler, error) {
    limiter, err := gothrottle.NewLimiter(opts)
    if err != nil {
        return nil, err
    }
    return &APIThrottler{limiter: limiter}, nil
}

// ThrottleHandler wraps an HTTP handler with rate limiting
func (at *APIThrottler) ThrottleHandler(handler http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        _, err := at.limiter.Schedule(func() (interface{}, error) {
            handler(w, r)
            return nil, nil
        })
        
        if err != nil {
            http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
            return
        }
    }
}

func main() {
    // Create distributed rate limiter for API
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    store, _ := gothrottle.NewRedisStore(rdb)
    
    throttler, _ := NewAPIThrottler(gothrottle.Options{
        ID:            "api-rate-limiter",
        MaxConcurrent: 100,    // Max 100 concurrent API requests
        MinTime:       10 * time.Millisecond, // 10ms between requests
        Datastore:     store,
    })
    
    // Apply throttling to endpoints
    http.HandleFunc("/api/users", throttler.ThrottleHandler(handleUsers))
    http.HandleFunc("/api/orders", throttler.ThrottleHandler(handleOrders))
    
    http.ListenAndServe(":8080", nil)
}

func handleUsers(w http.ResponseWriter, r *http.Request) {
    // Simulate database query
    time.Sleep(50 * time.Millisecond)
    json.NewEncoder(w).Encode(map[string]string{"status": "success"})
}

func handleOrders(w http.ResponseWriter, r *http.Request) {
    // Simulate heavy database operation
    time.Sleep(200 * time.Millisecond)
    json.NewEncoder(w).Encode(map[string]string{"status": "success"})
}
2. File Processing Pipeline

Throttle file processing to prevent system overload:

package main

import (
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
    "time"
    
    "github.com/AFZidan/gothrottle"
)

type FileProcessor struct {
    limiter *gothrottle.Limiter
}

func NewFileProcessor() *FileProcessor {
    limiter, _ := gothrottle.NewLimiter(gothrottle.Options{
        MaxConcurrent: 5,     // Process max 5 files concurrently
        MinTime:       100 * time.Millisecond, // 100ms between file processing
    })
    
    return &FileProcessor{limiter: limiter}
}

func (fp *FileProcessor) ProcessFile(filePath string) error {
    _, err := fp.limiter.ScheduleWithOptions(func() (interface{}, error) {
        // Determine file size for weight calculation
        stat, err := os.Stat(filePath)
        if err != nil {
            return nil, err
        }
        
        // Read and process file
        data, err := ioutil.ReadFile(filePath)
        if err != nil {
            return nil, err
        }
        
        // Simulate processing time based on file size
        processingTime := time.Duration(len(data)/1024) * time.Millisecond
        time.Sleep(processingTime)
        
        fmt.Printf("Processed file: %s (%d bytes)\n", filePath, len(data))
        return nil, nil
    }, 5, fp.getFileWeight(filePath)) // Priority 5, weight based on file size
    
    return err
}

func (fp *FileProcessor) getFileWeight(filePath string) int {
    stat, err := os.Stat(filePath)
    if err != nil {
        return 1
    }
    
    // Weight based on file size (MB)
    weight := int(stat.Size() / (1024 * 1024))
    if weight < 1 {
        weight = 1
    }
    if weight > 10 {
        weight = 10 // Cap at weight 10
    }
    
    return weight
}

func (fp *FileProcessor) ProcessDirectory(dirPath string) error {
    return filepath.Walk(dirPath, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        
        if !info.IsDir() {
            return fp.ProcessFile(path)
        }
        
        return nil
    })
}

func (fp *FileProcessor) Close() {
    fp.limiter.Stop()
}
3. Web Scraping with Rate Limits

Respectful web scraping that doesn't overwhelm target servers:

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "time"
    
    "github.com/AFZidan/gothrottle"
)

type WebScraper struct {
    limiter *gothrottle.Limiter
    client  *http.Client
}

func NewWebScraper() *WebScraper {
    // Respectful scraping limits
    limiter, _ := gothrottle.NewLimiter(gothrottle.Options{
        MaxConcurrent: 3,     // Max 3 concurrent requests
        MinTime:       2 * time.Second, // 2 seconds between requests
    })
    
    return &WebScraper{
        limiter: limiter,
        client:  &http.Client{Timeout: 30 * time.Second},
    }
}

func (ws *WebScraper) ScrapeURL(url string) (string, error) {
    result, err := ws.limiter.Schedule(func() (interface{}, error) {
        resp, err := ws.client.Get(url)
        if err != nil {
            return nil, err
        }
        defer resp.Body.Close()
        
        body, err := ioutil.ReadAll(resp.Body)
        if err != nil {
            return nil, err
        }
        
        fmt.Printf("Scraped: %s (%d bytes)\n", url, len(body))
        return string(body), nil
    })
    
    if err != nil {
        return "", err
    }
    
    return result.(string), nil
}

func (ws *WebScraper) ScrapeMultipleURLs(urls []string) []string {
    results := make([]string, len(urls))
    
    for i, url := range urls {
        content, err := ws.ScrapeURL(url)
        if err != nil {
            fmt.Printf("Error scraping %s: %v\n", url, err)
            continue
        }
        results[i] = content
    }
    
    return results
}

func (ws *WebScraper) Close() {
    ws.limiter.Stop()
}
4. Background Job Processing

Throttle background jobs to prevent resource exhaustion:

package main

import (
    "fmt"
    "sync"
    "time"
    
    "github.com/AFZidan/gothrottle"
)

type JobType int

const (
    EmailJob JobType = iota
    ReportJob
    DataSyncJob
    ImageProcessingJob
)

type Job struct {
    ID       string
    Type     JobType
    Data     interface{}
    Priority int
}

type JobProcessor struct {
    limiter *gothrottle.Limiter
}

func NewJobProcessor() *JobProcessor {
    limiter, _ := gothrottle.NewLimiter(gothrottle.Options{
        MaxConcurrent: 10,    // Max 10 concurrent jobs
        MinTime:       50 * time.Millisecond, // 50ms between job starts
    })
    
    return &JobProcessor{limiter: limiter}
}

func (jp *JobProcessor) ProcessJob(job Job) error {
    priority := job.Priority
    weight := jp.getJobWeight(job.Type)
    
    _, err := jp.limiter.ScheduleWithOptions(func() (interface{}, error) {
        return jp.executeJob(job)
    }, priority, weight)
    
    return err
}

func (jp *JobProcessor) getJobWeight(jobType JobType) int {
    switch jobType {
    case EmailJob:
        return 1 // Light operation
    case ReportJob:
        return 3 // Medium operation
    case DataSyncJob:
        return 5 // Heavy operation
    case ImageProcessingJob:
        return 8 // Very heavy operation
    default:
        return 1
    }
}

func (jp *JobProcessor) executeJob(job Job) (interface{}, error) {
    start := time.Now()
    
    switch job.Type {
    case EmailJob:
        return jp.processEmail(job)
    case ReportJob:
        return jp.generateReport(job)
    case DataSyncJob:
        return jp.syncData(job)
    case ImageProcessingJob:
        return jp.processImage(job)
    }
    
    fmt.Printf("Job %s completed in %v\n", job.ID, time.Since(start))
    return nil, nil
}

func (jp *JobProcessor) processEmail(job Job) (interface{}, error) {
    time.Sleep(100 * time.Millisecond) // Simulate email sending
    fmt.Printf("Email sent: %s\n", job.ID)
    return "email_sent", nil
}

func (jp *JobProcessor) generateReport(job Job) (interface{}, error) {
    time.Sleep(2 * time.Second) // Simulate report generation
    fmt.Printf("Report generated: %s\n", job.ID)
    return "report_generated", nil
}

func (jp *JobProcessor) syncData(job Job) (interface{}, error) {
    time.Sleep(5 * time.Second) // Simulate data sync
    fmt.Printf("Data synced: %s\n", job.ID)
    return "data_synced", nil
}

func (jp *JobProcessor) processImage(job Job) (interface{}, error) {
    time.Sleep(10 * time.Second) // Simulate image processing
    fmt.Printf("Image processed: %s\n", job.ID)
    return "image_processed", nil
}

func (jp *JobProcessor) ProcessJobsConcurrently(jobs []Job) {
    var wg sync.WaitGroup
    
    for _, job := range jobs {
        wg.Add(1)
        go func(j Job) {
            defer wg.Done()
            if err := jp.ProcessJob(j); err != nil {
                fmt.Printf("Job %s failed: %v\n", j.ID, err)
            }
        }(job)
    }
    
    wg.Wait()
}

func (jp *JobProcessor) Close() {
    jp.limiter.Stop()
}
5. Microservices Communication Throttling

Rate limit calls between microservices:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
    "time"
    
    "github.com/AFZidan/gothrottle"
    "github.com/go-redis/redis/v8"
)

type ServiceClient struct {
    limiter    *gothrottle.Limiter
    baseURL    string
    httpClient *http.Client
}

func NewServiceClient(serviceName, baseURL string) *ServiceClient {
    // Use Redis for distributed rate limiting across service instances
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    store, _ := gothrottle.NewRedisStore(rdb)
    
    limiter, _ := gothrottle.NewLimiter(gothrottle.Options{
        ID:            fmt.Sprintf("service-client-%s", serviceName),
        MaxConcurrent: 20,    // Max 20 concurrent calls to this service
        MinTime:       10 * time.Millisecond, // 10ms between calls
        Datastore:     store,
    })
    
    return &ServiceClient{
        limiter:    limiter,
        baseURL:    baseURL,
        httpClient: &http.Client{Timeout: 30 * time.Second},
    }
}

func (sc *ServiceClient) Get(endpoint string) (*http.Response, error) {
    result, err := sc.limiter.ScheduleWithOptions(func() (interface{}, error) {
        url := sc.baseURL + endpoint
        return sc.httpClient.Get(url)
    }, 5, 1) // Normal priority, weight 1
    
    if err != nil {
        return nil, err
    }
    
    return result.(*http.Response), nil
}

func (sc *ServiceClient) Post(endpoint string, data interface{}) (*http.Response, error) {
    result, err := sc.limiter.ScheduleWithOptions(func() (interface{}, error) {
        jsonData, err := json.Marshal(data)
        if err != nil {
            return nil, err
        }
        
        url := sc.baseURL + endpoint
        return sc.httpClient.Post(url, "application/json", bytes.NewBuffer(jsonData))
    }, 6, 2) // Higher priority, weight 2 (POST is heavier)
    
    if err != nil {
        return nil, err
    }
    
    return result.(*http.Response), nil
}

func (sc *ServiceClient) BulkOperation(endpoint string, items []interface{}) error {
    _, err := sc.limiter.ScheduleWithOptions(func() (interface{}, error) {
        // Bulk operations are heavy and should have high priority
        jsonData, err := json.Marshal(items)
        if err != nil {
            return nil, err
        }
        
        url := sc.baseURL + endpoint
        resp, err := sc.httpClient.Post(url, "application/json", bytes.NewBuffer(jsonData))
        if err != nil {
            return nil, err
        }
        defer resp.Body.Close()
        
        return resp, nil
    }, 10, 5) // Highest priority, weight 5 (very heavy operation)
    
    return err
}

func (sc *ServiceClient) Close() {
    sc.limiter.Stop()
}

// Example usage in a microservice
func main() {
    userService := NewServiceClient("user-service", "http://user-service:8080")
    orderService := NewServiceClient("order-service", "http://order-service:8080")
    
    defer userService.Close()
    defer orderService.Close()
    
    // These calls will be rate limited
    userResp, _ := userService.Get("/api/users/123")
    orderResp, _ := orderService.Post("/api/orders", map[string]interface{}{
        "user_id": 123,
        "amount":  99.99,
    })
    
    fmt.Printf("User response status: %d\n", userResp.StatusCode)
    fmt.Printf("Order response status: %d\n", orderResp.StatusCode)
}
6. ETL Pipeline Rate Limiting

Control data extraction, transformation, and loading processes:

package main

import (
    "database/sql"
    "fmt"
    "time"
    
    "github.com/AFZidan/gothrottle"
    _ "github.com/lib/pq"
)

type ETLPipeline struct {
    extractLimiter   *gothrottle.Limiter
    transformLimiter *gothrottle.Limiter
    loadLimiter      *gothrottle.Limiter
    sourceDB         *sql.DB
    targetDB         *sql.DB
}

func NewETLPipeline(sourceDB, targetDB *sql.DB) *ETLPipeline {
    // Different rate limits for different stages
    extractLimiter, _ := gothrottle.NewLimiter(gothrottle.Options{
        MaxConcurrent: 5,  // Limit source DB queries
        MinTime:       20 * time.Millisecond,
    })
    
    transformLimiter, _ := gothrottle.NewLimiter(gothrottle.Options{
        MaxConcurrent: 10, // CPU-intensive, but can be parallel
        MinTime:       10 * time.Millisecond,
    })
    
    loadLimiter, _ := gothrottle.NewLimiter(gothrottle.Options{
        MaxConcurrent: 3,  // Limit target DB writes
        MinTime:       50 * time.Millisecond,
    })
    
    return &ETLPipeline{
        extractLimiter:   extractLimiter,
        transformLimiter: transformLimiter,
        loadLimiter:      loadLimiter,
        sourceDB:         sourceDB,
        targetDB:         targetDB,
    }
}

func (etl *ETLPipeline) ExtractData(query string) ([]map[string]interface{}, error) {
    result, err := etl.extractLimiter.Schedule(func() (interface{}, error) {
        rows, err := etl.sourceDB.Query(query)
        if err != nil {
            return nil, err
        }
        defer rows.Close()
        
        // Process rows into data structure
        var data []map[string]interface{}
        // ... row processing logic
        
        fmt.Printf("Extracted %d records\n", len(data))
        return data, nil
    })
    
    if err != nil {
        return nil, err
    }
    
    return result.([]map[string]interface{}), nil
}

func (etl *ETLPipeline) TransformData(data []map[string]interface{}) ([]map[string]interface{}, error) {
    result, err := etl.transformLimiter.Schedule(func() (interface{}, error) {
        // Simulate data transformation
        time.Sleep(100 * time.Millisecond)
        
        var transformed []map[string]interface{}
        for _, record := range data {
            // Transform each record
            transformedRecord := make(map[string]interface{})
            for k, v := range record {
                transformedRecord[k+"_transformed"] = v
            }
            transformed = append(transformed, transformedRecord)
        }
        
        fmt.Printf("Transformed %d records\n", len(transformed))
        return transformed, nil
    })
    
    if err != nil {
        return nil, err
    }
    
    return result.([]map[string]interface{}), nil
}

func (etl *ETLPipeline) LoadData(data []map[string]interface{}) error {
    _, err := etl.loadLimiter.Schedule(func() (interface{}, error) {
        tx, err := etl.targetDB.Begin()
        if err != nil {
            return nil, err
        }
        defer tx.Rollback()
        
        for _, record := range data {
            // Insert transformed record
            // ... insert logic
        }
        
        err = tx.Commit()
        if err != nil {
            return nil, err
        }
        
        fmt.Printf("Loaded %d records\n", len(data))
        return nil, nil
    })
    
    return err
}

func (etl *ETLPipeline) ProcessBatch(query string) error {
    // Extract -> Transform -> Load pipeline
    data, err := etl.ExtractData(query)
    if err != nil {
        return err
    }
    
    transformedData, err := etl.TransformData(data)
    if err != nil {
        return err
    }
    
    return etl.LoadData(transformedData)
}

func (etl *ETLPipeline) Close() {
    etl.extractLimiter.Stop()
    etl.transformLimiter.Stop()
    etl.loadLimiter.Stop()
}

Documentation

Overview

FILENAME: datastore.go

FILENAME: errors.go

FILENAME: job.go

FILENAME: limiter.go

FILENAME: local_store.go

FILENAME: options.go

FILENAME: redis_store.go

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrStoreClosed is returned when attempting to use a closed store.
	ErrStoreClosed = errors.New("store is closed")

	// ErrMissingID is returned when a limiter ID is required but not provided.
	ErrMissingID = errors.New("limiter ID is required")

	// ErrInvalidWeight is returned when a job weight is invalid.
	ErrInvalidWeight = errors.New("job weight must be positive")
)

Functions

This section is empty.

Types

type Datastore

type Datastore interface {
	// Request checks if a job can run according to the limiter's rules.
	// It must return whether the job can run now, and if not, a suggested wait time.
	Request(limiterID string, weight int, opts Options) (canRun bool, waitTime time.Duration, err error)

	// RegisterDone informs the store that a job has finished.
	RegisterDone(limiterID string, weight int) error

	// Disconnect cleans up any connections.
	Disconnect() error
}

Datastore defines the interface for state management.

type Job

type Job struct {
	Task     func() (interface{}, error)
	Priority int
	Weight   int
	// contains filtered or unexported fields
}

Job represents a function to be executed by the Limiter.

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

Limiter manages job scheduling and rate limiting.

func NewLimiter

func NewLimiter(opts Options) (*Limiter, error)

NewLimiter creates a new Limiter instance.

func (*Limiter) Schedule

func (l *Limiter) Schedule(task func() (interface{}, error)) (interface{}, error)

Schedule submits a job to be executed and blocks until completion.

func (*Limiter) ScheduleWithOptions

func (l *Limiter) ScheduleWithOptions(task func() (interface{}, error), priority, weight int) (interface{}, error)

ScheduleWithOptions submits a job with custom priority and weight.

func (*Limiter) Stop

func (l *Limiter) Stop() error

Stop stops the limiter and waits for all jobs to complete.

func (*Limiter) Wrap

func (l *Limiter) Wrap(fn func() (interface{}, error)) func() (interface{}, error)

Wrap creates a wrapper function that applies rate limiting to any function.

type LocalState

type LocalState struct {
	// contains filtered or unexported fields
}

LocalState holds the state for a single limiter.

type LocalStore

type LocalStore struct {
	// contains filtered or unexported fields
}

LocalStore is an in-memory implementation of Datastore.

func NewLocalStore

func NewLocalStore() *LocalStore

NewLocalStore creates a new LocalStore instance.

func (*LocalStore) Disconnect

func (ls *LocalStore) Disconnect() error

Disconnect cleans up any connections.

func (*LocalStore) RegisterDone

func (ls *LocalStore) RegisterDone(limiterID string, weight int) error

RegisterDone informs the store that a job has finished.

func (*LocalStore) Request

func (ls *LocalStore) Request(limiterID string, weight int, opts Options) (canRun bool, waitTime time.Duration, err error)

Request checks if a job can run according to the limiter's rules.

type Options

type Options struct {
	ID            string        // A unique ID for the limiter, required for Redis mode.
	MaxConcurrent int           // Max number of jobs running at once.
	MinTime       time.Duration // Minimum time between jobs.
	Datastore     Datastore     // Optional datastore for clustering. Defaults to local if nil.

}

Options holds the configuration for a Limiter.

type PriorityQueue

type PriorityQueue []*Job

PriorityQueue implements heap.Interface and holds Jobs.

func NewPriorityQueue

func NewPriorityQueue() *PriorityQueue

NewPriorityQueue creates a new priority queue.

func (*PriorityQueue) IsEmpty

func (pq *PriorityQueue) IsEmpty() bool

IsEmpty returns true if the queue is empty.

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

func (*PriorityQueue) PopJob

func (pq *PriorityQueue) PopJob() *Job

PopJob removes and returns the highest priority job.

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

func (*PriorityQueue) PushJob

func (pq *PriorityQueue) PushJob(job *Job)

PushJob adds a job to the priority queue.

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)

type RedisStore

type RedisStore struct {
	// contains filtered or unexported fields
}

RedisStore is a Redis-based implementation of Datastore.

func NewRedisStore

func NewRedisStore(client *redis.Client) (*RedisStore, error)

NewRedisStore creates a new RedisStore instance.

func (*RedisStore) Disconnect

func (rs *RedisStore) Disconnect() error

Disconnect cleans up any connections.

func (*RedisStore) RegisterDone

func (rs *RedisStore) RegisterDone(limiterID string, weight int) error

RegisterDone informs the store that a job has finished.

func (*RedisStore) Request

func (rs *RedisStore) Request(limiterID string, weight int, opts Options) (canRun bool, waitTime time.Duration, err error)

Request checks if a job can run according to the limiter's rules.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL