goroutine

v1.0.8
Published: Jan 13, 2026 License: MIT Imports: 15 Imported by: 0

README

Goroutine - Advanced Concurrent Processing for Go

A powerful Go library providing advanced concurrent processing utilities, including async task resolution, safe channel operations, parallel slice processing, and flexible goroutine management.

Features

🗺️ SwissMap (High-Performance Concurrent Map) (NEW)
  • Thread-safe generic map with sharded architecture
  • 5-10x faster than sync.Map in high-concurrency scenarios
  • Reduced lock contention with configurable shard count
  • Rich API: GetOrSet, GetOrCompute, Range, and more
  • Automatic integration with Cache for improved performance
🚀 Async Resolve (Promise-like Pattern)
  • Launch multiple async operations simultaneously
  • Wait for all operations to complete (similar to Promise.all())
  • Timeout support for time-bounded operations
  • Type-agnostic result collection
🔒 SafeChannel
  • Thread-safe channel wrapper with timeout capabilities
  • Distributed backend support with multiple backend strategies
  • Error handling for closed channels and timeouts
  • Context-aware operations
  • Cache preflight support to reduce downstream load
⚡ SuperSlice (Parallel Slice Processing)
  • Automatic parallelization based on configurable thresholds
  • Worker pool management for efficient processing
  • In-place updates to save memory
  • Support for map, filter, forEach operations with type transformations
  • Error handling in processing callbacks
🎯 GoManager (Goroutine Management)
  • Named goroutine management with cancellation support
  • Context-based lifecycle management
  • Dynamic goroutine launching and cancellation
🔄 Type Recasting
  • Flexible type conversion utilities
  • Safe type transformations
💾 Cache & Preflight
  • Sync preflight pattern to reduce downstream load
  • Cache-first fetching from databases and APIs
  • Stale-while-revalidate for optimal user experience
  • NoCache mode for fresh data when needed
  • Configurable TTL and cache control directives
  • Now powered by SwissMap for improved concurrent performance
🔀 Concurrency Patterns (NEW)
  • Pipeline: Composable multi-stage data processing
  • Fan-Out/Fan-In: Distribute work across workers and aggregate results
  • Rate Limiter: Control operation execution rate
  • Semaphore: Resource access control with permits
  • Generator: Lazy value production with context support
  • Circuit Breaker: Fault tolerance and failure handling
🎁 Smart Concurrency Wrapper (NEW)
  • Unified API for concurrency control
  • Automatic retry with configurable delays
  • Timeout management
  • Rate limiting integration
  • Circuit breaker for fault tolerance
  • Composable with other patterns
🚩 Feature Flags (NEW)
  • Easy-to-use feature flag management with Redis backend
  • Environment-aware flag evaluation (prod, stage, dev)
  • Rollout policies: Gradual, canary, targeted, and all-at-once deployments
  • Percentage-based rollouts with consistent hashing
  • User segment targeting for canary releases
  • Targeted rollouts for specific user IDs
  • Global and environment-specific overrides
  • Local caching for fast reads
  • Safe defaults (non-existent flags default to disabled)
  • Simple API for common operations

Installation

go get github.com/ivikasavnish/goroutine

Quick Start

SwissMap - High-Performance Concurrent Map

Thread-safe map with excellent concurrent performance:

package main

import (
    "fmt"
    "github.com/ivikasavnish/goroutine"
)

func main() {
    // Create a high-performance concurrent map
    sm := goroutine.NewSwissMap[string, int]()
    
    // Set values (thread-safe)
    sm.Set("apple", 5)
    sm.Set("banana", 3)
    sm.Set("orange", 7)
    
    // Get values
    if val, ok := sm.Get("apple"); ok {
        fmt.Printf("apple: %d\n", val) // apple: 5
    }
    
    // GetOrCompute - compute value only if not present
    result := sm.GetOrCompute("grape", func() int {
        return 10 // Only computed if "grape" doesn't exist
    })
    fmt.Printf("grape: %d\n", result)
    
    // Safe for concurrent use from multiple goroutines:
    // callers need no extra locking because SwissMap synchronizes access internally.
}

Async Resolve

Launch multiple async operations and wait for completion:

package main

import (
    "fmt"
    "time"
    "github.com/ivikasavnish/goroutine"
)

func main() {
    group := goroutine.NewGroup()
    
    var result1, result2, result3 any
    
    // Launch async operations
    group.Assign(&result1, func() any {
        time.Sleep(500 * time.Millisecond)
        return "Task 1 completed"
    })
    
    group.Assign(&result2, func() any {
        time.Sleep(300 * time.Millisecond)
        return 42
    })
    
    group.Assign(&result3, func() any {
        time.Sleep(200 * time.Millisecond)
        return true
    })
    
    // Wait for all tasks to complete
    group.Resolve()
    
    fmt.Printf("Result 1: %v\n", result1) // "Task 1 completed"
    fmt.Printf("Result 2: %v\n", result2) // 42
    fmt.Printf("Result 3: %v\n", result3) // true
}

SuperSlice - Parallel Slice Processing

Process large slices efficiently with automatic parallelization:

package main

import (
    "fmt"
    "github.com/ivikasavnish/goroutine"
)

func main() {
    // Create a large slice
    numbers := make([]int, 10000)
    for i := range numbers {
        numbers[i] = i + 1
    }
    
    // Process with automatic parallelization
    ss := goroutine.NewSuperSlice(numbers)
    result := ss.Process(func(index int, item int) int {
        return item * 2
    })
    
    fmt.Printf("Processed %d items\n", len(result))
}

SuperSlice - Custom Configuration

Configure threshold and worker count:

package main

import (
    "fmt"
    "github.com/ivikasavnish/goroutine"
)

func main() {
    data := []int{1, 2, 3, 4, 5}
    
    // Use fluent API for configuration
    result := goroutine.NewSuperSlice(data).
        WithThreshold(500).      // Switch to parallel at 500 items
        WithWorkers(8).          // Use 8 worker goroutines
        WithIterable().          // Use iterable processing mode
        Process(func(index int, item int) int {
            return item * 10
        })
    
    fmt.Println(result) // [10 20 30 40 50]
}

SafeChannel

Thread-safe channel operations with timeout support:

package main

import (
    "fmt"
    "time"
    "github.com/ivikasavnish/goroutine"
)

func main() {
    // Create a safe channel with buffer size 10 and a 5-second default timeout
    sc := goroutine.NewSafeChannel[int](10, 5*time.Second)
    
    // Send with timeout
    err := sc.SendWithTimeout(42, 1*time.Second)
    if err != nil {
        fmt.Printf("Send error: %v\n", err)
        return
    }
    
    // Receive with timeout
    value, err := sc.ReceiveWithTimeout(1 * time.Second)
    if err != nil {
        fmt.Printf("Receive error: %v\n", err)
        return
    }
    
    fmt.Printf("Received: %d\n", value) // 42
    
    // Close when done
    sc.Close()
}

GoManager

Manage named goroutines with cancellation:

package main

import (
    "fmt"
    "time"
    "github.com/ivikasavnish/goroutine"
)

func main() {
    manager := goroutine.NewGoManager()
    
    // Launch a named goroutine
    manager.GO("worker1", func() {
        for i := 0; i < 10; i++ {
            fmt.Printf("Worker 1: %d\n", i)
            time.Sleep(500 * time.Millisecond)
        }
    })
    
    // Let it run for 2 seconds
    time.Sleep(2 * time.Second)
    
    // Cancel the goroutine
    manager.Cancel("worker1")
    fmt.Println("Worker cancelled")
}

Cache & Preflight (ParametricFetcher)

Use ParametricFetcher to generate cache keys automatically from parameters, so no manual key construction is needed:

package main

import (
    "context"
    "fmt"
    "time"
    "github.com/ivikasavnish/goroutine"
)

func main() {
    // Fetch function takes user ID directly - no manual keys!
    fetchUser := func(ctx context.Context, userID int) (string, error) {
        time.Sleep(500 * time.Millisecond) // Simulate DB latency
        return fmt.Sprintf("User-%d-Data", userID), nil
    }
    
    // Automatic key generation using FormatKeyFunc
    keyFunc := goroutine.FormatKeyFunc[int]("user:%d")
    control := &goroutine.CacheControl{
        NoCache: false,
        MaxAge:  1 * time.Minute,
    }
    
    fetcher := goroutine.NewParametricFetcher(fetchUser, keyFunc, control)
    ctx := context.Background()
    
    // Just pass the user ID - key is generated automatically!
    user1, _ := fetcher.Fetch(ctx, 123) // DB call
    fmt.Println(user1) // User-123-Data
    
    // Same parameter = automatic cache hit
    user2, _ := fetcher.Fetch(ctx, 123) // Cache hit!
    fmt.Println(user2) // User-123-Data (instant)
    
    // Different parameter = new cache entry
    user3, _ := fetcher.Fetch(ctx, 456) // DB call for new user
    fmt.Println(user3) // User-456-Data
}

Cache & Preflight (Alternative: PreflightFetcher)

Reduce downstream load on databases and APIs with cache preflight and manual keys:

package main

import (
    "context"
    "fmt"
    "time"
    "github.com/ivikasavnish/goroutine"
)

func main() {
    // Create a preflight fetcher with cache-first pattern
    dbCallCount := 0
    
    fetchFunc := func(ctx context.Context, key string) (string, error) {
        dbCallCount++
        fmt.Printf("DB Call #%d\n", dbCallCount)
        time.Sleep(500 * time.Millisecond) // Simulate DB latency
        return fmt.Sprintf("Data-%s", key), nil
    }
    
    control := &goroutine.CacheControl{
        NoCache: false,
        MaxAge:  1 * time.Minute,
    }
    
    fetcher := goroutine.NewPreflightFetcher(fetchFunc, control)
    ctx := context.Background()
    
    // First fetch - hits database
    val1, _ := fetcher.Fetch(ctx, "user:123")
    fmt.Println(val1) // DB Call #1, Data-user:123
    
    // Second fetch - uses cache (preflight check)
    val2, _ := fetcher.Fetch(ctx, "user:123")
    fmt.Println(val2) // No DB call! Data-user:123
    
    // Third fetch - still cached
    val3, _ := fetcher.Fetch(ctx, "user:123")
    fmt.Println(val3) // No DB call! Data-user:123
    
    fmt.Printf("Total DB calls: %d (saved 2 calls)\n", dbCallCount)
    // Output: Total DB calls: 1 (saved 2 calls)
}

Pipeline Pattern

Create composable data processing pipelines:

package main

import (
    "context"
    "fmt"
    "github.com/ivikasavnish/goroutine"
)

func main() {
    // Create a pipeline with multiple stages
    pipeline := goroutine.NewPipeline[int]().
        AddStage(func(n int) int { return n * 2 }).
        AddStage(func(n int) int { return n + 10 }).
        AddStage(func(n int) int { return n * n })
    
    // Process single item
    result := pipeline.Execute(5) // ((5*2)+10)^2 = 400
    fmt.Printf("Result: %d\n", result)
    
    // Process multiple items asynchronously
    items := []int{1, 2, 3, 4, 5}
    results := pipeline.ExecuteAsync(context.Background(), items)
    fmt.Printf("Results: %v\n", results)
}

Fan-Out/Fan-In Pattern

Distribute work across multiple workers:

package main

import (
    "context"
    "fmt"
    "github.com/ivikasavnish/goroutine"
)

func main() {
    // Create fan-out with 3 workers
    fanOut := goroutine.NewFanOut[int, int](3)
    
    items := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
    results := fanOut.ProcessWithIndex(context.Background(), items, 
        func(idx int, n int) int {
            return n * n
        })
    
    fmt.Printf("Results: %v\n", results)
}

CachedGroup with Preflight

Use CachedGroup for async operations with automatic caching:

package main

import (
    "fmt"
    "time"
    "github.com/ivikasavnish/goroutine"
)

func main() {
    cg := goroutine.NewCachedGroup()
    
    control := &goroutine.CacheControl{
        NoCache: false,
        MaxAge:  5 * time.Minute,
    }
    
    var result1, result2 any
    
    // First call - fetches from DB
    cg.AssignWithCache("user:123", &result1, func() any {
        time.Sleep(500 * time.Millisecond)
        return "DB Data for user:123"
    }, control)
    cg.Resolve()
    fmt.Println(result1) // DB Data for user:123 (took ~500ms)
    
    // Second call - uses cache (instant)
    cg.AssignWithCache("user:123", &result2, func() any {
        time.Sleep(500 * time.Millisecond)
        return "DB Data for user:123"
    }, control)
    cg.Resolve()
    fmt.Println(result2) // DB Data for user:123 (took <1µs)
}

Stale-While-Revalidate Pattern

Return stale data immediately while revalidating in background:

package main

import (
    "context"
    "fmt"
    "time"
    "github.com/ivikasavnish/goroutine"
)

func main() {
    fetchFunc := func(ctx context.Context, key string) (string, error) {
        time.Sleep(500 * time.Millisecond)
        return fmt.Sprintf("Fresh-%d", time.Now().Unix()), nil
    }
    
    control := &goroutine.CacheControl{
        NoCache:              false,
        MaxAge:               100 * time.Millisecond,
        StaleWhileRevalidate: true,
    }
    
    fetcher := goroutine.NewPreflightFetcher(fetchFunc, control)
    ctx := context.Background()
    
    // Initial fetch
    val1, _ := fetcher.Fetch(ctx, "key1")
    fmt.Println(val1) // Fresh-1234567890 (took ~500ms)
    
    // Wait for cache to expire
    time.Sleep(200 * time.Millisecond)
    
    // Fetch with stale data - returns immediately!
    val2, _ := fetcher.FetchStaleWhileRevalidate(ctx, "key1")
    fmt.Println(val2) // Fresh-1234567890 (took <1µs, stale but instant!)
    // Background revalidation happens automatically
}

Smart Concurrency Wrapper

Unified API with retry, timeout, rate limiting, and circuit breaker:

package main

import (
    "context"
    "fmt"
    "time"
    "github.com/ivikasavnish/goroutine"
)

func main() {
    // Configure wrapper
    config := &goroutine.ConcurrencyConfig{
        MaxConcurrency:          3,
        Timeout:                 5 * time.Second,
        RateLimit:               10,
        RetryAttempts:           2,
        RetryDelay:              100 * time.Millisecond,
        EnableCircuitBreaker:    true,
        CircuitBreakerThreshold: 5,
    }
    
    wrapper := goroutine.NewConcurrencyWrapper[int, int](config)
    defer wrapper.Close()
    
    items := []int{1, 2, 3, 4, 5}
    results, err := wrapper.Process(context.Background(), items, 
        func(n int) (int, error) {
            // Your processing logic here
            return n * 2, nil
        })
    
    if err != nil {
        fmt.Printf("Error: %v\n", err)
        return
    }
    
    fmt.Printf("Results: %v\n", results)
}

Worker Pool with Task Scheduling

Execute immediate tasks, delayed tasks, and cron jobs:

package main

import (
    "context"
    "fmt"
    "time"
    "github.com/ivikasavnish/goroutine"
)

func main() {
    // Create a worker pool with 5 workers
    pool := goroutine.NewWorkerPool(5)
    pool.Start()
    defer pool.Stop()
    
    // Submit immediate task
    task1 := &goroutine.WorkerTask{
        ID: "urgent-task",
        Func: func(ctx context.Context) error {
            fmt.Println("Processing urgent request...")
            return nil
        },
    }
    pool.Submit(task1)
    
    // Submit delayed task (runs after 2 seconds)
    task2 := &goroutine.WorkerTask{
        ID: "email-task",
        Func: func(ctx context.Context) error {
            fmt.Println("Sending scheduled email...")
            return nil
        },
    }
    pool.SubmitDelayed(task2, 2*time.Second)
    
    // Submit cron job (runs every 30 seconds)
    task3 := &goroutine.WorkerTask{
        ID: "cleanup-job",
        Func: func(ctx context.Context) error {
            fmt.Println("Running cleanup...")
            return nil
        },
    }
    pool.SubmitCron(task3, "@every 30s")
    
    // Let tasks run
    time.Sleep(5 * time.Second)
    
    // Cancel cron job
    pool.CancelCron("cleanup-job")
}

Worker Pool with Resque/Celery Mode

Resque mode with retry and dead letter queue:

package main

import (
    "context"
    "fmt"
    "time"
    "github.com/ivikasavnish/goroutine"
)

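// callExternalAPI is a stand-in for your own flaky call.
func callExternalAPI() error {
    return fmt.Errorf("service unavailable")
}

func main() {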
    // Configure worker pool with Resque mode features
    config := goroutine.DefaultWorkerPoolConfig()
    config.NumWorkers = 5
    config.EnableResults = true
    config.MaxDeadLetter = 1000
    
    // Add lifecycle hooks
    config.OnTaskFailed = func(task *goroutine.WorkerTask, err error) {
        fmt.Printf("Task %s failed: %v\n", task.ID, err)
    }
    
    pool := goroutine.NewWorkerPoolWithConfig(config)
    pool.Start()
    defer pool.Stop()
    
    // Submit task with retry policy (Resque mode)
    task := &goroutine.WorkerTask{
        ID: "flaky-api-call",
        Func: func(ctx context.Context) error {
            // Simulate flaky API
            return callExternalAPI()
        },
        RetryPolicy: &goroutine.RetryPolicy{
            MaxRetries:    3,
            RetryDelay:    time.Second,
            BackoffFactor: 2.0,        // Exponential backoff
            MaxRetryDelay: 30 * time.Second,
        },
        Priority: goroutine.PriorityHigh,
        Timeout:  10 * time.Second,
    }
    
    pool.Submit(task)
    
    // Check result
    time.Sleep(5 * time.Second)
    if result, ok := pool.GetResult("flaky-api-call"); ok {
        fmt.Printf("Status: %s, Attempts: %d\n", result.Status, result.Attempts)
    }
    
    // Check dead letter queue for failed tasks
    deadTasks := pool.GetDeadLetterTasks()
    for _, task := range deadTasks {
        fmt.Printf("Failed task: %s\n", task.ID)
        // Optionally requeue after fixing issue
        pool.RequeueDeadLetter(task.ID)
    }
}
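
The RetryPolicy fields above suggest a delay that grows by BackoffFactor on each retry and stops growing at MaxRetryDelay. The following is a minimal sketch of that arithmetic only. It assumes the delay before retry n is RetryDelay × BackoffFactor^(n-1), capped at MaxRetryDelay; the library's exact formula is not stated here, and backoffDelay is a hypothetical helper, not part of the package.

```go
package main

import (
    "fmt"
    "math"
    "time"
)

// backoffDelay is a hypothetical helper (not a library function).
// It returns the wait before the given 1-based retry attempt, assuming
// delay = base * factor^(attempt-1), capped at max.
func backoffDelay(base time.Duration, factor float64, max time.Duration, attempt int) time.Duration {
    if attempt < 1 {
        attempt = 1
    }
    d := float64(base) * math.Pow(factor, float64(attempt-1))
    if d > float64(max) {
        return max
    }
    return time.Duration(d)
}

func main() {
    // Same values as the RetryPolicy in the example above.
    for attempt := 1; attempt <= 6; attempt++ {
        fmt.Println(attempt, backoffDelay(time.Second, 2.0, 30*time.Second, attempt))
    }
}
```

Under these assumptions the waits double from one second until the sixth attempt, where the computed 32 seconds is clamped to the 30-second cap.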

Celery mode with named queues:

package main

import (
    "context"
    "fmt"
    "time"
    "github.com/ivikasavnish/goroutine"
)

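// sendNotification and generateReport are stand-ins for your own task logic.
func sendNotification() error { return nil }
func generateReport() error   { return nil }

func main() {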
    // Configure worker pool with Celery mode features
    config := goroutine.DefaultWorkerPoolConfig()
    config.NumWorkers = 10
    config.EnableQueues = true   // Enable named queues
    config.EnableResults = true
    
    pool := goroutine.NewWorkerPoolWithConfig(config)
    pool.Start()
    defer pool.Stop()
    
    // Submit to high priority queue
    criticalTask := &goroutine.WorkerTask{
        ID:       "urgent-notification",
        Priority: goroutine.PriorityCritical,
        Func: func(ctx context.Context) error {
            return sendNotification()
        },
    }
    pool.SubmitToQueue("notifications", criticalTask)
    
    // Submit to normal priority queue
    reportTask := &goroutine.WorkerTask{
        ID:       "generate-report",
        Priority: goroutine.PriorityNormal,
        Func: func(ctx context.Context) error {
            return generateReport()
        },
        Timeout: 5 * time.Minute,
    }
    pool.SubmitToQueue("reports", reportTask)
    
    // Check queue length
    length := pool.GetQueueLength("notifications")
    fmt.Printf("Notifications queue: %d pending\n", length)
    
    // Acknowledge task completion (Celery mode)
    pool.AckTask("urgent-notification")
}

Worker Pool with Broker Encoding/Decoding

Serialize and deserialize tasks for message brokers (Redis, RabbitMQ, etc.):

package main

import (
    "context"
    "fmt"
    "github.com/ivikasavnish/goroutine"
)

func main() {
    // Register task handlers by name
    goroutine.RegisterTaskHandler("send-email", func(ctx context.Context, args map[string]interface{}) error {
        recipient, ok := args["recipient"].(string)
        if !ok {
            return fmt.Errorf("recipient must be a string")
        }
        fmt.Printf("Sending email to: %s\n", recipient)
        return nil
    })
    
    // Create task with handler name
    task := &goroutine.WorkerTask{
        ID:          "email-001",
        HandlerName: "send-email",  // Handler name for registry
        Queue:       "notifications",
        Priority:    goroutine.PriorityHigh,
        Args: map[string]interface{}{
            "recipient": "user@example.com",
            "subject":   "Welcome!",
        },
        RetryPolicy: goroutine.DefaultRetryPolicy(),
    }
    
    // Encode task to JSON for broker (Redis, RabbitMQ, etc.)
    data, err := task.EncodeToBroker()
    if err != nil {
        fmt.Printf("Encode error: %v\n", err)
        return
    }
    fmt.Printf("Encoded: %s\n", string(data))
    // Save to Redis: redis.Set("job:email-001", data)
    
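    // Later: retrieve from broker and decode
    // data := redis.Get("job:email-001")
    decodedTask, err := goroutine.DecodeFromBroker(data)
    if err != nil {
        fmt.Printf("Decode error: %v\n", err)
        return
    }

    // Execute the task
    if err := decodedTask.Func(context.Background()); err != nil {
        fmt.Printf("Task error: %v\n", err)
    }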
}

Compatible with Resque format (uses class):

{
  "id": "task-123",
  "class": "send-email",
  "args": {"recipient": "user@example.com"}
}

Compatible with Celery format (uses task and kwargs):

{
  "id": "task-456",
  "task": "send-email",
  "kwargs": {"recipient": "user@example.com"},
  "eta": "2024-01-01T12:00:00Z"
}
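
The two payloads differ mainly in field names: Resque uses class and args, while Celery uses task and kwargs. As a rough illustration of how a consumer might normalize both shapes with the standard encoding/json package, here is a sketch. The brokerMessage type and handlerAndArgs function are hypothetical names introduced for this example; they are not the library's DecodeFromBroker.

```go
package main

import (
    "encoding/json"
    "errors"
    "fmt"
)

// brokerMessage is a hypothetical struct covering both payload shapes shown above.
type brokerMessage struct {
    ID     string                 `json:"id"`
    Class  string                 `json:"class"`  // Resque-style handler name
    Task   string                 `json:"task"`   // Celery-style handler name
    Args   map[string]interface{} `json:"args"`   // Resque-style arguments
    Kwargs map[string]interface{} `json:"kwargs"` // Celery-style arguments
}

// handlerAndArgs returns the handler name and arguments for either format.
// Fields it does not know about (such as "eta") are ignored by json.Unmarshal.
func handlerAndArgs(data []byte) (string, map[string]interface{}, error) {
    var m brokerMessage
    if err := json.Unmarshal(data, &m); err != nil {
        return "", nil, err
    }
    switch {
    case m.Class != "":
        return m.Class, m.Args, nil
    case m.Task != "":
        return m.Task, m.Kwargs, nil
    }
    return "", nil, errors.New("payload has neither class nor task")
}

func main() {
    resque := []byte(`{"id":"task-123","class":"send-email","args":{"recipient":"user@example.com"}}`)
    celery := []byte(`{"id":"task-456","task":"send-email","kwargs":{"recipient":"user@example.com"},"eta":"2024-01-01T12:00:00Z"}`)
    for _, payload := range [][]byte{resque, celery} {
        name, args, err := handlerAndArgs(payload)
        fmt.Println(name, args["recipient"], err)
    }
}
```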

API Reference

SwissMap
NewSwissMap[K comparable, V any]() *SwissMap[K, V]

Creates a new SwissMap with default shard count (32). Provides excellent performance for most concurrent workloads.

NewSwissMapWithShards[K comparable, V any](shardCount uint32) *SwissMap[K, V]

Creates a new SwissMap with specified shard count. The shard count must be a power of 2. Higher shard counts reduce lock contention but increase memory overhead.
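
To make the sharding idea concrete, here is a minimal sketch of a sharded map with one lock per shard. It is not the library's SwissMap implementation: the shardedMap type, the FNV hash, and the string/int key and value types are assumptions chosen for illustration. One common reason for a power-of-2 shard count is that the shard index can then be taken with a bitmask instead of a modulo, which is what the sketch does.

```go
package main

import (
    "fmt"
    "hash/fnv"
    "sync"
)

// shard is one independently locked partition of the map.
type shard struct {
    mu sync.RWMutex
    m  map[string]int
}

// shardedMap is an illustrative string->int map, not the library's SwissMap.
type shardedMap struct {
    shards []*shard
    mask   uint32 // shardCount-1; only valid when shardCount is a power of 2
}

// newShardedMap rejects shard counts that are not a power of 2.
func newShardedMap(shardCount uint32) (*shardedMap, error) {
    if shardCount == 0 || shardCount&(shardCount-1) != 0 {
        return nil, fmt.Errorf("shard count %d is not a power of 2", shardCount)
    }
    s := &shardedMap{shards: make([]*shard, shardCount), mask: shardCount - 1}
    for i := range s.shards {
        s.shards[i] = &shard{m: make(map[string]int)}
    }
    return s, nil
}

// shardFor hashes the key and masks the hash to pick a shard.
func (s *shardedMap) shardFor(key string) *shard {
    h := fnv.New32a()
    h.Write([]byte(key))
    return s.shards[h.Sum32()&s.mask]
}

// Set locks only the shard that owns the key.
func (s *shardedMap) Set(key string, v int) {
    sh := s.shardFor(key)
    sh.mu.Lock()
    sh.m[key] = v
    sh.mu.Unlock()
}

// Get takes a read lock on the owning shard.
func (s *shardedMap) Get(key string) (int, bool) {
    sh := s.shardFor(key)
    sh.mu.RLock()
    defer sh.mu.RUnlock()
    v, ok := sh.m[key]
    return v, ok
}

func main() {
    m, err := newShardedMap(32)
    if err != nil {
        fmt.Println(err)
        return
    }
    m.Set("apple", 5)
    fmt.Println(m.Get("apple"))
}
```

Because writers to different shards take different locks, contention drops as the shard count grows, at the cost of one map and one mutex per shard.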

(*SwissMap[K, V]) Set(key K, value V)

Stores a key-value pair in the map. Thread-safe.

(*SwissMap[K, V]) Get(key K) (V, bool)

Retrieves a value from the map. Returns the value and true if found, zero value and false otherwise. Thread-safe.

(*SwissMap[K, V]) Delete(key K)

Removes a key from the map. Thread-safe.

(*SwissMap[K, V]) Has(key K) bool

Checks if a key exists in the map. Thread-safe.

(*SwissMap[K, V]) GetOrSet(key K, value V) (V, bool)

Retrieves the value for key, or stores value if the key is not present. Returns the existing value and true if the key was already present; otherwise returns the newly stored value and false. Atomic operation.

(*SwissMap[K, V]) GetOrCompute(key K, compute func() V) V

Retrieves a value or computes and sets it if not present. The compute function is only called if the key doesn't exist. Useful for lazy initialization.

(*SwissMap[K, V]) Len() int

Returns the total number of entries in the map. Thread-safe but requires iterating all shards.

(*SwissMap[K, V]) Clear()

Removes all entries from the map. Thread-safe.

(*SwissMap[K, V]) Range(f func(key K, value V) bool)

Iterates over all key-value pairs in the map. The function f is called for each entry. If f returns false, iteration stops. Thread-safe.

(*SwissMap[K, V]) Keys() []K

Returns all keys in the map as a slice. Thread-safe.

(*SwissMap[K, V]) Values() []V

Returns all values in the map as a slice. Thread-safe.

(*SwissMap[K, V]) ToMap() map[K]V

Converts the SwissMap to a regular Go map. Creates a snapshot of the current state. Thread-safe.

Async Resolve
NewGroup() *Group

Creates a new task group for managing async operations.

(*Group) Assign(result *any, fn func() any)

Assigns a function to run asynchronously. The result is stored in the provided pointer when the task completes.

(*Group) Resolve()

Blocks until all assigned tasks complete.

(*Group) ResolveWithTimeout(timeout time.Duration) bool

Waits for tasks to complete with a timeout. Returns true if all tasks completed, false if timeout occurred.
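
A timed wait of this kind is usually built from a sync.WaitGroup, a done channel, and a select against a timer. The sketch below shows that general technique; resolveWithTimeout is a hypothetical stand-alone function, and nothing here is taken from the library's Group internals.

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// resolveWithTimeout is a hypothetical helper: it runs every task in its own
// goroutine and reports whether all of them finished before the timeout.
func resolveWithTimeout(tasks []func(), timeout time.Duration) bool {
    var wg sync.WaitGroup
    for _, task := range tasks {
        wg.Add(1)
        go func(run func()) {
            defer wg.Done()
            run()
        }(task)
    }

    // Convert "WaitGroup finished" into a channel event so it can be selected on.
    done := make(chan struct{})
    go func() {
        wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        return true
    case <-time.After(timeout):
        return false // the tasks keep running; only the wait is abandoned
    }
}

func main() {
    slow := func() { time.Sleep(200 * time.Millisecond) }
    fast := func() {}
    fmt.Println(resolveWithTimeout([]func(){fast, slow}, 20*time.Millisecond))
    fmt.Println(resolveWithTimeout([]func(){fast, fast}, time.Second))
}
```

Note that in this sketch a false result does not cancel anything: unfinished tasks continue in the background, so results they write may still arrive later.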

SuperSlice
NewSuperSlice[T any](slice []T) *SuperSlice[T]

Creates a new SuperSlice from a slice.

NewSuperSliceWithConfig[T any](slice []T, config *SuperSliceConfig) *SuperSlice[T]

Creates a new SuperSlice with custom configuration.

Configuration Methods (Fluent API):
  • WithThreshold(threshold int) *SuperSlice[T] - Set parallelization threshold (default: 1000)
  • WithWorkers(numWorkers int) *SuperSlice[T] - Set number of worker goroutines (default: NumCPU)
  • WithIterable() *SuperSlice[T] - Enable iterable processing mode
  • WithInPlace() *SuperSlice[T] - Enable in-place updates
Processing Methods:
  • Process(callback func(int, T) T) []T - Transform slice items
  • ProcessWithError(callback func(int, T) (T, error)) ([]T, error) - Transform with error handling
  • ForEach(callback func(int, T)) - Iterate without collecting results
  • FilterSlice(predicate func(int, T) bool) []T - Filter items in parallel
MapTo[T, U any](ss *SuperSlice[T], mapper func(int, T) U) []U

Transform slice items to a different type.
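
The threshold and worker settings describe a common strategy: process small slices sequentially, and split large ones into chunks handled by a fixed number of goroutines. The following sketch illustrates that strategy under stated assumptions. parallelMap is a hypothetical function written for this example, and the contiguous-chunk split is one possible scheme, not necessarily the one SuperSlice uses.

```go
package main

import (
    "fmt"
    "sync"
)

// parallelMap is a hypothetical helper, not the library's Process or MapTo.
// Below the threshold it runs sequentially; otherwise it splits the slice
// into contiguous chunks, at most one per worker.
func parallelMap[T, U any](items []T, threshold, workers int, fn func(int, T) U) []U {
    out := make([]U, len(items))
    if len(items) < threshold || workers <= 1 {
        for i, it := range items {
            out[i] = fn(i, it)
        }
        return out
    }

    chunk := (len(items) + workers - 1) / workers // ceiling division
    var wg sync.WaitGroup
    for start := 0; start < len(items); start += chunk {
        end := start + chunk
        if end > len(items) {
            end = len(items)
        }
        wg.Add(1)
        go func(lo, hi int) {
            defer wg.Done()
            for i := lo; i < hi; i++ {
                out[i] = fn(i, items[i]) // each index is written by exactly one goroutine
            }
        }(start, end)
    }
    wg.Wait()
    return out
}

func main() {
    nums := []int{1, 2, 3, 4, 5, 6, 7, 8}
    labels := parallelMap(nums, 4, 3, func(i, n int) string {
        return fmt.Sprintf("%d:%d", i, n*n)
    })
    fmt.Println(labels)
}
```

Writing results by index keeps the output in input order without any locking, since no two goroutines touch the same element.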

SafeChannel
NewSafeChannel[T any](bufferSize int, defaultTimeout time.Duration) *SafeChannel[T]

Creates a new thread-safe channel wrapper.

(*SafeChannel[T]) Send(value T) error

Sends a value with default timeout.

(*SafeChannel[T]) SendWithTimeout(value T, timeout time.Duration) error

Sends a value with custom timeout.

(*SafeChannel[T]) Receive() (T, error)

Receives a value with default timeout.

(*SafeChannel[T]) ReceiveWithTimeout(timeout time.Duration) (T, error)

Receives a value with custom timeout.

(*SafeChannel[T]) Close() error

Safely closes the channel.
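
For readers unfamiliar with timed channel operations, the underlying technique is a select between the channel operation and a timer. The sketch below shows that pattern on a plain Go channel. sendWithTimeout, receiveWithTimeout, and the two error values are hypothetical names for this example, and the sketch does not guard against sending on a closed channel, which SafeChannel is described as handling.

```go
package main

import (
    "errors"
    "fmt"
    "time"
)

var (
    errTimeout = errors.New("operation timed out")
    errClosed  = errors.New("channel closed")
)

// sendWithTimeout tries to send v on ch and gives up after timeout.
func sendWithTimeout[T any](ch chan T, v T, timeout time.Duration) error {
    timer := time.NewTimer(timeout)
    defer timer.Stop()
    select {
    case ch <- v:
        return nil
    case <-timer.C:
        return errTimeout
    }
}

// receiveWithTimeout waits up to timeout for a value and reports a closed channel.
func receiveWithTimeout[T any](ch chan T, timeout time.Duration) (T, error) {
    var zero T
    timer := time.NewTimer(timeout)
    defer timer.Stop()
    select {
    case v, ok := <-ch:
        if !ok {
            return zero, errClosed
        }
        return v, nil
    case <-timer.C:
        return zero, errTimeout
    }
}

func main() {
    ch := make(chan int, 1)
    fmt.Println(sendWithTimeout(ch, 42, time.Second))           // buffer has room
    fmt.Println(sendWithTimeout(ch, 43, 10*time.Millisecond))   // buffer full, nobody receiving
    fmt.Println(receiveWithTimeout(ch, time.Second))            // buffered value is available
    fmt.Println(receiveWithTimeout(ch, 10*time.Millisecond))    // nothing left to receive
}
```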

Cache & Preflight
NewCache[K comparable, V any]() *Cache[K, V]

Creates a new in-memory cache with TTL support.

(*Cache[K, V]) Get(key K) (*CacheEntry[V], bool)

Retrieves a value from the cache. Returns the entry and true if found, nil and false otherwise.

(*Cache[K, V]) Set(key K, value V, ttl time.Duration)

Stores a value in the cache with the specified TTL (time-to-live).

(*Cache[K, V]) Delete(key K)

Removes a value from the cache.

(*Cache[K, V]) Clear()

Removes all entries from the cache.

(*Cache[K, V]) Cleanup()

Removes expired entries from the cache.

NewCachedGroup() *CachedGroup

Creates a new CachedGroup with integrated caching support.

(*CachedGroup) AssignWithCache(key string, result *any, fn func() any, control *CacheControl)

Assigns a task with cache-first preflight check. If the cache contains a valid entry for the key, it uses that value. Otherwise, it fetches from the provided function and caches the result.

(*CachedGroup) Resolve()

Waits for all assigned tasks to complete.

(*CachedGroup) ResolveWithTimeout(timeout time.Duration) bool

Waits for tasks with a timeout. Returns true if all tasks completed, false if the timeout elapsed first.

(*CachedGroup) ClearCache()

Clears all cached entries.

NewPreflightFetcher[T any](fetchFunc func(context.Context, string) (T, error), control *CacheControl) *PreflightFetcher[T]

Creates a new preflight fetcher with caching. The fetch function is only called on cache misses.

(*PreflightFetcher[T]) Fetch(ctx context.Context, key string) (T, error)

Retrieves data with preflight cache check. This implements the cache-first pattern to reduce downstream load.

(*PreflightFetcher[T]) FetchStaleWhileRevalidate(ctx context.Context, key string) (T, error)

Returns stale data immediately while revalidating in background. Provides optimal user experience.
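
The stale-while-revalidate idea can be summarized as: a cache miss blocks on the fetch, a fresh hit returns the cached value, and an expired hit returns the old value at once while a background goroutine refreshes it. The sketch below illustrates that control flow only. The swrCache type and its methods are hypothetical and simplified (string values, no context, no deduplication of concurrent refreshes); they are not the library's PreflightFetcher.

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

type entry struct {
    value   string
    expires time.Time
}

// swrCache is an illustrative stale-while-revalidate cache.
type swrCache struct {
    mu      sync.Mutex
    entries map[string]entry
    maxAge  time.Duration
    fetch   func(key string) string
    wg      sync.WaitGroup // lets the demo wait for background refreshes
}

func newSWRCache(maxAge time.Duration, fetch func(string) string) *swrCache {
    return &swrCache{entries: make(map[string]entry), maxAge: maxAge, fetch: fetch}
}

// Get returns a cached value if one exists, even when it is stale.
func (c *swrCache) Get(key string) string {
    c.mu.Lock()
    e, ok := c.entries[key]
    c.mu.Unlock()
    if !ok {
        return c.refresh(key) // nothing cached yet: the caller has to wait
    }
    if time.Now().After(e.expires) {
        c.wg.Add(1)
        go func() { // stale: refresh in the background
            defer c.wg.Done()
            c.refresh(key)
        }()
    }
    return e.value
}

// refresh fetches a new value and stores it with a fresh expiry.
func (c *swrCache) refresh(key string) string {
    v := c.fetch(key)
    c.mu.Lock()
    c.entries[key] = entry{value: v, expires: time.Now().Add(c.maxAge)}
    c.mu.Unlock()
    return v
}

func main() {
    version := 0
    cache := newSWRCache(50*time.Millisecond, func(key string) string {
        version++
        return fmt.Sprintf("%s-v%d", key, version)
    })
    fmt.Println(cache.Get("key1")) // miss: waits for the fetch
    time.Sleep(100 * time.Millisecond)
    fmt.Println(cache.Get("key1")) // expired: old value returned, refresh starts
    cache.wg.Wait()
    fmt.Println(cache.Get("key1")) // refreshed value
}
```

A production version would typically also collapse concurrent refreshes for the same key into a single fetch, which this sketch leaves out.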

(*PreflightFetcher[T]) SetCacheControl(control *CacheControl)

Updates the cache control directives.

(*PreflightFetcher[T]) ClearCache()

Clears all cached entries.

NewParametricFetcher[P, T any](fetchFunc func(context.Context, P) (T, error), keyFunc KeyFunc[P], control *CacheControl) *ParametricFetcher[P, T]

Creates a fetcher that generates cache keys automatically from the fetch parameters via keyFunc. If keyFunc is nil, a default string conversion is used. Recommended for most use cases.

(*ParametricFetcher[P, T]) Fetch(ctx context.Context, params P) (T, error)

Retrieves data with automatic key generation from parameters. No manual key construction needed.

(*ParametricFetcher[P, T]) FetchStaleWhileRevalidate(ctx context.Context, params P) (T, error)

Returns stale data immediately while revalidating in background, with automatic key generation.

(*ParametricFetcher[P, T]) SetCacheControl(control *CacheControl)

Updates the cache control directives.

(*ParametricFetcher[P, T]) ClearCache()

Clears all cached entries.

Key Generation Helpers
  • FormatKeyFunc[P any](format string) KeyFunc[P] - Creates a KeyFunc using a format string (e.g., "user:%d")
  • StringKeyFunc() KeyFunc[string] - Uses the parameter directly as the cache key
  • SimpleKeyFunc[P any](fn func(P) string) KeyFunc[P] - Wraps a custom key generation function
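
A key function is just a mapping from a parameter to a cache-key string. The sketch below mirrors the idea behind the helpers listed above using stand-in definitions: the keyFunc type, formatKey, and userQuery are hypothetical names for this example and are not the library's KeyFunc, FormatKeyFunc, or SimpleKeyFunc.

```go
package main

import "fmt"

// keyFunc is an illustrative stand-in: parameter in, cache key out.
type keyFunc[P any] func(P) string

// formatKey builds a key function from a fmt-style format string.
func formatKey[P any](format string) keyFunc[P] {
    return func(p P) string { return fmt.Sprintf(format, p) }
}

// userQuery is a hypothetical composite parameter.
type userQuery struct {
    Tenant string
    ID     int
}

func main() {
    byID := formatKey[int]("user:%d")
    fmt.Println(byID(123))

    // A custom function handles parameters that a single format verb cannot.
    byQuery := keyFunc[userQuery](func(q userQuery) string {
        return fmt.Sprintf("tenant:%s:user:%d", q.Tenant, q.ID)
    })
    fmt.Println(byQuery(userQuery{Tenant: "acme", ID: 7}))
}
```

Whatever form the key function takes, two parameter values that should share a cache entry must map to the same string, and values that should not must map to different strings.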
Cache Control Directives
type CacheControl struct {
    NoCache              bool          // Force fetch from source, bypassing cache
    MaxAge               time.Duration // How long cached data is valid
    StaleWhileRevalidate bool          // Allow stale data while fetching fresh data
}
DefaultCacheControl() *CacheControl

Returns a cache control with sensible defaults (NoCache: false, MaxAge: 5 minutes, StaleWhileRevalidate: false).

GoManager
NewGoManager() *GoManager

Creates a new goroutine manager.

(*GoManager) GO(name string, fn interface{}, argv ...interface{})

Launches a named goroutine with cancellation support.

(*GoManager) Cancel(name string)

Cancels a named goroutine.

(*GoManager) AddCancelFunc(name string, cancelFunc context.CancelFunc)

Adds a cancel function for a named goroutine.

Concurrency Patterns
NewPipeline[T any]() *Pipeline[T]

Creates a new pipeline for composable data processing.

(*Pipeline[T]) AddStage(stage func(T) T) *Pipeline[T]

Adds a processing stage to the pipeline. Returns the pipeline for chaining.

(*Pipeline[T]) Execute(item T) T

Executes the pipeline on a single item.

(*Pipeline[T]) ExecuteAsync(ctx context.Context, items []T) []T

Processes items through the pipeline concurrently.

NewFanOut[T, R any](numWorkers int) *FanOut[T, R]

Creates a fan-out processor with the specified number of workers.

(*FanOut[T, R]) ProcessWithIndex(ctx context.Context, items []T, processor func(int, T) R) []R

Distributes work across workers and collects results in order.

NewRateLimiter(rate int) *RateLimiter

Creates a rate limiter that allows the specified number of operations per second.

(*RateLimiter) Wait(ctx context.Context) error

Blocks until a token is available.

(*RateLimiter) TryAcquire() bool

Attempts to acquire a token without blocking.

(*RateLimiter) Stop()

Stops the rate limiter and releases resources.

NewSemaphore(permits int) *Semaphore

Creates a semaphore with the specified number of permits.

(*Semaphore) Acquire(ctx context.Context) error

Acquires a permit, blocking if none available.

(*Semaphore) Release()

Releases a permit.
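
A counting semaphore with context-aware acquisition is often built on a buffered channel whose capacity equals the number of permits. The sketch below shows that general technique. The semaphore type here is a stand-in written for this example, and TryAcquire is an extra helper added for illustration; neither is taken from the library's source.

```go
package main

import (
    "context"
    "fmt"
)

// semaphore is an illustrative counting semaphore built on a buffered channel.
type semaphore struct {
    slots chan struct{}
}

func newSemaphore(permits int) *semaphore {
    return &semaphore{slots: make(chan struct{}, permits)}
}

// Acquire blocks until a permit is free or ctx is done.
func (s *semaphore) Acquire(ctx context.Context) error {
    select {
    case s.slots <- struct{}{}:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// TryAcquire takes a permit only if one is free right now (illustrative extra).
func (s *semaphore) TryAcquire() bool {
    select {
    case s.slots <- struct{}{}:
        return true
    default:
        return false
    }
}

// Release returns a permit; call it once per successful acquire.
func (s *semaphore) Release() {
    <-s.slots
}

func main() {
    sem := newSemaphore(2)
    ctx := context.Background()
    _ = sem.Acquire(ctx)
    _ = sem.Acquire(ctx)

    // Both permits are taken, so an already-cancelled context fails fast.
    cancelled, cancel := context.WithCancel(ctx)
    cancel()
    fmt.Println(sem.Acquire(cancelled))

    sem.Release()
    fmt.Println(sem.Acquire(ctx))
}
```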

NewGenerator[T any](ctx context.Context, bufferSize int, producer func(context.Context, chan<- T)) *Generator[T]

Creates a generator with a producer function.

(*Generator[T]) Next() (T, bool)

Retrieves the next value from the generator.

(*Generator[T]) Collect() []T

Collects all remaining values into a slice.

(*Generator[T]) Close()

Stops the generator.

Smart Concurrency Wrapper
NewConcurrencyWrapper[T, R any](config *ConcurrencyConfig) *ConcurrencyWrapper[T, R]

Creates a new concurrency wrapper with the given configuration.

DefaultConcurrencyConfig() *ConcurrencyConfig

Returns a configuration with sensible defaults.

(*ConcurrencyWrapper[T, R]) Process(ctx context.Context, items []T, processor func(T) (R, error)) ([]R, error)

Processes items with automatic concurrency control, retry, rate limiting, and circuit breaker.

(*ConcurrencyWrapper[T, R]) Close()

Cleans up wrapper resources.

NewCircuitBreaker(threshold int) *CircuitBreaker

Creates a circuit breaker with the specified failure threshold.

(*CircuitBreaker) Allow() bool

Checks if a request should be allowed.

(*CircuitBreaker) RecordSuccess()

Records a successful operation.

(*CircuitBreaker) RecordFailure()

Records a failed operation.
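
The three methods above fit the usual call pattern for a circuit breaker: check Allow before the operation, then record the outcome. The sketch below illustrates that pattern with a deliberately simple breaker that opens after a run of consecutive failures. The breaker type and the call wrapper are hypothetical; in particular, this sketch has no cooldown or half-open state, and whether the library's CircuitBreaker has one is not stated here.

```go
package main

import (
    "errors"
    "fmt"
    "sync"
)

// breaker is an illustrative consecutive-failure circuit breaker.
type breaker struct {
    mu        sync.Mutex
    threshold int
    failures  int // consecutive failures since the last success
}

func newBreaker(threshold int) *breaker {
    return &breaker{threshold: threshold}
}

// Allow reports whether the failure run is still below the threshold.
func (b *breaker) Allow() bool {
    b.mu.Lock()
    defer b.mu.Unlock()
    return b.failures < b.threshold
}

// RecordSuccess resets the failure run.
func (b *breaker) RecordSuccess() {
    b.mu.Lock()
    b.failures = 0
    b.mu.Unlock()
}

// RecordFailure extends the failure run by one.
func (b *breaker) RecordFailure() {
    b.mu.Lock()
    b.failures++
    b.mu.Unlock()
}

// call is a hypothetical wrapper showing the check-then-record pattern.
func call(b *breaker, op func() error) error {
    if !b.Allow() {
        return errors.New("circuit open")
    }
    if err := op(); err != nil {
        b.RecordFailure()
        return err
    }
    b.RecordSuccess()
    return nil
}

func main() {
    b := newBreaker(2)
    failing := func() error { return errors.New("backend down") }
    for i := 0; i < 3; i++ {
        fmt.Println(call(b, failing))
    }
}
```

With a threshold of 2, the first two calls reach the failing backend and the third is rejected without calling it.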

Worker Pool
NewWorkerPool(numWorkers int) *WorkerPool

Creates a new worker pool with the specified number of workers. If numWorkers <= 0, defaults to 1.

(*WorkerPool) Start()

Starts the worker pool and begins processing tasks. Must be called before submitting tasks.

(*WorkerPool) Stop()

Stops the worker pool and waits for all workers to finish. Safely handles multiple calls.

(*WorkerPool) Submit(task *WorkerTask) error

Submits a task for immediate execution. Returns an error if the pool is not running.

(*WorkerPool) SubmitDelayed(task *WorkerTask, delay time.Duration) error

Submits a task for delayed execution. Task will execute after the specified delay.

(*WorkerPool) SubmitCron(task *WorkerTask, cronExpr string) error

Submits a recurring task with a cron schedule. Supported expressions:

  • @hourly - Run every hour
  • @daily - Run every day at midnight
  • @weekly - Run every week
  • @every 5s - Run every 5 seconds (supports s, m, h units)
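As a rough illustration of how these shorthand expressions could be turned into intervals, the sketch below maps each one to a time.Duration. parseInterval is a hypothetical helper, not the package's parser, and it simplifies @daily and @weekly to fixed 24h and 168h intervals instead of aligning them to midnight.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// parseInterval maps the shorthand expressions listed above to a fixed
// interval. It is a hypothetical helper: @daily and @weekly are treated as
// plain 24h and 168h intervals rather than being aligned to midnight.
func parseInterval(expr string) (time.Duration, error) {
	switch expr {
	case "@hourly":
		return time.Hour, nil
	case "@daily":
		return 24 * time.Hour, nil
	case "@weekly":
		return 7 * 24 * time.Hour, nil
	}

	const prefix = "@every "
	if !strings.HasPrefix(expr, prefix) {
		return 0, fmt.Errorf("unsupported expression %q", expr)
	}
	// time.ParseDuration understands the s, m, and h units mentioned above.
	d, err := time.ParseDuration(strings.TrimSpace(strings.TrimPrefix(expr, prefix)))
	if err != nil {
		return 0, fmt.Errorf("invalid interval in %q: %w", expr, err)
	}
	if d <= 0 {
		return 0, fmt.Errorf("interval in %q must be positive", expr)
	}
	return d, nil
}

func main() {
	for _, expr := range []string{"@every 5s", "@hourly", "every 5s"} {
		d, err := parseInterval(expr)
		fmt.Println(expr, d, err)
	}
}
```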
(*WorkerPool) CancelCron(taskID string)

Cancels a recurring cron task by its ID.

(*WorkerPool) IsRunning() bool

Returns whether the worker pool is currently running.

(*WorkerPool) WorkerCount() int

Returns the number of workers in the pool.

NewWorkerPoolWithConfig(config *WorkerPoolConfig) *WorkerPool

Creates a worker pool with custom configuration for Resque/Celery modes.

DefaultWorkerPoolConfig() *WorkerPoolConfig

Returns default worker pool configuration.

(*WorkerPool) SubmitToQueue(queueName string, task *WorkerTask) error

Submits a task to a named queue (Celery mode). Creates queue if it doesn't exist.

(*WorkerPool) GetResult(taskID string) (*TaskResult, bool)

Retrieves the result of a completed task. Returns the result and true if found.

(*WorkerPool) GetDeadLetterTasks() []*WorkerTask

Returns all tasks that failed after exhausting retries (Resque mode).

(*WorkerPool) RequeueDeadLetter(taskID string) error

Requeues a task from the dead letter queue (Resque mode).

(*WorkerPool) AckTask(taskID string)

Acknowledges task completion (Celery mode).

(*WorkerPool) RejectTask(taskID string, requeue bool) error

Rejects a task and optionally requeues it (Celery mode).

(*WorkerPool) GetQueueLength(queueName string) int

Returns the number of pending tasks in a queue (Celery mode).

RegisterTaskHandler(name string, handler TaskHandler)

Registers a task handler function by name for broker-based task retrieval. The handler can be retrieved later when decoding tasks from a broker.

GetTaskHandler(name string) (TaskHandler, error)

Retrieves a registered task handler by name. Returns error if handler not found.

(*WorkerTask) EncodeToBroker() ([]byte, error)

Encodes the task to JSON format compatible with message brokers (Redis, RabbitMQ, etc.). Returns JSON bytes that can be saved to a broker. The task must have HandlerName set. Output format is compatible with both Resque and Celery.

DecodeFromBroker(data []byte) (*WorkerTask, error)

Decodes a task from broker JSON format. Automatically detects and handles Resque format (using class field) and Celery format (using task and kwargs fields). Retrieves the registered handler and creates an executable task.
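The format detection described above can be pictured as reading a few distinguishing JSON fields. The sketch below is a guess at that logic using only the field names from the BrokerJobData tags; envelope and resolveHandler are hypothetical, the precedence order is an assumption, and args is assumed to be a JSON object as in BrokerJobData.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// envelope holds only the fields needed to tell the formats apart. The JSON
// names match the BrokerJobData tags.
type envelope struct {
	HandlerName string                 `json:"handler_name"`
	Class       string                 `json:"class"`
	Task        string                 `json:"task"`
	Args        map[string]interface{} `json:"args"`
	Kwargs      map[string]interface{} `json:"kwargs"`
}

// resolveHandler returns the handler name and arguments, preferring the
// native handler_name field, then Resque's class, then Celery's task and
// kwargs. The precedence order is an assumption.
func resolveHandler(data []byte) (string, map[string]interface{}, error) {
	var e envelope
	if err := json.Unmarshal(data, &e); err != nil {
		return "", nil, err
	}
	switch {
	case e.HandlerName != "":
		return e.HandlerName, e.Args, nil
	case e.Class != "":
		return e.Class, e.Args, nil
	case e.Task != "":
		return e.Task, e.Kwargs, nil
	}
	return "", nil, errors.New("no handler name in payload")
}

func main() {
	payloads := []string{
		`{"class":"SendEmail","args":{"to":"a@example.com"}}`,
		`{"task":"send_email","kwargs":{"to":"a@example.com"}}`,
	}
	for _, p := range payloads {
		name, args, err := resolveHandler([]byte(p))
		fmt.Println(name, args, err)
	}
}
```

The resolved name would then be passed to a lookup such as GetTaskHandler to obtain the executable handler.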

NewCronSchedule(expr string) (*CronSchedule, error)

Creates a new cron schedule from an expression. Returns error if expression is invalid.

WorkerTask Structure
type WorkerTask struct {
    ID          string                             // Unique task identifier
    Func        func(ctx context.Context) error   // Task function to execute
    HandlerName string                             // Handler name for broker encoding/decoding
    Priority    TaskPriority                       // Task priority (Low, Normal, High, Critical)
    Queue       string                             // Queue name (Celery mode)
    RetryPolicy *RetryPolicy                       // Retry configuration (Resque mode)
    Timeout     time.Duration                      // Task execution timeout
    ExpiresAt   *time.Time                         // Task expiration time
    Args        map[string]interface{}             // Task arguments metadata
}
RetryPolicy Structure
type RetryPolicy struct {
    MaxRetries    int           // Maximum number of retry attempts
    RetryDelay    time.Duration // Initial delay between retries
    BackoffFactor float64       // Exponential backoff multiplier
    MaxRetryDelay time.Duration // Maximum delay between retries
}
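One plausible reading of how these four fields combine is sketched below: the delay starts at RetryDelay, is multiplied by BackoffFactor on each further attempt, and is capped at MaxRetryDelay. The formula and the delayFor helper are assumptions for illustration; the package may compute delays differently.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// RetryPolicy mirrors the struct shown above.
type RetryPolicy struct {
	MaxRetries    int
	RetryDelay    time.Duration
	BackoffFactor float64
	MaxRetryDelay time.Duration
}

// delayFor returns the wait before retry number attempt (1-based), assuming
// delay = RetryDelay * BackoffFactor^(attempt-1), capped at MaxRetryDelay.
// The formula is an assumption about how the fields combine.
func delayFor(p RetryPolicy, attempt int) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	d := float64(p.RetryDelay) * math.Pow(p.BackoffFactor, float64(attempt-1))
	if p.MaxRetryDelay > 0 && d > float64(p.MaxRetryDelay) {
		return p.MaxRetryDelay
	}
	return time.Duration(d)
}

func main() {
	p := RetryPolicy{
		MaxRetries:    5,
		RetryDelay:    100 * time.Millisecond,
		BackoffFactor: 2,
		MaxRetryDelay: time.Second,
	}
	// Prints 100ms, 200ms, 400ms, 800ms, then 1s once the cap applies.
	for attempt := 1; attempt <= p.MaxRetries; attempt++ {
		fmt.Println(attempt, delayFor(p, attempt))
	}
}
```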
TaskResult Structure
type TaskResult struct {
    TaskID    string      // Task identifier
    Status    TaskStatus  // Task status (pending, running, success, failed, retrying, expired)
    Result    interface{} // Task result (if any)
    Error     error       // Task error (if failed)
    StartTime time.Time   // Task start time
    EndTime   time.Time   // Task end time
    Attempts  int         // Number of execution attempts
}
WorkerPoolConfig Structure
type WorkerPoolConfig struct {
    NumWorkers      int    // Number of worker goroutines
    MaxDeadLetter   int    // Maximum dead letter queue size
    EnableQueues    bool   // Enable named queues (Celery mode)
    EnableResults   bool   // Enable result storage
    OnTaskStart     func(task *WorkerTask)
    OnTaskComplete  func(task *WorkerTask, result *TaskResult)
    OnTaskFailed    func(task *WorkerTask, err error)
}
TaskHandler Type
type TaskHandler func(ctx context.Context, args map[string]interface{}) error

A function type for registered task handlers. Handlers receive a context and arguments map, making them suitable for broker-based task execution.

BrokerJobData Structure
type BrokerJobData struct {
    ID          string                 `json:"id"`
    HandlerName string                 `json:"handler_name"`
    Queue       string                 `json:"queue"`
    Priority    TaskPriority           `json:"priority"`
    Args        map[string]interface{} `json:"args"`
    RetryPolicy *RetryPolicy           `json:"retry_policy,omitempty"`
    Timeout     int64                  `json:"timeout,omitempty"`      // Milliseconds
    Delay       int64                  `json:"delay,omitempty"`        // Milliseconds
    ExpiresAt   *time.Time             `json:"expires_at,omitempty"`
    CronExpr    string                 `json:"cron_expr,omitempty"`
    IsRecurring bool                   `json:"is_recurring,omitempty"`
    CreatedAt   time.Time              `json:"created_at"`
    
    // Resque compatibility
    Class string `json:"class,omitempty"` // For Resque: task class name
    
    // Celery compatibility
    Task    string                  `json:"task,omitempty"`    // For Celery: task name
    Kwargs  map[string]interface{}  `json:"kwargs,omitempty"`  // For Celery: keyword arguments
    ETA     *time.Time              `json:"eta,omitempty"`     // For Celery: estimated time of arrival
    Expires *time.Time              `json:"expires,omitempty"` // For Celery: expiration time
}

Represents job data structure for broker storage, compatible with both Resque and Celery formats.

Examples

Comprehensive examples are available in the example/ directory.

Performance Characteristics

SwissMap
  • Concurrent reads: 2-3x faster than sync.Map
  • Concurrent writes: 3-5x faster than sync.Map
  • Mixed workload: 5-10x faster than sync.Map under high concurrency
  • Memory overhead: ~30% less than sync.Map
  • Scalability: Excellent with many goroutines due to sharded architecture
  • Lock contention: Minimal due to per-shard locking
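Per-shard locking is what keeps contention low: each key hashes to one shard, so goroutines touching different shards never wait on the same lock. The sketch below shows the general technique for string keys; it is not SwissMap's implementation, and shardedMap, newShardedMap, and fillConcurrently are hypothetical names.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// shard pairs a plain map with its own lock.
type shard[V any] struct {
	mu sync.RWMutex
	m  map[string]V
}

// shardedMap is a hypothetical string-keyed map split into independently
// locked shards. It illustrates the technique, not SwissMap itself.
type shardedMap[V any] struct {
	shards []*shard[V]
}

func newShardedMap[V any](n int) *shardedMap[V] {
	if n < 1 {
		n = 1
	}
	s := &shardedMap[V]{shards: make([]*shard[V], n)}
	for i := range s.shards {
		s.shards[i] = &shard[V]{m: make(map[string]V)}
	}
	return s
}

// shardFor hashes the key with FNV-1a to pick a shard.
func (s *shardedMap[V]) shardFor(key string) *shard[V] {
	h := fnv.New32a()
	h.Write([]byte(key))
	return s.shards[h.Sum32()%uint32(len(s.shards))]
}

// Set stores v under key, locking only the owning shard.
func (s *shardedMap[V]) Set(key string, v V) {
	sh := s.shardFor(key)
	sh.mu.Lock()
	sh.m[key] = v
	sh.mu.Unlock()
}

// Get reads key under the owning shard's read lock.
func (s *shardedMap[V]) Get(key string) (V, bool) {
	sh := s.shardFor(key)
	sh.mu.RLock()
	v, ok := sh.m[key]
	sh.mu.RUnlock()
	return v, ok
}

// fillConcurrently writes n keys from n goroutines and returns how many of
// them can be read back with the expected value.
func fillConcurrently(n int) int {
	m := newShardedMap[int](8)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			m.Set(fmt.Sprintf("key-%d", i), i)
		}(i)
	}
	wg.Wait()

	found := 0
	for i := 0; i < n; i++ {
		if v, ok := m.Get(fmt.Sprintf("key-%d", i)); ok && v == i {
			found++
		}
	}
	return found
}

func main() {
	fmt.Println(fillConcurrently(1000)) // 1000
}
```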
SuperSlice
  • Small slices (< 1000 items): Sequential processing to avoid overhead
  • Large slices (≥ 1000 items): Parallel processing with worker pools
  • Configurable threshold: Balance between overhead and parallelism benefit
  • Worker pool: Defaults to runtime.NumCPU(), configurable based on workload
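The threshold rule can be sketched as a single function that either loops sequentially or splits the slice into one chunk per CPU. This is an illustration of the technique under those assumptions, not SuperSlice's code; mapWithThreshold is a hypothetical helper.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// mapWithThreshold applies fn to every element. Below threshold it runs a
// plain loop; at or above it, the slice is split into one chunk per CPU and
// each chunk is processed in its own goroutine.
func mapWithThreshold[T, U any](items []T, threshold int, fn func(int, T) U) []U {
	out := make([]U, len(items))
	if len(items) < threshold {
		for i, it := range items {
			out[i] = fn(i, it)
		}
		return out
	}

	workers := runtime.NumCPU()
	chunk := (len(items) + workers - 1) / workers // ceiling division
	var wg sync.WaitGroup
	for start := 0; start < len(items); start += chunk {
		end := start + chunk
		if end > len(items) {
			end = len(items)
		}
		wg.Add(1)
		go func(start, end int) {
			defer wg.Done()
			for i := start; i < end; i++ {
				out[i] = fn(i, items[i]) // each goroutine writes a disjoint range
			}
		}(start, end)
	}
	wg.Wait()
	return out
}

func main() {
	nums := make([]int, 2000)
	for i := range nums {
		nums[i] = i
	}
	doubled := mapWithThreshold(nums, 1000, func(_ int, n int) int { return n * 2 })
	fmt.Println(doubled[:3], doubled[len(doubled)-1]) // [0 2 4] 3998
}
```

Writing results by index keeps the output in input order without any extra synchronization, because no two goroutines touch the same element.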
Async Resolve
  • Parallel execution: All tasks run concurrently
  • Total time: Max task duration (not sum of all durations)
  • Minimal overhead: Uses efficient sync.WaitGroup internally
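The "max, not sum" behaviour follows directly from starting every task before waiting on any of them. A minimal sync.WaitGroup sketch of that pattern is shown below; resolveAll and slow are hypothetical helpers, not the Group API.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// resolveAll runs every task in its own goroutine and returns the results in
// task order once all of them have finished.
func resolveAll(tasks ...func() any) []any {
	results := make([]any, len(tasks))
	var wg sync.WaitGroup
	for i, task := range tasks {
		wg.Add(1)
		go func(i int, task func() any) {
			defer wg.Done()
			results[i] = task() // each goroutine writes its own slot
		}(i, task)
	}
	wg.Wait()
	return results
}

// slow returns a task that sleeps for d and then yields v.
func slow(d time.Duration, v any) func() any {
	return func() any {
		time.Sleep(d)
		return v
	}
}

func main() {
	start := time.Now()
	res := resolveAll(
		slow(50*time.Millisecond, "done"),
		slow(100*time.Millisecond, 42),
	)
	// Elapsed time is close to the slowest task (100ms), not the 150ms sum.
	fmt.Println(res, time.Since(start).Round(10*time.Millisecond))
}
```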
Cache & Preflight
  • Cache hit: Sub-microsecond response time (typically < 1µs)
  • Cache miss: Full fetch time + cache store overhead (typically < 100µs)
  • Preflight check: Happens before expensive DB/API calls
  • Load reduction: 60-90% reduction in downstream calls for repeated reads
  • Memory overhead: ~100 bytes per cached entry + data size
  • Stale-while-revalidate: Returns stale data in < 1µs, revalidates in background
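The load reduction comes from checking the cache before calling the expensive source. The sketch below shows a basic cache-aside read with a TTL and counts how many reads reach the downstream fetch; preflightCache and countFetches are hypothetical names, and the sketch omits stale-while-revalidate for brevity.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type entry struct {
	value     string
	expiresAt time.Time
}

// preflightCache is a hypothetical cache-aside helper: it checks the cache
// first and only calls fetch on a miss or an expired entry.
type preflightCache struct {
	mu      sync.Mutex
	entries map[string]entry
	ttl     time.Duration
}

func newPreflightCache(ttl time.Duration) *preflightCache {
	return &preflightCache{entries: make(map[string]entry), ttl: ttl}
}

// Get returns the cached value while it is fresh; otherwise it fetches and
// stores a new one. The second result reports whether the cache served the
// request.
func (c *preflightCache) Get(key string, fetch func() (string, error)) (string, bool, error) {
	c.mu.Lock()
	e, ok := c.entries[key]
	c.mu.Unlock()
	if ok && time.Now().Before(e.expiresAt) {
		return e.value, true, nil
	}

	v, err := fetch()
	if err != nil {
		return "", false, err
	}
	c.mu.Lock()
	c.entries[key] = entry{value: v, expiresAt: time.Now().Add(c.ttl)}
	c.mu.Unlock()
	return v, false, nil
}

// countFetches performs n reads of one key and returns how many of them
// reached the downstream fetch function.
func countFetches(n int, ttl time.Duration) int {
	c := newPreflightCache(ttl)
	calls := 0
	for i := 0; i < n; i++ {
		c.Get("user:1", func() (string, error) {
			calls++
			return "alice", nil
		})
	}
	return calls
}

func main() {
	fmt.Println(countFetches(10, time.Minute)) // 1
}
```

In this sketch two goroutines that miss at the same moment would both call fetch; collapsing such duplicate fetches is a separate concern that is not shown here.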
Concurrency Patterns
  • Pipeline: Minimal overhead, composable stages, sequential execution per item
  • Fan-Out/Fan-In: Distributes work efficiently, maintains result order, scales with workers
  • Rate Limiter: Token bucket algorithm, precise rate control, minimal memory
  • Semaphore: Fast acquire/release, context-aware, no busy waiting
  • Generator: Lazy evaluation, memory efficient, context-based cancellation
  • Smart Wrapper: Combines patterns with minimal overhead, configurable policies
Worker Pool
  • Task execution: Concurrent processing with configurable worker count
  • Delayed tasks: Time-based scheduling with millisecond precision
  • Cron jobs: Recurring tasks with flexible scheduling (checked every 50ms)
  • Memory overhead: Minimal - only task queue and scheduling metadata
  • Throughput: Limited by worker count and task duration
  • Latency: Immediate tasks execute as soon as workers are available

Use Cases

✅ Good Use Cases

SwissMap:

  • High-concurrency caching (e.g., session stores, API caches)
  • Shared state management across many goroutines
  • Configuration stores with frequent reads
  • Rate limiting counters and metrics
  • User session management
  • Real-time data aggregation
  • Any scenario with frequent concurrent reads and writes

SuperSlice:

  • Processing large datasets (thousands of elements)
  • CPU-intensive operations per element
  • Independent element processing
  • Memory-constrained environments (with in-place updates)

Async Resolve:

  • Fetching data from multiple sources simultaneously
  • Concurrent API requests
  • Parallel computational tasks
  • Multi-stage pipelines with dependencies

SafeChannel:

  • Distributed systems with multiple backends
  • Timeout-sensitive operations
  • Thread-safe channel operations

Cache & Preflight:

  • High-traffic APIs with repeated reads
  • Database query optimization
  • Reducing load on downstream services
  • Improving response times for frequently accessed data
  • Implementing cache-aside pattern

Pipeline:

  • ETL (Extract, Transform, Load) operations
  • Multi-stage data processing
  • Sequential transformations
  • Composable data workflows

Fan-Out/Fan-In:

  • Batch API calls with aggregation
  • Parallel data processing with order preservation
  • Load distribution across workers
  • High-throughput processing

Rate Limiter:

  • API throttling and quota management
  • Preventing service overload
  • Controlled resource access
  • Batch job rate control

Semaphore:

  • Database connection pooling
  • Limited resource access control
  • Concurrency limiting
  • Worker pool management

Generator:

  • Stream processing
  • Infinite sequences
  • On-demand data production
  • Memory-efficient iteration

Smart Wrapper:

  • Resilient service calls
  • Fault-tolerant distributed operations
  • Automatic retry with backoff
  • Circuit breaker for failing services

Worker Pool:

  • Background job processing
  • Scheduled task execution
  • Email/notification queues
  • Periodic maintenance tasks
  • Batch processing with controlled concurrency
  • Task scheduling systems
  • Delayed message processing
  • Recurring report generation
  • Resque-style: Task retry with exponential backoff, dead letter queue management
  • Celery-style: Named queue routing, task priorities, distributed task execution
  • Job queues with priority levels
  • Fault-tolerant task processing
  • Task result tracking and retrieval
❌ Not Ideal For

SwissMap:

  • Single-threaded access (use regular map[K]V)
  • Read-only maps after initialization (use regular map[K]V)
  • Very low concurrency (< 5 goroutines) - overhead not worth it
  • Extremely memory-constrained environments

SuperSlice:

  • Very small slices (< 100 elements) - overhead not worth it
  • Operations requiring sequential ordering guarantees
  • Highly interdependent element processing

Async Resolve:

  • Single async operation (use plain goroutine)
  • Sequential dependencies between all tasks

Cache & Preflight:

  • Write-heavy workloads with frequent updates
  • Data that changes constantly and requires real-time accuracy
  • Very low memory environments where cache overhead is prohibitive

Worker Pool:

  • Real-time processing requiring microsecond precision
  • Tasks requiring strict ordering guarantees
  • Very high-frequency tasks (>1000/second) - consider dedicated solutions
  • Tasks with complex interdependencies

Testing

Run the test suite:

go test ./...

Run benchmarks:

go test -bench=. -benchmem

License

This project is licensed under the MIT License - see the LICENSE file for details.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Documentation

For detailed API documentation, visit pkg.go.dev/github.com/ivikasavnish/goroutine.

Author

Copyright (c) 2024 ivikasavnish

Documentation

Overview

Package goroutine provides advanced concurrent processing utilities for Go.

This package offers powerful tools for managing concurrent operations including:

  • Async task resolution (Promise-like pattern with Group)
  • Safe channel operations with timeout support (SafeChannel)
  • Parallel slice processing with automatic parallelization (SuperSlice)
  • Flexible goroutine management with cancellation (GoManager)
  • Type conversion utilities (Recasting)

Key Features:

Async Resolve: Launch multiple async operations and wait for all to complete, similar to Promise.all() in JavaScript. Supports timeout-based operations.

SuperSlice: Process large slices efficiently with automatic parallelization based on configurable thresholds. Includes worker pool management, in-place updates, and support for map/filter/forEach operations.

SafeChannel: Thread-safe channel wrapper with timeout capabilities and distributed backend support for building robust concurrent systems.

GoManager: Manage named goroutines with context-based lifecycle management and dynamic cancellation support.

Example usage:

// Async Resolve
group := goroutine.NewGroup()
var result1, result2 any
group.Assign(&result1, func() any { return "done" })
group.Assign(&result2, func() any { return 42 })
group.Resolve()

// SuperSlice
numbers := []int{1, 2, 3, 4, 5}
ss := goroutine.NewSuperSlice(numbers)
doubled := ss.Process(func(i int, n int) int { return n * 2 })

// SafeChannel
sc := goroutine.NewSafeChannel[int](10, 5*time.Second)
sc.Send(42)
value, _ := sc.Receive()

// GoManager
manager := goroutine.NewGoManager()
manager.GO("worker", func() { /* work */ })
manager.Cancel("worker")

Index

Constants

This section is empty.

Variables

var (
	ErrChannelClosed  = errors.New("channel is closed")
	ErrTimeout        = errors.New("operation timed out")
	ErrBackendFailed  = errors.New("backend operation failed")
	ErrInvalidBackend = errors.New("invalid backend")
	ErrNotImplemented = errors.New("not implemented")
)
var (
	ErrRetryExhausted  = errors.New("retry attempts exhausted")
	ErrContextCanceled = errors.New("context canceled")
)

Functions

func MapTo

func MapTo[T, U any](ss *SuperSlice[T], mapper func(index int, item T) U) []U

MapTo transforms slice elements to a different type

func Null

func Null()

func RecastToJSON

func RecastToJSON(src, dst interface{})

RecastToJSON maps source struct fields to destination struct using recast and json tags

func RecastToJSONBytes

func RecastToJSONBytes(src interface{}, opts *RecastOptions) ([]byte, error)

RecastToJSONBytes converts a struct to JSON bytes with field mapping and transformations

func RecastToJSONString

func RecastToJSONString(src interface{}, opts *RecastOptions) (string, error)

RecastToJSONString converts a struct to JSON string with field mapping and transformations

func RecastToJSONWithOptions

func RecastToJSONWithOptions(src, dst interface{}, opts *RecastOptions)

RecastToJSONWithOptions provides advanced struct-to-struct mapping with transformations

func RecastToMap

func RecastToMap(src interface{}, opts *RecastOptions) map[string]interface{}

RecastToMap converts a struct to a map[string]interface{} with field mapping and transformations

func RegisterTaskHandler

func RegisterTaskHandler(name string, handler TaskHandler)

RegisterTaskHandler registers a task handler with a name for broker serialization

Types

type BrokerJobData

type BrokerJobData struct {
	ID          string                 `json:"id"`
	HandlerName string                 `json:"handler_name"`
	Queue       string                 `json:"queue"`
	Priority    TaskPriority           `json:"priority"`
	Args        map[string]interface{} `json:"args"`
	RetryPolicy *RetryPolicy           `json:"retry_policy,omitempty"`
	Timeout     int64                  `json:"timeout,omitempty"` // Milliseconds
	Delay       int64                  `json:"delay,omitempty"`   // Milliseconds
	ExpiresAt   *time.Time             `json:"expires_at,omitempty"`
	CronExpr    string                 `json:"cron_expr,omitempty"`
	IsRecurring bool                   `json:"is_recurring,omitempty"`
	CreatedAt   time.Time              `json:"created_at"`

	// Resque compatibility
	Class string `json:"class,omitempty"` // For Resque: task class name

	// Celery compatibility
	Task    string                 `json:"task,omitempty"`    // For Celery: task name
	Kwargs  map[string]interface{} `json:"kwargs,omitempty"`  // For Celery: keyword arguments
	ETA     *time.Time             `json:"eta,omitempty"`     // For Celery: estimated time of arrival
	Expires *time.Time             `json:"expires,omitempty"` // For Celery: expiration time
}

BrokerJobData represents the job data structure for broker storage (Resque/Celery compatible)

type Cache

type Cache[K comparable, V any] struct {
	// contains filtered or unexported fields
}

Cache provides a simple in-memory cache with TTL support. It now uses SwissMap for improved concurrent performance.

func NewCache

func NewCache[K comparable, V any]() *Cache[K, V]

NewCache creates a new cache instance

func (*Cache[K, V]) Cleanup

func (c *Cache[K, V]) Cleanup()

Cleanup removes expired entries from the cache

func (*Cache[K, V]) Clear

func (c *Cache[K, V]) Clear()

Clear removes all entries from the cache

func (*Cache[K, V]) Delete

func (c *Cache[K, V]) Delete(key K)

Delete removes a value from the cache

func (*Cache[K, V]) Get

func (c *Cache[K, V]) Get(key K) (*CacheEntry[V], bool)

Get retrieves a value from the cache

func (*Cache[K, V]) Set

func (c *Cache[K, V]) Set(key K, value V, ttl time.Duration)

Set stores a value in the cache with TTL

type CacheControl

type CacheControl struct {
	// NoCache forces fetch from source, bypassing cache
	NoCache bool
	// MaxAge defines how long cached data is valid
	MaxAge time.Duration
	// StaleWhileRevalidate allows stale data while fetching fresh data
	StaleWhileRevalidate bool
}

CacheControl represents cache control directives

func DefaultCacheControl

func DefaultCacheControl() *CacheControl

DefaultCacheControl returns a cache control with sensible defaults

type CacheEntry

type CacheEntry[T any] struct {
	Value     T
	CachedAt  time.Time
	ExpiresAt time.Time
	Valid     bool
}

CacheEntry represents a cached value with metadata

func (*CacheEntry[T]) IsExpired

func (ce *CacheEntry[T]) IsExpired() bool

IsExpired checks if the cache entry has expired

func (*CacheEntry[T]) IsStale

func (ce *CacheEntry[T]) IsStale() bool

IsStale checks if the cache entry is stale but still usable

type CachedGroup

type CachedGroup struct {
	// contains filtered or unexported fields
}

CachedGroup wraps Group with caching support for preflight checks

func NewCachedGroup

func NewCachedGroup() *CachedGroup

NewCachedGroup creates a new CachedGroup with integrated caching

func (*CachedGroup) AssignWithCache

func (cg *CachedGroup) AssignWithCache(
	key string,
	result *any,
	fn func() any,
	control *CacheControl,
)

AssignWithCache assigns a task with a cache-first preflight check. If the cache contains a valid entry for the key, it uses that value. Otherwise, it fetches from the provided function and caches the result.

func (*CachedGroup) ClearCache

func (cg *CachedGroup) ClearCache()

ClearCache clears all cached entries

func (*CachedGroup) GetCache

func (cg *CachedGroup) GetCache() *Cache[string, any]

GetCache returns the underlying cache for direct access

func (*CachedGroup) Resolve

func (cg *CachedGroup) Resolve()

Resolve waits for all assigned tasks to complete

func (*CachedGroup) ResolveWithTimeout

func (cg *CachedGroup) ResolveWithTimeout(timeout time.Duration) bool

ResolveWithTimeout waits for tasks with a timeout

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements the circuit breaker pattern

func NewCircuitBreaker

func NewCircuitBreaker(threshold int) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker

func (*CircuitBreaker) Allow

func (cb *CircuitBreaker) Allow() bool

Allow checks if a request should be allowed

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure()

RecordFailure records a failed operation

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess records a successful operation

func (*CircuitBreaker) Reset

func (cb *CircuitBreaker) Reset()

Reset resets the circuit breaker

func (*CircuitBreaker) State

func (cb *CircuitBreaker) State() CircuitState

State returns the current state

type CircuitState

type CircuitState int

CircuitState represents the state of the circuit breaker

const (
	// CircuitClosed means requests are allowed
	CircuitClosed CircuitState = iota
	// CircuitOpen means requests are blocked
	CircuitOpen
	// CircuitHalfOpen means testing if service recovered
	CircuitHalfOpen
)

type ConcurrencyConfig

type ConcurrencyConfig struct {
	// MaxConcurrency limits the number of concurrent operations
	MaxConcurrency int
	// Timeout specifies the maximum time for operations
	Timeout time.Duration
	// RateLimit specifies operations per second (0 = no limit)
	RateLimit int
	// RetryAttempts specifies number of retry attempts on failure
	RetryAttempts int
	// RetryDelay specifies delay between retry attempts
	RetryDelay time.Duration
	// EnableCircuitBreaker enables circuit breaker pattern
	EnableCircuitBreaker bool
	// CircuitBreakerThreshold is the failure threshold before opening circuit
	CircuitBreakerThreshold int
}

ConcurrencyConfig holds configuration for the smart wrapper

func DefaultConcurrencyConfig

func DefaultConcurrencyConfig() *ConcurrencyConfig

DefaultConcurrencyConfig returns default configuration

type ConcurrencyWrapper

type ConcurrencyWrapper[T, R any] struct {
	// contains filtered or unexported fields
}

ConcurrencyWrapper provides a smart wrapper around concurrency patterns

func NewConcurrencyWrapper

func NewConcurrencyWrapper[T, R any](config *ConcurrencyConfig) *ConcurrencyWrapper[T, R]

NewConcurrencyWrapper creates a new concurrency wrapper with given config

func (*ConcurrencyWrapper[T, R]) Close

func (cw *ConcurrencyWrapper[T, R]) Close()

Close cleans up resources

func (*ConcurrencyWrapper[T, R]) Process

func (cw *ConcurrencyWrapper[T, R]) Process(ctx context.Context, items []T, processor func(T) (R, error)) ([]R, error)

Process executes the processor function on all items with configured patterns

type CronSchedule

type CronSchedule struct {
	// contains filtered or unexported fields
}

CronSchedule parses and manages cron schedules

func NewCronSchedule

func NewCronSchedule(expr string) (*CronSchedule, error)

NewCronSchedule creates a new cron schedule from an expression. Supported formats:

  • "@every 5s" - Run every 5 seconds
  • "@every 1m" - Run every 1 minute
  • "@every 1h" - Run every 1 hour
  • "@hourly" - Run every hour
  • "@daily" - Run every day at midnight

func (*CronSchedule) Next

func (cs *CronSchedule) Next(from time.Time) time.Time

Next returns the next execution time

type DistributedBackend

type DistributedBackend interface {
	Send(ctx context.Context, topic string, message []byte) error
	Receive(ctx context.Context, topic string) ([]byte, error)
	Subscribe(ctx context.Context, topic string, handler func([]byte) error) error
	Close() error
	Type() string
}

DistributedBackend interface for various message backends
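For local testing it can help to have a backend that needs no external service. The sketch below is a hypothetical in-memory implementation of the interface above, with one buffered channel per topic; memoryBackend, newMemoryBackend, and roundTrip are illustrative names, and the Subscribe semantics (non-blocking, background delivery) are an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// DistributedBackend repeats the interface shown above.
type DistributedBackend interface {
	Send(ctx context.Context, topic string, message []byte) error
	Receive(ctx context.Context, topic string) ([]byte, error)
	Subscribe(ctx context.Context, topic string, handler func([]byte) error) error
	Close() error
	Type() string
}

// memoryBackend is a hypothetical in-process backend with one buffered
// channel per topic.
type memoryBackend struct {
	mu     sync.Mutex
	topics map[string]chan []byte
	closed bool
}

func newMemoryBackend() *memoryBackend {
	return &memoryBackend{topics: make(map[string]chan []byte)}
}

// queue returns the channel for topic, creating it on first use.
func (b *memoryBackend) queue(topic string) (chan []byte, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return nil, errors.New("backend closed")
	}
	q, ok := b.topics[topic]
	if !ok {
		q = make(chan []byte, 16)
		b.topics[topic] = q
	}
	return q, nil
}

func (b *memoryBackend) Send(ctx context.Context, topic string, message []byte) error {
	q, err := b.queue(topic)
	if err != nil {
		return err
	}
	select {
	case q <- message:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (b *memoryBackend) Receive(ctx context.Context, topic string) ([]byte, error) {
	q, err := b.queue(topic)
	if err != nil {
		return nil, err
	}
	select {
	case m := <-q:
		return m, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// Subscribe delivers messages to handler in a background goroutine until
// ctx is done, the backend is closed, or handler returns an error.
func (b *memoryBackend) Subscribe(ctx context.Context, topic string, handler func([]byte) error) error {
	go func() {
		for {
			m, err := b.Receive(ctx, topic)
			if err != nil || handler(m) != nil {
				return
			}
		}
	}()
	return nil
}

func (b *memoryBackend) Close() error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.closed = true
	return nil
}

func (b *memoryBackend) Type() string { return "memory" }

// roundTrip sends msg on a demo topic and reads it back via the interface.
func roundTrip(backend DistributedBackend, msg string) (string, error) {
	ctx := context.Background()
	if err := backend.Send(ctx, "demo", []byte(msg)); err != nil {
		return "", err
	}
	got, err := backend.Receive(ctx, "demo")
	return string(got), err
}

func main() {
	b := newMemoryBackend()
	defer b.Close()
	fmt.Println(roundTrip(b, "ping")) // ping <nil>
}
```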

type DistributedSafeChannel

type DistributedSafeChannel[T any] struct {
	// contains filtered or unexported fields
}

DistributedSafeChannel wraps SafeChannel with distributed backend support. It includes optional caching support to reduce downstream load via preflight checks.

func NewDistributedSafeChannel

func NewDistributedSafeChannel[T any](backend DistributedBackend, topic string, bufferSize int, timeout time.Duration) *DistributedSafeChannel[T]

NewDistributedSafeChannel creates a distributed safe channel with specified backend

func (*DistributedSafeChannel[T]) BackendType

func (dsc *DistributedSafeChannel[T]) BackendType() string

BackendType returns the type of backend in use

func (*DistributedSafeChannel[T]) ClearCache

func (dsc *DistributedSafeChannel[T]) ClearCache()

ClearCache clears all cached entries

func (*DistributedSafeChannel[T]) Close

func (dsc *DistributedSafeChannel[T]) Close() error

Close closes both backend and local channel

func (*DistributedSafeChannel[T]) IsClosed

func (dsc *DistributedSafeChannel[T]) IsClosed() bool

IsClosed checks if the distributed channel is closed

func (*DistributedSafeChannel[T]) Receive

func (dsc *DistributedSafeChannel[T]) Receive(ctx context.Context) (T, error)

Receive deserializes and receives value from backend with fallback to local channel

func (*DistributedSafeChannel[T]) ReceiveWithPreflight

func (dsc *DistributedSafeChannel[T]) ReceiveWithPreflight(ctx context.Context, key string) (T, error)

ReceiveWithPreflight attempts to receive with a cache preflight check. This reduces downstream load by checking the cache before the backend.

func (*DistributedSafeChannel[T]) Send

func (dsc *DistributedSafeChannel[T]) Send(ctx context.Context, value T) error

Send serializes and sends value through the backend with fallback to local channel

func (*DistributedSafeChannel[T]) SetCacheControl

func (dsc *DistributedSafeChannel[T]) SetCacheControl(control *CacheControl)

SetCacheControl updates the cache control directives for preflight checks

func (*DistributedSafeChannel[T]) Subscribe

func (dsc *DistributedSafeChannel[T]) Subscribe(ctx context.Context) error

Subscribe starts listening to backend messages and forwards them to local channel

type Environment

type Environment string

Environment represents the deployment environment

const (
	// EnvProduction represents production environment
	EnvProduction Environment = "prod"
	// EnvStaging represents staging environment
	EnvStaging Environment = "stage"
	// EnvDevelopment represents development environment
	EnvDevelopment Environment = "dev"
)

type FanOut

type FanOut[T, R any] struct {
	// contains filtered or unexported fields
}

FanOut distributes work across multiple workers and fans in results

func NewFanOut

func NewFanOut[T, R any](numWorkers int) *FanOut[T, R]

NewFanOut creates a new fan-out pattern with specified number of workers

func (*FanOut[T, R]) Process

func (f *FanOut[T, R]) Process(ctx context.Context, items []T, processor func(T) R) []R

Process fans out work to multiple workers and fans in results

func (*FanOut[T, R]) ProcessWithIndex

func (f *FanOut[T, R]) ProcessWithIndex(ctx context.Context, items []T, processor func(int, T) R) []R

ProcessWithIndex is a more efficient version that includes index

type FeatureFlag

type FeatureFlag struct {
	// Name is the unique identifier for the feature flag
	Name string `json:"name"`
	// Description describes what this flag controls
	Description string `json:"description,omitempty"`
	// Enabled is the global on/off switch for all environments
	Enabled bool `json:"enabled"`
	// Environments contains environment-specific overrides
	Environments map[Environment]bool `json:"environments,omitempty"`
	// Rollout contains rollout policy configuration
	Rollout *RolloutConfig `json:"rollout,omitempty"`
	// CreatedAt is when the flag was created
	CreatedAt time.Time `json:"created_at"`
	// UpdatedAt is when the flag was last updated
	UpdatedAt time.Time `json:"updated_at"`
}

FeatureFlag represents a single feature flag with environment-specific settings

func (*FeatureFlag) IsEnabledForEnv

func (f *FeatureFlag) IsEnabledForEnv(env Environment) bool

IsEnabledForEnv checks if the flag is enabled for a specific environment

func (*FeatureFlag) IsEnabledForUser

func (f *FeatureFlag) IsEnabledForUser(env Environment, userID string, userSegments []string) bool

IsEnabledForUser checks if the flag is enabled for a specific user with rollout policy

type FeatureFlagSet

type FeatureFlagSet struct {
	// contains filtered or unexported fields
}

FeatureFlagSet manages a collection of feature flags with Redis backend

func NewFeatureFlagSet

func NewFeatureFlagSet(config *FeatureFlagSetConfig) (*FeatureFlagSet, error)

NewFeatureFlagSet creates a new feature flag set with Redis backend

func NewFeatureFlagSetSimple

func NewFeatureFlagSetSimple(redisAddr string, env Environment) (*FeatureFlagSet, error)

NewFeatureFlagSetSimple creates a feature flag set with simple parameters

func (*FeatureFlagSet) ClearCache

func (ffs *FeatureFlagSet) ClearCache()

ClearCache clears the local cache

func (*FeatureFlagSet) Close

func (ffs *FeatureFlagSet) Close() error

Close closes the Redis connection

func (*FeatureFlagSet) CreateFlag

func (ffs *FeatureFlagSet) CreateFlag(ctx context.Context, name string, enabled bool, description string) error

CreateFlag creates a new feature flag with default settings

func (*FeatureFlagSet) DeleteFlag

func (ffs *FeatureFlagSet) DeleteFlag(ctx context.Context, flagName string) error

DeleteFlag removes a feature flag

func (*FeatureFlagSet) GetEnvironment

func (ffs *FeatureFlagSet) GetEnvironment() Environment

GetEnvironment returns the current environment

func (*FeatureFlagSet) GetFlag

func (ffs *FeatureFlagSet) GetFlag(ctx context.Context, flagName string) (*FeatureFlag, error)

GetFlag retrieves a feature flag by name

func (*FeatureFlagSet) IsEnabled

func (ffs *FeatureFlagSet) IsEnabled(ctx context.Context, flagName string) (bool, error)

IsEnabled checks if a feature flag is enabled for the current environment

func (*FeatureFlagSet) IsEnabledForEnv

func (ffs *FeatureFlagSet) IsEnabledForEnv(ctx context.Context, flagName string, env Environment) (bool, error)

IsEnabledForEnv checks if a feature flag is enabled for a specific environment

func (*FeatureFlagSet) IsEnabledForUser

func (ffs *FeatureFlagSet) IsEnabledForUser(ctx context.Context, flagName string, userID string, userSegments []string) (bool, error)

IsEnabledForUser checks if a feature flag is enabled for a specific user with rollout policy

func (*FeatureFlagSet) IsEnabledForUserInEnv

func (ffs *FeatureFlagSet) IsEnabledForUserInEnv(ctx context.Context, flagName string, env Environment, userID string, userSegments []string) (bool, error)

IsEnabledForUserInEnv checks if a feature flag is enabled for a user in a specific environment

func (*FeatureFlagSet) ListFlags

func (ffs *FeatureFlagSet) ListFlags(ctx context.Context) ([]*FeatureFlag, error)

ListFlags returns all feature flags

func (*FeatureFlagSet) SetAllAtOnceRollout

func (ffs *FeatureFlagSet) SetAllAtOnceRollout(ctx context.Context, name string) error

SetAllAtOnceRollout configures an all-at-once rollout

func (*FeatureFlagSet) SetCanaryRollout

func (ffs *FeatureFlagSet) SetCanaryRollout(ctx context.Context, name string, segments []string) error

SetCanaryRollout configures a canary rollout with segments

func (*FeatureFlagSet) SetEnvironment

func (ffs *FeatureFlagSet) SetEnvironment(env Environment)

SetEnvironment updates the current environment

func (*FeatureFlagSet) SetFlag

func (ffs *FeatureFlagSet) SetFlag(ctx context.Context, flag *FeatureFlag) error

SetFlag creates or updates a feature flag

func (*FeatureFlagSet) SetFlagForEnv

func (ffs *FeatureFlagSet) SetFlagForEnv(ctx context.Context, name string, env Environment, enabled bool) error

SetFlagForEnv sets the enabled status for a specific environment

func (*FeatureFlagSet) SetGradualRollout

func (ffs *FeatureFlagSet) SetGradualRollout(ctx context.Context, name string, percentage int) error

SetGradualRollout configures a gradual rollout with percentage

func (*FeatureFlagSet) SetRolloutPolicy

func (ffs *FeatureFlagSet) SetRolloutPolicy(ctx context.Context, name string, rollout *RolloutConfig) error

SetRolloutPolicy sets the rollout policy for a feature flag

func (*FeatureFlagSet) SetTargetedRollout

func (ffs *FeatureFlagSet) SetTargetedRollout(ctx context.Context, name string, userIDs []string) error

SetTargetedRollout configures a targeted rollout with specific user IDs

func (*FeatureFlagSet) UpdateFlag

func (ffs *FeatureFlagSet) UpdateFlag(ctx context.Context, name string, enabled bool) error

UpdateFlag updates an existing feature flag's enabled status

type FeatureFlagSetConfig

type FeatureFlagSetConfig struct {
	// RedisAddr is the Redis server address (default: "localhost:6379")
	RedisAddr string
	// RedisPassword is the Redis password (optional)
	RedisPassword string
	// RedisDB is the Redis database number (default: 0)
	RedisDB int
	// KeyPrefix is the prefix for all Redis keys (default: "featureflag:")
	KeyPrefix string
	// CacheTTL is the local cache TTL (default: 30 seconds)
	CacheTTL time.Duration
	// Environment is the current environment (default: dev)
	Environment Environment
}

FeatureFlagSetConfig configures the feature flag set

func DefaultFeatureFlagSetConfig

func DefaultFeatureFlagSetConfig() *FeatureFlagSetConfig

DefaultFeatureFlagSetConfig returns default configuration

type Func

type Func struct {
	Name     string
	FuncName interface{}
	FuncArgs []interface{}
	// contains filtered or unexported fields
}

func NewFunc

func NewFunc(f Func) *Func

type Generator

type Generator[T any] struct {
	// contains filtered or unexported fields
}

Generator produces values on demand

func NewGenerator

func NewGenerator[T any](ctx context.Context, bufferSize int, producer func(context.Context, chan<- T)) *Generator[T]

NewGenerator creates a generator with a producer function

func (*Generator[T]) Channel

func (g *Generator[T]) Channel() <-chan T

Channel returns the underlying channel for range operations

func (*Generator[T]) Close

func (g *Generator[T]) Close()

Close stops the generator

func (*Generator[T]) Collect

func (g *Generator[T]) Collect() []T

Collect collects all remaining values into a slice

func (*Generator[T]) CollectN

func (g *Generator[T]) CollectN(n int) []T

CollectN collects up to n values

func (*Generator[T]) Next

func (g *Generator[T]) Next() (T, bool)

Next retrieves the next value from the generator

type GoManager

type GoManager struct {
	// contains filtered or unexported fields
}

func NewGoManager

func NewGoManager() *GoManager

func (*GoManager) AddCancelFunc

func (GR *GoManager) AddCancelFunc(name string, cancelFunc context.CancelFunc)

func (*GoManager) Cancel

func (GR *GoManager) Cancel(name string)

func (*GoManager) GO

func (GR *GoManager) GO(name string, fn interface{}, argv ...interface{})

func (*GoManager) Poll

func (gr *GoManager) Poll(f ...Func)

type Group

type Group struct {
	// contains filtered or unexported fields
}

func NewGroup

func NewGroup() *Group

func (*Group) Assign

func (g *Group) Assign(result *any, fn func() any)

func (*Group) Resolve

func (g *Group) Resolve()

func (*Group) ResolveWithTimeout

func (g *Group) ResolveWithTimeout(timeout time.Duration) bool

type Iterator

type Iterator[T any] interface {
	Next() bool
	Value() T
}

Iterator represents a sequence of values that can be traversed.

type KeyFunc

type KeyFunc[P any] func(params P) string

KeyFunc is a function that generates a cache key from parameters

func FormatKeyFunc

func FormatKeyFunc[P any](format string) KeyFunc[P]

FormatKeyFunc creates a KeyFunc using a format string. Example: FormatKeyFunc[int]("user:%d") generates keys like "user:123".

func SimpleKeyFunc

func SimpleKeyFunc[P any](fn func(P) string) KeyFunc[P]

SimpleKeyFunc creates a KeyFunc from a simple function. Useful for common key generation patterns.

func StringKeyFunc

func StringKeyFunc() KeyFunc[string]

StringKeyFunc creates a KeyFunc that uses the parameter directly as a string key. Useful when the parameter is already a string identifier.
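The key helpers above can be pictured as small closures over a format string or an identity function. The sketch below re-implements them under lowercase, hypothetical names; using fmt.Sprintf for the format case is an assumption based on the documented "user:%d" example.

```go
package main

import "fmt"

// keyFunc mirrors the documented KeyFunc[P] type.
type keyFunc[P any] func(params P) string

// formatKeyFunc is a hypothetical re-implementation: it passes the
// parameter to fmt.Sprintf with the given format string.
func formatKeyFunc[P any](format string) keyFunc[P] {
	return func(params P) string { return fmt.Sprintf(format, params) }
}

// stringKeyFunc returns the parameter unchanged.
func stringKeyFunc() keyFunc[string] {
	return func(params string) string { return params }
}

func main() {
	userKey := formatKeyFunc[int]("user:%d")
	fmt.Println(userKey(123))           // user:123
	fmt.Println(stringKeyFunc()("abc")) // abc
}
```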

type ParametricFetcher

type ParametricFetcher[P any, T any] struct {
	// contains filtered or unexported fields
}

ParametricFetcher provides cache-first fetching with automatic key generation from parameters. This simplifies the API by eliminating manual key construction.

func NewParametricFetcher

func NewParametricFetcher[P any, T any](
	fetchFunc func(ctx context.Context, params P) (T, error),
	keyFunc KeyFunc[P],
	control *CacheControl,
) *ParametricFetcher[P, T]

NewParametricFetcher creates a fetcher with automatic key generation from parameters. The keyFunc generates cache keys from parameters automatically. If keyFunc is nil, fmt.Sprintf("%v", params) is used as the default.

func (*ParametricFetcher[P, T]) ClearCache

func (pf *ParametricFetcher[P, T]) ClearCache()

ClearCache clears all cached entries

func (*ParametricFetcher[P, T]) Fetch

func (pf *ParametricFetcher[P, T]) Fetch(ctx context.Context, params P) (T, error)

Fetch retrieves data with automatic key generation from parameters. The cache key is automatically generated using the keyFunc.

func (*ParametricFetcher[P, T]) FetchStaleWhileRevalidate

func (pf *ParametricFetcher[P, T]) FetchStaleWhileRevalidate(ctx context.Context, params P) (T, error)

FetchStaleWhileRevalidate returns stale data immediately while revalidating in background

func (*ParametricFetcher[P, T]) SetCacheControl

func (pf *ParametricFetcher[P, T]) SetCacheControl(control *CacheControl)

SetCacheControl updates the cache control directives

type Pipeline

type Pipeline[T any] struct {
	// contains filtered or unexported fields
}

Pipeline represents a stage in a processing pipeline

func NewPipeline

func NewPipeline[T any]() *Pipeline[T]

NewPipeline creates a new pipeline

func (*Pipeline[T]) AddStage

func (p *Pipeline[T]) AddStage(stage func(T) T) *Pipeline[T]

AddStage adds a processing stage to the pipeline

func (*Pipeline[T]) Execute

func (p *Pipeline[T]) Execute(item T) T

Execute runs the pipeline on a single item

func (*Pipeline[T]) ExecuteAsync

func (p *Pipeline[T]) ExecuteAsync(ctx context.Context, items []T) []T

ExecuteAsync processes items through the pipeline concurrently
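A pipeline of func(T) T stages can be sketched as a slice of functions applied in order, with a concurrent variant that fans items out to goroutines. The names below are hypothetical, the context parameter is omitted for brevity, and whether the real ExecuteAsync preserves input order is not stated in the documentation; this sketch does so by writing each result to its original index.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// pipeline is a hypothetical stand-in for Pipeline[T].
type pipeline[T any] struct{ stages []func(T) T }

// addStage appends a stage and returns the pipeline so calls can be chained.
func (p *pipeline[T]) addStage(stage func(T) T) *pipeline[T] {
	p.stages = append(p.stages, stage)
	return p
}

// execute passes one item through every stage in insertion order.
func (p *pipeline[T]) execute(item T) T {
	for _, stage := range p.stages {
		item = stage(item)
	}
	return item
}

// executeAll runs execute for each item in its own goroutine and writes
// the result to the matching index, so input order is preserved.
func (p *pipeline[T]) executeAll(items []T) []T {
	out := make([]T, len(items))
	var wg sync.WaitGroup
	for i, item := range items {
		wg.Add(1)
		go func(i int, item T) {
			defer wg.Done()
			out[i] = p.execute(item)
		}(i, item)
	}
	wg.Wait()
	return out
}

func main() {
	p := (&pipeline[string]{}).addStage(strings.TrimSpace).addStage(strings.ToUpper)
	fmt.Println(p.executeAll([]string{" a ", "b ", " c"})) // [A B C]
}
```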

type PreflightFetcher

type PreflightFetcher[T any] struct {
	// contains filtered or unexported fields
}

PreflightFetcher represents a data source with preflight cache check

func NewPreflightFetcher

func NewPreflightFetcher[T any](
	fetchFunc func(ctx context.Context, key string) (T, error),
	control *CacheControl,
) *PreflightFetcher[T]

NewPreflightFetcher creates a new preflight fetcher with caching

func (*PreflightFetcher[T]) ClearCache

func (pf *PreflightFetcher[T]) ClearCache()

ClearCache clears all cached entries

func (*PreflightFetcher[T]) Fetch

func (pf *PreflightFetcher[T]) Fetch(ctx context.Context, key string) (T, error)

Fetch retrieves data with a preflight cache check. This implements the cache-first pattern to reduce downstream load.

func (*PreflightFetcher[T]) FetchStaleWhileRevalidate

func (pf *PreflightFetcher[T]) FetchStaleWhileRevalidate(ctx context.Context, key string) (T, error)

FetchStaleWhileRevalidate returns stale data immediately while revalidating in background

func (*PreflightFetcher[T]) SetCacheControl

func (pf *PreflightFetcher[T]) SetCacheControl(control *CacheControl)

SetCacheControl updates the cache control directives

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter controls the rate of operations

func NewRateLimiter

func NewRateLimiter(rate int) *RateLimiter

NewRateLimiter creates a rate limiter with specified rate (operations per second)

func (*RateLimiter) Stop

func (rl *RateLimiter) Stop()

Stop stops the rate limiter

func (*RateLimiter) TryAcquire

func (rl *RateLimiter) TryAcquire() bool

TryAcquire attempts to acquire a token without blocking

func (*RateLimiter) Wait

func (rl *RateLimiter) Wait(ctx context.Context) error

Wait blocks until a token is available

type RecastOptions

type RecastOptions struct {
	// TransformFuncs maps field names to transformation functions
	TransformFuncs map[string]TransformFunc
	// OmitFields lists fields to exclude from output (supports negation)
	OmitFields []string
	// RenameFields maps source field names to destination field names
	RenameFields map[string]string
}

RecastOptions holds configuration for recasting operations

type RetryPolicy

type RetryPolicy struct {
	MaxRetries    int
	RetryDelay    time.Duration
	BackoffFactor float64
	MaxRetryDelay time.Duration
}

RetryPolicy defines how tasks should be retried on failure

func DefaultRetryPolicy

func DefaultRetryPolicy() *RetryPolicy

DefaultRetryPolicy returns a default retry policy

type RolloutConfig

type RolloutConfig struct {
	// Policy is the rollout strategy to use
	Policy RolloutPolicy `json:"policy"`
	// Percentage is the percentage of users to enable (0-100) for gradual rollout
	Percentage int `json:"percentage,omitempty"`
	// TargetUserIDs is the list of specific user IDs for targeted rollout
	TargetUserIDs []string `json:"target_user_ids,omitempty"`
	// CanarySegments is the list of segments for canary rollout (e.g., "beta_users", "internal")
	CanarySegments []string `json:"canary_segments,omitempty"`
}

RolloutConfig contains rollout policy configuration

type RolloutPolicy

type RolloutPolicy string

RolloutPolicy defines how a feature is rolled out

const (
	// RolloutAllAtOnce enables the feature for all users immediately
	RolloutAllAtOnce RolloutPolicy = "all_at_once"
	// RolloutGradual enables the feature gradually using percentage
	RolloutGradual RolloutPolicy = "gradual"
	// RolloutCanary enables the feature for specific user segments first
	RolloutCanary RolloutPolicy = "canary"
	// RolloutTargeted enables the feature for specific user IDs only
	RolloutTargeted RolloutPolicy = "targeted"
)
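One plausible way to evaluate the four policies for a single user is sketched below. The types are local copies of the documented ones, and enabledForUser, bucket, and contains are hypothetical helpers. In particular, the package does not document how a gradual rollout assigns users to a percentage; hashing the flag name and user ID with FNV-1a into a bucket from 0 to 99 is an assumption chosen so that the same user always gets the same answer.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

type rolloutPolicy string

const (
	rolloutAllAtOnce rolloutPolicy = "all_at_once"
	rolloutGradual   rolloutPolicy = "gradual"
	rolloutCanary    rolloutPolicy = "canary"
	rolloutTargeted  rolloutPolicy = "targeted"
)

// rolloutConfig mirrors the documented RolloutConfig fields.
type rolloutConfig struct {
	Policy         rolloutPolicy
	Percentage     int
	TargetUserIDs  []string
	CanarySegments []string
}

// bucket maps a flag/user pair to a stable value in [0, 100).
// Hashing with FNV-1a is an assumption made for this sketch.
func bucket(flagName, userID string) int {
	h := fnv.New32a()
	h.Write([]byte(flagName + ":" + userID))
	return int(h.Sum32() % 100)
}

func contains(list []string, s string) bool {
	for _, v := range list {
		if v == s {
			return true
		}
	}
	return false
}

// enabledForUser applies the rollout policy to one user.
func enabledForUser(cfg rolloutConfig, flagName, userID string, segments []string) bool {
	switch cfg.Policy {
	case rolloutAllAtOnce:
		return true
	case rolloutGradual:
		return bucket(flagName, userID) < cfg.Percentage
	case rolloutCanary:
		for _, s := range segments {
			if contains(cfg.CanarySegments, s) {
				return true
			}
		}
		return false
	case rolloutTargeted:
		return contains(cfg.TargetUserIDs, userID)
	}
	return false
}

func main() {
	canary := rolloutConfig{Policy: rolloutCanary, CanarySegments: []string{"beta_users"}}
	fmt.Println(enabledForUser(canary, "new_ui", "u1", []string{"beta_users"})) // true
	fmt.Println(enabledForUser(canary, "new_ui", "u2", []string{"free_tier"}))  // false
}
```

Including the flag name in the hash input means a user who falls in the first 10% for one flag is not automatically in the first 10% for every other flag.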

type SafeChannel

type SafeChannel[T any] struct {
	// contains filtered or unexported fields
}

SafeChannel provides a thread-safe wrapper around a channel with timeout capabilities

func NewSafeChannel

func NewSafeChannel[T any](bufferSize int, defaultTimeout time.Duration) *SafeChannel[T]

NewSafeChannel creates a new SafeChannel with the specified buffer size and default timeout

func (*SafeChannel[T]) Cap

func (sc *SafeChannel[T]) Cap() int

Cap returns the capacity of the channel

func (*SafeChannel[T]) Close

func (sc *SafeChannel[T]) Close()

Close safely closes the channel

func (*SafeChannel[T]) IsClosed

func (sc *SafeChannel[T]) IsClosed() bool

IsClosed checks if the channel is closed

func (*SafeChannel[T]) Len

func (sc *SafeChannel[T]) Len() int

Len returns the current number of items in the channel

func (*SafeChannel[T]) Receive

func (sc *SafeChannel[T]) Receive(ctx context.Context) (T, error)

Receive attempts to receive a value from the channel with timeout

func (*SafeChannel[T]) Send

func (sc *SafeChannel[T]) Send(ctx context.Context, value T) error

Send attempts to send a value to the channel with timeout

func (*SafeChannel[T]) TryReceive

func (sc *SafeChannel[T]) TryReceive() (T, error)

TryReceive attempts to receive a value without blocking

func (*SafeChannel[T]) TrySend

func (sc *SafeChannel[T]) TrySend(value T) error

TrySend attempts to send a value without blocking

type Semaphore

type Semaphore struct {
	// contains filtered or unexported fields
}

Semaphore controls concurrent access to a resource

func NewSemaphore

func NewSemaphore(permits int) *Semaphore

NewSemaphore creates a semaphore with specified number of permits

func (*Semaphore) Acquire

func (s *Semaphore) Acquire(ctx context.Context) error

Acquire acquires a permit, blocking if none available

func (*Semaphore) AcquireAll

func (s *Semaphore) AcquireAll(ctx context.Context) error

AcquireAll acquires all permits

func (*Semaphore) Release

func (s *Semaphore) Release()

Release releases a permit

func (*Semaphore) ReleaseAll

func (s *Semaphore) ReleaseAll()

ReleaseAll releases all acquired permits

func (*Semaphore) TryAcquire

func (s *Semaphore) TryAcquire() bool

TryAcquire attempts to acquire a permit without blocking

type Stream

type Stream[T any] struct {
	// contains filtered or unexported fields
}

Stream represents a sequence of values with chainable operations. Because Go methods cannot declare their own type parameters, operations that change the element type (such as Map) are package-level functions rather than methods.

func FromSlice

func FromSlice[T any](slice []T) *Stream[T]

FromSlice creates a new Stream from a slice

func Map

func Map[T, U any](s *Stream[T], mapper func(T) U) *Stream[U]

Map converts a Stream[T] to a Stream[U]. It is a package-level function rather than a method because a Go method cannot introduce the additional type parameter U.

func (*Stream[T]) Collect

func (s *Stream[T]) Collect() []T

Collect accumulates all elements into a slice

func (*Stream[T]) Filter

func (s *Stream[T]) Filter(predicate func(T) bool) *Stream[T]

Filter retains only elements that satisfy the given predicate

func (*Stream[T]) ForEach

func (s *Stream[T]) ForEach(action func(T))

ForEach applies an action to each element

func (*Stream[T]) Reduce

func (s *Stream[T]) Reduce(initial T, reducer func(T, T) T) T

Reduce combines elements using a reducer function and initial value
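The split between Stream methods and the standalone Map function follows from a Go language rule: a method can use its receiver's type parameters but cannot declare new ones. The sketch below shows the same split on a small slice-backed type; stream, fromSlice, filter, reduce, and mapStream are hypothetical names, not the package's implementation.

```go
package main

import "fmt"

// stream is a hypothetical stand-in for Stream[T] backed by a slice.
type stream[T any] struct{ items []T }

func fromSlice[T any](s []T) *stream[T] { return &stream[T]{items: s} }

// filter keeps the element type T, so it can be a method.
func (s *stream[T]) filter(pred func(T) bool) *stream[T] {
	var out []T
	for _, v := range s.items {
		if pred(v) {
			out = append(out, v)
		}
	}
	return &stream[T]{items: out}
}

// reduce folds all elements into one value, starting from initial.
func (s *stream[T]) reduce(initial T, reducer func(T, T) T) T {
	acc := initial
	for _, v := range s.items {
		acc = reducer(acc, v)
	}
	return acc
}

// mapStream needs a second type parameter U, which a method cannot
// declare, so it has to be a package-level function.
func mapStream[T, U any](s *stream[T], mapper func(T) U) *stream[U] {
	out := make([]U, 0, len(s.items))
	for _, v := range s.items {
		out = append(out, mapper(v))
	}
	return &stream[U]{items: out}
}

func main() {
	evens := fromSlice([]int{1, 2, 3, 4}).filter(func(n int) bool { return n%2 == 0 })
	labels := mapStream(evens, func(n int) string { return fmt.Sprintf("#%d", n) })
	fmt.Println(labels.items)                                         // [#2 #4]
	fmt.Println(evens.reduce(0, func(a, b int) int { return a + b })) // 6
}
```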

type SuperSlice

type SuperSlice[T any] struct {
	// contains filtered or unexported fields
}

SuperSlice provides efficient slice processing with configurable parallelization

func NewSuperSlice

func NewSuperSlice[T any](data []T) *SuperSlice[T]

NewSuperSlice creates a new SuperSlice with default configuration

func NewSuperSliceWithConfig

func NewSuperSliceWithConfig[T any](data []T, config *SuperSliceConfig) *SuperSlice[T]

NewSuperSliceWithConfig creates a new SuperSlice with custom configuration

func (*SuperSlice[T]) FilterSlice

func (ss *SuperSlice[T]) FilterSlice(predicate func(index int, item T) bool) []T

FilterSlice returns a new slice with elements that satisfy the predicate

func (*SuperSlice[T]) ForEach

func (ss *SuperSlice[T]) ForEach(callback func(index int, item T))

ForEach applies a callback function to each element without returning results

func (*SuperSlice[T]) GetData

func (ss *SuperSlice[T]) GetData() []T

GetData returns the underlying slice

func (*SuperSlice[T]) Len

func (ss *SuperSlice[T]) Len() int

Len returns the length of the slice

func (*SuperSlice[T]) Process

func (ss *SuperSlice[T]) Process(callback func(index int, item T) T) []T

Process applies a callback function to each element. Returns a new slice with the results (or modifies in place if configured).

func (*SuperSlice[T]) ProcessWithError

func (ss *SuperSlice[T]) ProcessWithError(callback func(index int, item T) (T, error)) ([]T, error)

ProcessWithError applies a callback that can return an error

func (*SuperSlice[T]) WithConfig

func (ss *SuperSlice[T]) WithConfig(config *SuperSliceConfig) *SuperSlice[T]

WithConfig sets a custom configuration

func (*SuperSlice[T]) WithInPlace

func (ss *SuperSlice[T]) WithInPlace() *SuperSlice[T]

WithInPlace enables in-place updates

func (*SuperSlice[T]) WithIterable

func (ss *SuperSlice[T]) WithIterable() *SuperSlice[T]

WithIterable enables iterable processing

func (*SuperSlice[T]) WithThreshold

func (ss *SuperSlice[T]) WithThreshold(threshold int) *SuperSlice[T]

WithThreshold sets a custom threshold

func (*SuperSlice[T]) WithWorkers

func (ss *SuperSlice[T]) WithWorkers(numWorkers int) *SuperSlice[T]

WithWorkers sets the number of workers

type SuperSliceConfig

type SuperSliceConfig struct {
	// Threshold determines when to switch to parallel processing (default: 1000)
	Threshold int
	// NumWorkers specifies the number of worker goroutines (default: NumCPU)
	NumWorkers int
	// UseIterable indicates whether to convert slice to iterable first (default: false)
	UseIterable bool
	// InPlace indicates whether to update the slice in place (default: false)
	InPlace bool
}

SuperSliceConfig holds configuration for SuperSlice processing

func DefaultSuperSliceConfig

func DefaultSuperSliceConfig() *SuperSliceConfig

DefaultSuperSliceConfig returns a configuration with sensible defaults
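The Threshold and NumWorkers settings suggest a two-path strategy: a plain loop for small inputs and chunked goroutines for large ones. The sketch below shows one way that could look. The process function is hypothetical, and both the strict "shorter than threshold" comparison and the even chunking are assumptions; the package may partition work differently.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// process applies fn to every element. Below threshold it loops
// sequentially; otherwise it splits the slice into one chunk per worker.
func process[T any](data []T, threshold, workers int, fn func(index int, item T) T) []T {
	out := make([]T, len(data))
	if len(data) < threshold || workers <= 1 {
		for i, v := range data {
			out[i] = fn(i, v)
		}
		return out
	}
	chunk := (len(data) + workers - 1) / workers // ceiling division
	var wg sync.WaitGroup
	for start := 0; start < len(data); start += chunk {
		end := start + chunk
		if end > len(data) {
			end = len(data)
		}
		wg.Add(1)
		go func(start, end int) {
			defer wg.Done()
			// Each goroutine writes to a disjoint index range, so no lock is needed.
			for i := start; i < end; i++ {
				out[i] = fn(i, data[i])
			}
		}(start, end)
	}
	wg.Wait()
	return out
}

func main() {
	data := make([]int, 2000)
	for i := range data {
		data[i] = i
	}
	squares := process(data, 1000, runtime.NumCPU(), func(_ int, v int) int { return v * v })
	fmt.Println(squares[:5], squares[1999]) // [0 1 4 9 16] 3996001
}
```

The threshold exists because starting goroutines has a fixed cost; for short slices or cheap callbacks the sequential loop is usually faster.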

type SwissMap

type SwissMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}

SwissMap is a high-performance, thread-safe generic map using sharded architecture for optimal concurrent access. It divides the map into multiple shards, each protected by its own mutex, reducing lock contention in high-concurrency scenarios.

func NewSwissMap

func NewSwissMap[K comparable, V any]() *SwissMap[K, V]

NewSwissMap creates a new SwissMap with the default shard count (32). This provides excellent performance for most concurrent workloads.

func NewSwissMapWithShards

func NewSwissMapWithShards[K comparable, V any](shardCount uint32) *SwissMap[K, V]

NewSwissMapWithShards creates a new SwissMap with the specified shard count. The shard count must be a power of 2 for optimal performance. Higher shard counts reduce lock contention but increase memory overhead.
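The power-of-2 requirement makes sense once shard selection is written out: with a power-of-2 count, hash & (count-1) picks a shard without a modulo. The sketch below is a string-keyed simplification with hypothetical names. The use of FNV-1a and sync.RWMutex are assumptions for illustration; the documentation only says that each shard has its own mutex.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// shard is one independently locked partition of the map.
type shard[V any] struct {
	mu    sync.RWMutex
	items map[string]V
}

// shardedMap is a hypothetical, string-keyed simplification of SwissMap.
type shardedMap[V any] struct {
	shards []*shard[V]
	mask   uint32
}

// newShardedMap expects shardCount to be a power of two so that
// hash & (shardCount-1) is equivalent to hash % shardCount.
func newShardedMap[V any](shardCount uint32) *shardedMap[V] {
	m := &shardedMap[V]{shards: make([]*shard[V], shardCount), mask: shardCount - 1}
	for i := range m.shards {
		m.shards[i] = &shard[V]{items: make(map[string]V)}
	}
	return m
}

// shardFor hashes the key and masks the hash down to a shard index.
func (m *shardedMap[V]) shardFor(key string) *shard[V] {
	h := fnv.New32a()
	h.Write([]byte(key))
	return m.shards[h.Sum32()&m.mask]
}

// Set locks only the shard that owns the key.
func (m *shardedMap[V]) Set(key string, value V) {
	s := m.shardFor(key)
	s.mu.Lock()
	s.items[key] = value
	s.mu.Unlock()
}

// Get takes a read lock on the owning shard.
func (m *shardedMap[V]) Get(key string) (V, bool) {
	s := m.shardFor(key)
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.items[key]
	return v, ok
}

func main() {
	m := newShardedMap[int](32)
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			m.Set(fmt.Sprintf("key-%d", i), i)
		}(i)
	}
	wg.Wait()
	v, ok := m.Get("key-42")
	fmt.Println(v, ok) // 42 true
}
```

Writers that touch different shards never wait on each other, which is where the reduced contention comes from; the cost is one map and one lock per shard.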

func (*SwissMap[K, V]) Clear

func (sm *SwissMap[K, V]) Clear()

Clear removes all entries from the map

func (*SwissMap[K, V]) Delete

func (sm *SwissMap[K, V]) Delete(key K)

Delete removes a key from the map

func (*SwissMap[K, V]) Get

func (sm *SwissMap[K, V]) Get(key K) (V, bool)

Get retrieves a value from the map. Returns the value and true if found, zero value and false otherwise.

func (*SwissMap[K, V]) GetOrCompute

func (sm *SwissMap[K, V]) GetOrCompute(key K, compute func() V) V

GetOrCompute retrieves a value or computes and sets it if not present. The compute function is only called if the key doesn't exist.

func (*SwissMap[K, V]) GetOrSet

func (sm *SwissMap[K, V]) GetOrSet(key K, value V) (V, bool)

GetOrSet retrieves a value or sets it if not present. Returns the value (existing or newly set) and true if it was already present.

func (*SwissMap[K, V]) Has

func (sm *SwissMap[K, V]) Has(key K) bool

Has checks if a key exists in the map

func (*SwissMap[K, V]) Keys

func (sm *SwissMap[K, V]) Keys() []K

Keys returns all keys in the map

func (*SwissMap[K, V]) Len

func (sm *SwissMap[K, V]) Len() int

Len returns the total number of entries in the map

func (*SwissMap[K, V]) Range

func (sm *SwissMap[K, V]) Range(f func(key K, value V) bool)

Range iterates over all key-value pairs in the map. The function f is called for each entry. If f returns false, iteration stops.

func (*SwissMap[K, V]) Set

func (sm *SwissMap[K, V]) Set(key K, value V)

Set stores a key-value pair in the map

func (*SwissMap[K, V]) ToMap

func (sm *SwissMap[K, V]) ToMap() map[K]V

ToMap converts the SwissMap to a regular Go map. This creates a snapshot of the current state.

func (*SwissMap[K, V]) Values

func (sm *SwissMap[K, V]) Values() []V

Values returns all values in the map

type Task

type Task[T any] struct {
	// contains filtered or unexported fields
}

type TaskFunc

type TaskFunc func(ctx context.Context) error

TaskFunc represents a function to be executed by workers

type TaskHandler

type TaskHandler func(ctx context.Context, args map[string]interface{}) error

TaskHandler is a registered function that can be retrieved by name

func GetTaskHandler

func GetTaskHandler(name string) (TaskHandler, error)

GetTaskHandler retrieves a registered task handler by name

type TaskPriority

type TaskPriority int

TaskPriority defines task priority levels

const (
	PriorityLow TaskPriority = iota
	PriorityNormal
	PriorityHigh
	PriorityCritical
)

type TaskResult

type TaskResult struct {
	TaskID    string
	Status    TaskStatus
	Result    interface{}
	Error     error
	StartTime time.Time
	EndTime   time.Time
	Attempts  int
}

TaskResult stores the result of task execution

type TaskStatus

type TaskStatus string

TaskStatus represents the execution status of a task

const (
	StatusPending  TaskStatus = "pending"
	StatusRunning  TaskStatus = "running"
	StatusSuccess  TaskStatus = "success"
	StatusFailed   TaskStatus = "failed"
	StatusRetrying TaskStatus = "retrying"
	StatusExpired  TaskStatus = "expired"
)

type TransformFunc

type TransformFunc func(interface{}) interface{}

TransformFunc is a function type for custom field transformations

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages a pool of workers that execute tasks

func NewWorkerPool

func NewWorkerPool(numWorkers int) *WorkerPool

NewWorkerPool creates a new worker pool with the specified number of workers

func NewWorkerPoolWithConfig

func NewWorkerPoolWithConfig(config *WorkerPoolConfig) *WorkerPool

NewWorkerPoolWithConfig creates a new worker pool with custom configuration

func (*WorkerPool) AckTask

func (wp *WorkerPool) AckTask(taskID string)

AckTask acknowledges task completion (Celery mode)

func (*WorkerPool) CancelCron

func (wp *WorkerPool) CancelCron(taskID string)

CancelCron cancels a recurring cron task

func (*WorkerPool) ClearResults

func (wp *WorkerPool) ClearResults()

ClearResults clears all stored task results

func (*WorkerPool) GetDeadLetterTasks

func (wp *WorkerPool) GetDeadLetterTasks() []*WorkerTask

GetDeadLetterTasks returns tasks that failed after max retries (Resque mode)

func (*WorkerPool) GetQueueLength

func (wp *WorkerPool) GetQueueLength(queueName string) int

GetQueueLength returns the number of pending tasks in a queue (Celery mode)

func (*WorkerPool) GetResult

func (wp *WorkerPool) GetResult(taskID string) (*TaskResult, bool)

GetResult retrieves the result of a task (both modes)

func (*WorkerPool) IsRunning

func (wp *WorkerPool) IsRunning() bool

IsRunning returns whether the worker pool is running

func (*WorkerPool) RejectTask

func (wp *WorkerPool) RejectTask(taskID string, requeue bool) error

RejectTask rejects a task and optionally requeues it (Celery mode)

func (*WorkerPool) RequeueDeadLetter

func (wp *WorkerPool) RequeueDeadLetter(taskID string) error

RequeueDeadLetter requeues a task from dead letter queue (Resque mode)

func (*WorkerPool) Start

func (wp *WorkerPool) Start()

Start starts the worker pool

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop()

Stop stops the worker pool and waits for all workers to finish

func (*WorkerPool) Submit

func (wp *WorkerPool) Submit(task *WorkerTask) error

Submit submits a task for immediate execution

func (*WorkerPool) SubmitCron

func (wp *WorkerPool) SubmitCron(task *WorkerTask, cronExpr string) error

SubmitCron submits a recurring task with a cron schedule

func (*WorkerPool) SubmitDelayed

func (wp *WorkerPool) SubmitDelayed(task *WorkerTask, delay time.Duration) error

SubmitDelayed submits a task for delayed execution

func (*WorkerPool) SubmitToQueue

func (wp *WorkerPool) SubmitToQueue(queueName string, task *WorkerTask) error

SubmitToQueue submits a task to a named queue (Celery mode)

func (*WorkerPool) WorkerCount

func (wp *WorkerPool) WorkerCount() int

WorkerCount returns the number of workers in the pool

type WorkerPoolConfig

type WorkerPoolConfig struct {
	NumWorkers     int
	MaxDeadLetter  int
	EnableQueues   bool // Enable named queues (Celery mode)
	EnableResults  bool // Enable result storage (both modes)
	OnTaskStart    func(task *WorkerTask)
	OnTaskComplete func(task *WorkerTask, result *TaskResult)
	OnTaskFailed   func(task *WorkerTask, err error)
}

WorkerPoolConfig configures the worker pool

func DefaultWorkerPoolConfig

func DefaultWorkerPoolConfig() *WorkerPoolConfig

DefaultWorkerPoolConfig returns default configuration

type WorkerTask

type WorkerTask struct {
	ID       string
	Func     TaskFunc
	Delay    time.Duration
	CronExpr string

	IsRecurring bool

	// Resque/Celery compatibility fields
	Priority    TaskPriority
	Queue       string
	RetryPolicy *RetryPolicy
	Timeout     time.Duration
	ExpiresAt   *time.Time
	Args        map[string]interface{}

	// Broker serialization fields
	HandlerName string // Name of registered handler for broker storage
	// contains filtered or unexported fields
}

WorkerTask represents a unit of work with optional delay and cron schedule

func DecodeFromBroker

func DecodeFromBroker(data []byte) (*WorkerTask, error)

DecodeFromBroker decodes a task from broker JSON format

func (*WorkerTask) EncodeToBroker

func (wt *WorkerTask) EncodeToBroker() ([]byte, error)

EncodeToBroker encodes the task to broker-compatible JSON format

func (*WorkerTask) GetAttempts

func (wt *WorkerTask) GetAttempts() int

GetAttempts returns the number of execution attempts

func (*WorkerTask) GetStatus

func (wt *WorkerTask) GetStatus() TaskStatus

GetStatus returns the current task status

func (*WorkerTask) MarshalJSON

func (wt *WorkerTask) MarshalJSON() ([]byte, error)

MarshalJSON serializes task metadata (Celery compatibility)

Directories

Path Synopsis
swissmap_demo command
worker_broker command
worker_demo command
