Documentation
ΒΆ
Overview ΒΆ
Package goroutine provides advanced concurrent processing utilities for Go.
This package offers powerful tools for managing concurrent operations including:
- Async task resolution (Promise-like pattern with Group)
- Safe channel operations with timeout support (SafeChannel)
- Parallel slice processing with automatic parallelization (SuperSlice)
- Flexible goroutine management with cancellation (GoManager)
- Type conversion utilities (Recasting)
Key Features:
Async Resolve: Launch multiple async operations and wait for all to complete, similar to Promise.all() in JavaScript. Supports timeout-based operations.
SuperSlice: Process large slices efficiently with automatic parallelization based on configurable thresholds. Includes worker pool management, in-place updates, and support for map/filter/forEach operations.
SafeChannel: Thread-safe channel wrapper with timeout capabilities and distributed backend support for building robust concurrent systems.
GoManager: Manage named goroutines with context-based lifecycle management and dynamic cancellation support.
Example usage:
// Async Resolve
group := goroutine.NewGroup()
var result1, result2 any
group.Assign(&result1, func() any { return "done" })
group.Assign(&result2, func() any { return 42 })
group.Resolve()
// SuperSlice
numbers := []int{1, 2, 3, 4, 5}
ss := goroutine.NewSuperSlice(numbers)
doubled := ss.Process(func(i int, n int) int { return n * 2 })
// SafeChannel
sc := goroutine.NewSafeChannel[int](10, 5*time.Second)
sc.Send(42)
value, _ := sc.Receive()
// GoManager
manager := goroutine.NewGoManager()
manager.GO("worker", func() { /* work */ })
manager.Cancel("worker")
Index ΒΆ
- Variables
- func MapTo[T, U any](ss *SuperSlice[T], mapper func(index int, item T) U) []U
- func Null()
- func RecastToJSON(src, dst interface{})
- func RecastToJSONBytes(src interface{}, opts *RecastOptions) ([]byte, error)
- func RecastToJSONString(src interface{}, opts *RecastOptions) (string, error)
- func RecastToJSONWithOptions(src, dst interface{}, opts *RecastOptions)
- func RecastToMap(src interface{}, opts *RecastOptions) map[string]interface{}
- func RegisterTaskHandler(name string, handler TaskHandler)
- type BrokerJobData
- type Cache
- type CacheControl
- type CacheEntry
- type CachedGroup
- type CircuitBreaker
- type CircuitState
- type ConcurrencyConfig
- type ConcurrencyWrapper
- type CronSchedule
- type DistributedBackend
- type DistributedSafeChannel
- func (dsc *DistributedSafeChannel[T]) BackendType() string
- func (dsc *DistributedSafeChannel[T]) ClearCache()
- func (dsc *DistributedSafeChannel[T]) Close() error
- func (dsc *DistributedSafeChannel[T]) IsClosed() bool
- func (dsc *DistributedSafeChannel[T]) Receive(ctx context.Context) (T, error)
- func (dsc *DistributedSafeChannel[T]) ReceiveWithPreflight(ctx context.Context, key string) (T, error)
- func (dsc *DistributedSafeChannel[T]) Send(ctx context.Context, value T) error
- func (dsc *DistributedSafeChannel[T]) SetCacheControl(control *CacheControl)
- func (dsc *DistributedSafeChannel[T]) Subscribe(ctx context.Context) error
- type Environment
- type FanOut
- type FeatureFlag
- type FeatureFlagSet
- func (ffs *FeatureFlagSet) ClearCache()
- func (ffs *FeatureFlagSet) Close() error
- func (ffs *FeatureFlagSet) CreateFlag(ctx context.Context, name string, enabled bool, description string) error
- func (ffs *FeatureFlagSet) DeleteFlag(ctx context.Context, flagName string) error
- func (ffs *FeatureFlagSet) GetEnvironment() Environment
- func (ffs *FeatureFlagSet) GetFlag(ctx context.Context, flagName string) (*FeatureFlag, error)
- func (ffs *FeatureFlagSet) IsEnabled(ctx context.Context, flagName string) (bool, error)
- func (ffs *FeatureFlagSet) IsEnabledForEnv(ctx context.Context, flagName string, env Environment) (bool, error)
- func (ffs *FeatureFlagSet) IsEnabledForUser(ctx context.Context, flagName string, userID string, userSegments []string) (bool, error)
- func (ffs *FeatureFlagSet) IsEnabledForUserInEnv(ctx context.Context, flagName string, env Environment, userID string, ...) (bool, error)
- func (ffs *FeatureFlagSet) ListFlags(ctx context.Context) ([]*FeatureFlag, error)
- func (ffs *FeatureFlagSet) SetAllAtOnceRollout(ctx context.Context, name string) error
- func (ffs *FeatureFlagSet) SetCanaryRollout(ctx context.Context, name string, segments []string) error
- func (ffs *FeatureFlagSet) SetEnvironment(env Environment)
- func (ffs *FeatureFlagSet) SetFlag(ctx context.Context, flag *FeatureFlag) error
- func (ffs *FeatureFlagSet) SetFlagForEnv(ctx context.Context, name string, env Environment, enabled bool) error
- func (ffs *FeatureFlagSet) SetGradualRollout(ctx context.Context, name string, percentage int) error
- func (ffs *FeatureFlagSet) SetRolloutPolicy(ctx context.Context, name string, rollout *RolloutConfig) error
- func (ffs *FeatureFlagSet) SetTargetedRollout(ctx context.Context, name string, userIDs []string) error
- func (ffs *FeatureFlagSet) UpdateFlag(ctx context.Context, name string, enabled bool) error
- type FeatureFlagSetConfig
- type Func
- type Generator
- type GoManager
- type Group
- type Iterator
- type KeyFunc
- type ParametricFetcher
- func (pf *ParametricFetcher[P, T]) ClearCache()
- func (pf *ParametricFetcher[P, T]) Fetch(ctx context.Context, params P) (T, error)
- func (pf *ParametricFetcher[P, T]) FetchStaleWhileRevalidate(ctx context.Context, params P) (T, error)
- func (pf *ParametricFetcher[P, T]) SetCacheControl(control *CacheControl)
- type Pipeline
- type PreflightFetcher
- type RateLimiter
- type RecastOptions
- type RetryPolicy
- type RolloutConfig
- type RolloutPolicy
- type SafeChannel
- func (sc *SafeChannel[T]) Cap() int
- func (sc *SafeChannel[T]) Close()
- func (sc *SafeChannel[T]) IsClosed() bool
- func (sc *SafeChannel[T]) Len() int
- func (sc *SafeChannel[T]) Receive(ctx context.Context) (T, error)
- func (sc *SafeChannel[T]) Send(ctx context.Context, value T) error
- func (sc *SafeChannel[T]) TryReceive() (T, error)
- func (sc *SafeChannel[T]) TrySend(value T) error
- type Semaphore
- type Stream
- type SuperSlice
- func (ss *SuperSlice[T]) FilterSlice(predicate func(index int, item T) bool) []T
- func (ss *SuperSlice[T]) ForEach(callback func(index int, item T))
- func (ss *SuperSlice[T]) GetData() []T
- func (ss *SuperSlice[T]) Len() int
- func (ss *SuperSlice[T]) Process(callback func(index int, item T) T) []T
- func (ss *SuperSlice[T]) ProcessWithError(callback func(index int, item T) (T, error)) ([]T, error)
- func (ss *SuperSlice[T]) WithConfig(config *SuperSliceConfig) *SuperSlice[T]
- func (ss *SuperSlice[T]) WithInPlace() *SuperSlice[T]
- func (ss *SuperSlice[T]) WithIterable() *SuperSlice[T]
- func (ss *SuperSlice[T]) WithThreshold(threshold int) *SuperSlice[T]
- func (ss *SuperSlice[T]) WithWorkers(numWorkers int) *SuperSlice[T]
- type SuperSliceConfig
- type SwissMap
- func (sm *SwissMap[K, V]) Clear()
- func (sm *SwissMap[K, V]) Delete(key K)
- func (sm *SwissMap[K, V]) Get(key K) (V, bool)
- func (sm *SwissMap[K, V]) GetOrCompute(key K, compute func() V) V
- func (sm *SwissMap[K, V]) GetOrSet(key K, value V) (V, bool)
- func (sm *SwissMap[K, V]) Has(key K) bool
- func (sm *SwissMap[K, V]) Keys() []K
- func (sm *SwissMap[K, V]) Len() int
- func (sm *SwissMap[K, V]) Range(f func(key K, value V) bool)
- func (sm *SwissMap[K, V]) Set(key K, value V)
- func (sm *SwissMap[K, V]) ToMap() map[K]V
- func (sm *SwissMap[K, V]) Values() []V
- type Task
- type TaskFunc
- type TaskHandler
- type TaskPriority
- type TaskResult
- type TaskStatus
- type TransformFunc
- type WorkerPool
- func (wp *WorkerPool) AckTask(taskID string)
- func (wp *WorkerPool) CancelCron(taskID string)
- func (wp *WorkerPool) ClearResults()
- func (wp *WorkerPool) GetDeadLetterTasks() []*WorkerTask
- func (wp *WorkerPool) GetQueueLength(queueName string) int
- func (wp *WorkerPool) GetResult(taskID string) (*TaskResult, bool)
- func (wp *WorkerPool) IsRunning() bool
- func (wp *WorkerPool) RejectTask(taskID string, requeue bool) error
- func (wp *WorkerPool) RequeueDeadLetter(taskID string) error
- func (wp *WorkerPool) Start()
- func (wp *WorkerPool) Stop()
- func (wp *WorkerPool) Submit(task *WorkerTask) error
- func (wp *WorkerPool) SubmitCron(task *WorkerTask, cronExpr string) error
- func (wp *WorkerPool) SubmitDelayed(task *WorkerTask, delay time.Duration) error
- func (wp *WorkerPool) SubmitToQueue(queueName string, task *WorkerTask) error
- func (wp *WorkerPool) WorkerCount() int
- type WorkerPoolConfig
- type WorkerTask
Constants ΒΆ
This section is empty.
Variables ΒΆ
var ( ErrChannelClosed = errors.New("channel is closed") ErrTimeout = errors.New("operation timed out") ErrBackendFailed = errors.New("backend operation failed") ErrInvalidBackend = errors.New("invalid backend") ErrNotImplemented = errors.New("not implemented") )
var ( ErrRetryExhausted = errors.New("retry attempts exhausted") ErrContextCanceled = errors.New("context canceled") )
Functions ΒΆ
func MapTo ΒΆ
func MapTo[T, U any](ss *SuperSlice[T], mapper func(index int, item T) U) []U
MapTo transforms slice elements to a different type
func RecastToJSON ΒΆ
func RecastToJSON(src, dst interface{})
RecastToJSON maps source struct fields to destination struct using recast and json tags
func RecastToJSONBytes ΒΆ
func RecastToJSONBytes(src interface{}, opts *RecastOptions) ([]byte, error)
RecastToJSONBytes converts a struct to JSON bytes with field mapping and transformations
func RecastToJSONString ΒΆ
func RecastToJSONString(src interface{}, opts *RecastOptions) (string, error)
RecastToJSONString converts a struct to JSON string with field mapping and transformations
func RecastToJSONWithOptions ΒΆ
func RecastToJSONWithOptions(src, dst interface{}, opts *RecastOptions)
RecastToJSONWithOptions provides advanced struct-to-struct mapping with transformations
func RecastToMap ΒΆ
func RecastToMap(src interface{}, opts *RecastOptions) map[string]interface{}
RecastToMap converts a struct to a map[string]interface{} with field mapping and transformations
func RegisterTaskHandler ΒΆ
func RegisterTaskHandler(name string, handler TaskHandler)
RegisterTaskHandler registers a task handler with a name for broker serialization
Types ΒΆ
type BrokerJobData ΒΆ
type BrokerJobData struct {
ID string `json:"id"`
HandlerName string `json:"handler_name"`
Queue string `json:"queue"`
Priority TaskPriority `json:"priority"`
Args map[string]interface{} `json:"args"`
RetryPolicy *RetryPolicy `json:"retry_policy,omitempty"`
Timeout int64 `json:"timeout,omitempty"` // Milliseconds
Delay int64 `json:"delay,omitempty"` // Milliseconds
ExpiresAt *time.Time `json:"expires_at,omitempty"`
CronExpr string `json:"cron_expr,omitempty"`
IsRecurring bool `json:"is_recurring,omitempty"`
CreatedAt time.Time `json:"created_at"`
// Resque compatibility
Class string `json:"class,omitempty"` // For Resque: task class name
// Celery compatibility
Task string `json:"task,omitempty"` // For Celery: task name
Kwargs map[string]interface{} `json:"kwargs,omitempty"` // For Celery: keyword arguments
ETA *time.Time `json:"eta,omitempty"` // For Celery: estimated time of arrival
Expires *time.Time `json:"expires,omitempty"` // For Celery: expiration time
}
BrokerJobData represents the job data structure for broker storage (Resque/Celery compatible)
type Cache ΒΆ
type Cache[K comparable, V any] struct { // contains filtered or unexported fields }
Cache provides a simple in-memory cache with TTL support Now uses SwissMap for improved concurrent performance
func NewCache ΒΆ
func NewCache[K comparable, V any]() *Cache[K, V]
NewCache creates a new cache instance
func (*Cache[K, V]) Cleanup ΒΆ
func (c *Cache[K, V]) Cleanup()
Cleanup removes expired entries from the cache
func (*Cache[K, V]) Delete ΒΆ
func (c *Cache[K, V]) Delete(key K)
Delete removes a value from the cache
func (*Cache[K, V]) Get ΒΆ
func (c *Cache[K, V]) Get(key K) (*CacheEntry[V], bool)
Get retrieves a value from the cache
type CacheControl ΒΆ
type CacheControl struct {
// NoCache forces fetch from source, bypassing cache
NoCache bool
// MaxAge defines how long cached data is valid
MaxAge time.Duration
// StaleWhileRevalidate allows stale data while fetching fresh data
StaleWhileRevalidate bool
}
CacheControl represents cache control directives
func DefaultCacheControl ΒΆ
func DefaultCacheControl() *CacheControl
DefaultCacheControl returns a cache control with sensible defaults
type CacheEntry ΒΆ
CacheEntry represents a cached value with metadata
func (*CacheEntry[T]) IsExpired ΒΆ
func (ce *CacheEntry[T]) IsExpired() bool
IsExpired checks if the cache entry has expired
func (*CacheEntry[T]) IsStale ΒΆ
func (ce *CacheEntry[T]) IsStale() bool
IsStale checks if the cache entry is stale but still usable
type CachedGroup ΒΆ
type CachedGroup struct {
// contains filtered or unexported fields
}
CachedGroup wraps Group with caching support for preflight checks
func NewCachedGroup ΒΆ
func NewCachedGroup() *CachedGroup
NewCachedGroup creates a new CachedGroup with integrated caching
func (*CachedGroup) AssignWithCache ΒΆ
func (cg *CachedGroup) AssignWithCache( key string, result *any, fn func() any, control *CacheControl, )
AssignWithCache assigns a task with cache-first preflight check If the cache contains a valid entry for the key, it uses that value Otherwise, it fetches from the provided function and caches the result
func (*CachedGroup) ClearCache ΒΆ
func (cg *CachedGroup) ClearCache()
ClearCache clears all cached entries
func (*CachedGroup) GetCache ΒΆ
func (cg *CachedGroup) GetCache() *Cache[string, any]
GetCache returns the underlying cache for direct access
func (*CachedGroup) Resolve ΒΆ
func (cg *CachedGroup) Resolve()
Resolve waits for all assigned tasks to complete
func (*CachedGroup) ResolveWithTimeout ΒΆ
func (cg *CachedGroup) ResolveWithTimeout(timeout time.Duration) bool
ResolveWithTimeout waits for tasks with a timeout
type CircuitBreaker ΒΆ
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements the circuit breaker pattern
func NewCircuitBreaker ΒΆ
func NewCircuitBreaker(threshold int) *CircuitBreaker
NewCircuitBreaker creates a new circuit breaker
func (*CircuitBreaker) Allow ΒΆ
func (cb *CircuitBreaker) Allow() bool
Allow checks if a request should be allowed
func (*CircuitBreaker) RecordFailure ΒΆ
func (cb *CircuitBreaker) RecordFailure()
RecordFailure records a failed operation
func (*CircuitBreaker) RecordSuccess ΒΆ
func (cb *CircuitBreaker) RecordSuccess()
RecordSuccess records a successful operation
func (*CircuitBreaker) State ΒΆ
func (cb *CircuitBreaker) State() CircuitState
State returns the current state
type CircuitState ΒΆ
type CircuitState int
CircuitState represents the state of the circuit breaker
const ( // CircuitClosed means requests are allowed CircuitClosed CircuitState = iota // CircuitOpen means requests are blocked CircuitOpen // CircuitHalfOpen means testing if service recovered CircuitHalfOpen )
type ConcurrencyConfig ΒΆ
type ConcurrencyConfig struct {
// MaxConcurrency limits the number of concurrent operations
MaxConcurrency int
// Timeout specifies the maximum time for operations
Timeout time.Duration
// RateLimit specifies operations per second (0 = no limit)
RateLimit int
// RetryAttempts specifies number of retry attempts on failure
RetryAttempts int
// RetryDelay specifies delay between retry attempts
RetryDelay time.Duration
// EnableCircuitBreaker enables circuit breaker pattern
EnableCircuitBreaker bool
// CircuitBreakerThreshold is the failure threshold before opening circuit
CircuitBreakerThreshold int
}
ConcurrencyConfig holds configuration for the smart wrapper
func DefaultConcurrencyConfig ΒΆ
func DefaultConcurrencyConfig() *ConcurrencyConfig
DefaultConcurrencyConfig returns default configuration
type ConcurrencyWrapper ΒΆ
type ConcurrencyWrapper[T, R any] struct { // contains filtered or unexported fields }
ConcurrencyWrapper provides a smart wrapper around concurrency patterns
func NewConcurrencyWrapper ΒΆ
func NewConcurrencyWrapper[T, R any](config *ConcurrencyConfig) *ConcurrencyWrapper[T, R]
NewConcurrencyWrapper creates a new concurrency wrapper with given config
func (*ConcurrencyWrapper[T, R]) Close ΒΆ
func (cw *ConcurrencyWrapper[T, R]) Close()
Close cleans up resources
type CronSchedule ΒΆ
type CronSchedule struct {
// contains filtered or unexported fields
}
CronSchedule parses and manages cron schedules
func NewCronSchedule ΒΆ
func NewCronSchedule(expr string) (*CronSchedule, error)
NewCronSchedule creates a new cron schedule from an expression Supported formats: - "@every 5s" - Run every 5 seconds - "@every 1m" - Run every 1 minute - "@every 1h" - Run every 1 hour - "@hourly" - Run every hour - "@daily" - Run every day at midnight
type DistributedBackend ΒΆ
type DistributedBackend interface {
Send(ctx context.Context, topic string, message []byte) error
Receive(ctx context.Context, topic string) ([]byte, error)
Subscribe(ctx context.Context, topic string, handler func([]byte) error) error
Close() error
Type() string
}
DistributedBackend interface for various message backends
type DistributedSafeChannel ΒΆ
type DistributedSafeChannel[T any] struct { // contains filtered or unexported fields }
DistributedSafeChannel wraps SafeChannel with distributed backend support It includes optional caching support to reduce downstream load via preflight checks
func NewDistributedSafeChannel ΒΆ
func NewDistributedSafeChannel[T any](backend DistributedBackend, topic string, bufferSize int, timeout time.Duration) *DistributedSafeChannel[T]
NewDistributedSafeChannel creates a distributed safe channel with specified backend
func (*DistributedSafeChannel[T]) BackendType ΒΆ
func (dsc *DistributedSafeChannel[T]) BackendType() string
BackendType returns the type of backend in use
func (*DistributedSafeChannel[T]) ClearCache ΒΆ
func (dsc *DistributedSafeChannel[T]) ClearCache()
ClearCache clears all cached entries
func (*DistributedSafeChannel[T]) Close ΒΆ
func (dsc *DistributedSafeChannel[T]) Close() error
Close closes both backend and local channel
func (*DistributedSafeChannel[T]) IsClosed ΒΆ
func (dsc *DistributedSafeChannel[T]) IsClosed() bool
IsClosed checks if the distributed channel is closed
func (*DistributedSafeChannel[T]) Receive ΒΆ
func (dsc *DistributedSafeChannel[T]) Receive(ctx context.Context) (T, error)
Receive deserializes and receives value from backend with fallback to local channel
func (*DistributedSafeChannel[T]) ReceiveWithPreflight ΒΆ
func (dsc *DistributedSafeChannel[T]) ReceiveWithPreflight(ctx context.Context, key string) (T, error)
ReceiveWithPreflight attempts to receive with cache preflight check This reduces downstream load by checking cache before backend
func (*DistributedSafeChannel[T]) Send ΒΆ
func (dsc *DistributedSafeChannel[T]) Send(ctx context.Context, value T) error
Send serializes and sends value through the backend with fallback to local channel
func (*DistributedSafeChannel[T]) SetCacheControl ΒΆ
func (dsc *DistributedSafeChannel[T]) SetCacheControl(control *CacheControl)
SetCacheControl updates the cache control directives for preflight checks
type Environment ΒΆ
type Environment string
Environment represents the deployment environment
const ( // EnvProduction represents production environment EnvProduction Environment = "prod" // EnvStaging represents staging environment EnvStaging Environment = "stage" // EnvDevelopment represents development environment EnvDevelopment Environment = "dev" )
type FanOut ΒΆ
type FanOut[T, R any] struct { // contains filtered or unexported fields }
FanOut distributes work across multiple workers and fans in results
type FeatureFlag ΒΆ
type FeatureFlag struct {
// Name is the unique identifier for the feature flag
Name string `json:"name"`
// Description describes what this flag controls
Description string `json:"description,omitempty"`
// Enabled is the global on/off switch for all environments
Enabled bool `json:"enabled"`
// Environments contains environment-specific overrides
Environments map[Environment]bool `json:"environments,omitempty"`
// Rollout contains rollout policy configuration
Rollout *RolloutConfig `json:"rollout,omitempty"`
// CreatedAt is when the flag was created
CreatedAt time.Time `json:"created_at"`
// UpdatedAt is when the flag was last updated
UpdatedAt time.Time `json:"updated_at"`
}
FeatureFlag represents a single feature flag with environment-specific settings
func (*FeatureFlag) IsEnabledForEnv ΒΆ
func (f *FeatureFlag) IsEnabledForEnv(env Environment) bool
IsEnabledForEnv checks if the flag is enabled for a specific environment
func (*FeatureFlag) IsEnabledForUser ΒΆ
func (f *FeatureFlag) IsEnabledForUser(env Environment, userID string, userSegments []string) bool
IsEnabledForUser checks if the flag is enabled for a specific user with rollout policy
type FeatureFlagSet ΒΆ
type FeatureFlagSet struct {
// contains filtered or unexported fields
}
FeatureFlagSet manages a collection of feature flags with Redis backend
func NewFeatureFlagSet ΒΆ
func NewFeatureFlagSet(config *FeatureFlagSetConfig) (*FeatureFlagSet, error)
NewFeatureFlagSet creates a new feature flag set with Redis backend
func NewFeatureFlagSetSimple ΒΆ
func NewFeatureFlagSetSimple(redisAddr string, env Environment) (*FeatureFlagSet, error)
NewFeatureFlagSetSimple creates a feature flag set with simple parameters
func (*FeatureFlagSet) ClearCache ΒΆ
func (ffs *FeatureFlagSet) ClearCache()
ClearCache clears the local cache
func (*FeatureFlagSet) Close ΒΆ
func (ffs *FeatureFlagSet) Close() error
Close closes the Redis connection
func (*FeatureFlagSet) CreateFlag ΒΆ
func (ffs *FeatureFlagSet) CreateFlag(ctx context.Context, name string, enabled bool, description string) error
CreateFlag creates a new feature flag with default settings
func (*FeatureFlagSet) DeleteFlag ΒΆ
func (ffs *FeatureFlagSet) DeleteFlag(ctx context.Context, flagName string) error
DeleteFlag removes a feature flag
func (*FeatureFlagSet) GetEnvironment ΒΆ
func (ffs *FeatureFlagSet) GetEnvironment() Environment
GetEnvironment returns the current environment
func (*FeatureFlagSet) GetFlag ΒΆ
func (ffs *FeatureFlagSet) GetFlag(ctx context.Context, flagName string) (*FeatureFlag, error)
GetFlag retrieves a feature flag by name
func (*FeatureFlagSet) IsEnabled ΒΆ
IsEnabled checks if a feature flag is enabled for the current environment
func (*FeatureFlagSet) IsEnabledForEnv ΒΆ
func (ffs *FeatureFlagSet) IsEnabledForEnv(ctx context.Context, flagName string, env Environment) (bool, error)
IsEnabledForEnv checks if a feature flag is enabled for a specific environment
func (*FeatureFlagSet) IsEnabledForUser ΒΆ
func (ffs *FeatureFlagSet) IsEnabledForUser(ctx context.Context, flagName string, userID string, userSegments []string) (bool, error)
IsEnabledForUser checks if a feature flag is enabled for a specific user with rollout policy
func (*FeatureFlagSet) IsEnabledForUserInEnv ΒΆ
func (ffs *FeatureFlagSet) IsEnabledForUserInEnv(ctx context.Context, flagName string, env Environment, userID string, userSegments []string) (bool, error)
IsEnabledForUserInEnv checks if a feature flag is enabled for a user in a specific environment
func (*FeatureFlagSet) ListFlags ΒΆ
func (ffs *FeatureFlagSet) ListFlags(ctx context.Context) ([]*FeatureFlag, error)
ListFlags returns all feature flags
func (*FeatureFlagSet) SetAllAtOnceRollout ΒΆ
func (ffs *FeatureFlagSet) SetAllAtOnceRollout(ctx context.Context, name string) error
SetAllAtOnceRollout configures an all-at-once rollout
func (*FeatureFlagSet) SetCanaryRollout ΒΆ
func (ffs *FeatureFlagSet) SetCanaryRollout(ctx context.Context, name string, segments []string) error
SetCanaryRollout configures a canary rollout with segments
func (*FeatureFlagSet) SetEnvironment ΒΆ
func (ffs *FeatureFlagSet) SetEnvironment(env Environment)
SetEnvironment updates the current environment
func (*FeatureFlagSet) SetFlag ΒΆ
func (ffs *FeatureFlagSet) SetFlag(ctx context.Context, flag *FeatureFlag) error
SetFlag creates or updates a feature flag
func (*FeatureFlagSet) SetFlagForEnv ΒΆ
func (ffs *FeatureFlagSet) SetFlagForEnv(ctx context.Context, name string, env Environment, enabled bool) error
SetFlagForEnv sets the enabled status for a specific environment
func (*FeatureFlagSet) SetGradualRollout ΒΆ
func (ffs *FeatureFlagSet) SetGradualRollout(ctx context.Context, name string, percentage int) error
SetGradualRollout configures a gradual rollout with percentage
func (*FeatureFlagSet) SetRolloutPolicy ΒΆ
func (ffs *FeatureFlagSet) SetRolloutPolicy(ctx context.Context, name string, rollout *RolloutConfig) error
SetRolloutPolicy sets the rollout policy for a feature flag
func (*FeatureFlagSet) SetTargetedRollout ΒΆ
func (ffs *FeatureFlagSet) SetTargetedRollout(ctx context.Context, name string, userIDs []string) error
SetTargetedRollout configures a targeted rollout with specific user IDs
func (*FeatureFlagSet) UpdateFlag ΒΆ
UpdateFlag updates an existing feature flag's enabled status
type FeatureFlagSetConfig ΒΆ
type FeatureFlagSetConfig struct {
// RedisAddr is the Redis server address (default: "localhost:6379")
RedisAddr string
// RedisPassword is the Redis password (optional)
RedisPassword string
// RedisDB is the Redis database number (default: 0)
RedisDB int
// KeyPrefix is the prefix for all Redis keys (default: "featureflag:")
KeyPrefix string
// CacheTTL is the local cache TTL (default: 30 seconds)
CacheTTL time.Duration
// Environment is the current environment (default: dev)
Environment Environment
}
FeatureFlagSetConfig configures the feature flag set
func DefaultFeatureFlagSetConfig ΒΆ
func DefaultFeatureFlagSetConfig() *FeatureFlagSetConfig
DefaultFeatureFlagSetConfig returns default configuration
type Func ΒΆ
type Func struct {
Name string
FuncName interface{}
FuncArgs []interface{}
// contains filtered or unexported fields
}
type Generator ΒΆ
type Generator[T any] struct { // contains filtered or unexported fields }
Generator produces values on demand
func NewGenerator ΒΆ
func NewGenerator[T any](ctx context.Context, bufferSize int, producer func(context.Context, chan<- T)) *Generator[T]
NewGenerator creates a generator with a producer function
func (*Generator[T]) Channel ΒΆ
func (g *Generator[T]) Channel() <-chan T
Channel returns the underlying channel for range operations
func (*Generator[T]) Collect ΒΆ
func (g *Generator[T]) Collect() []T
Collect collects all remaining values into a slice
type GoManager ΒΆ
type GoManager struct {
// contains filtered or unexported fields
}
func NewGoManager ΒΆ
func NewGoManager() *GoManager
func (*GoManager) AddCancelFunc ΒΆ
func (GR *GoManager) AddCancelFunc(name string, cancelFunc context.CancelFunc)
type KeyFunc ΒΆ
KeyFunc is a function that generates a cache key from parameters
func FormatKeyFunc ΒΆ
FormatKeyFunc creates a KeyFunc using a format string Example: FormatKeyFunc[int]("user:%d") generates keys like "user:123"
func SimpleKeyFunc ΒΆ
SimpleKeyFunc creates a KeyFunc from a simple function Useful for common key generation patterns
func StringKeyFunc ΒΆ
StringKeyFunc creates a KeyFunc that uses the parameter directly as a string key Useful when the parameter is already a string identifier
type ParametricFetcher ΒΆ
ParametricFetcher provides cache-first fetching with automatic key generation from parameters This simplifies the API by eliminating manual key construction
func NewParametricFetcher ΒΆ
func NewParametricFetcher[P any, T any]( fetchFunc func(ctx context.Context, params P) (T, error), keyFunc KeyFunc[P], control *CacheControl, ) *ParametricFetcher[P, T]
NewParametricFetcher creates a fetcher with automatic key generation from parameters The keyFunc generates cache keys from parameters automatically If keyFunc is nil, uses fmt.Sprintf("%v", params) as default
func (*ParametricFetcher[P, T]) ClearCache ΒΆ
func (pf *ParametricFetcher[P, T]) ClearCache()
ClearCache clears all cached entries
func (*ParametricFetcher[P, T]) Fetch ΒΆ
func (pf *ParametricFetcher[P, T]) Fetch(ctx context.Context, params P) (T, error)
Fetch retrieves data with automatic key generation from parameters The cache key is automatically generated using the keyFunc
func (*ParametricFetcher[P, T]) FetchStaleWhileRevalidate ΒΆ
func (pf *ParametricFetcher[P, T]) FetchStaleWhileRevalidate(ctx context.Context, params P) (T, error)
FetchStaleWhileRevalidate returns stale data immediately while revalidating in background
func (*ParametricFetcher[P, T]) SetCacheControl ΒΆ
func (pf *ParametricFetcher[P, T]) SetCacheControl(control *CacheControl)
SetCacheControl updates the cache control directives
type Pipeline ΒΆ
type Pipeline[T any] struct { // contains filtered or unexported fields }
Pipeline represents a stage in a processing pipeline
func (*Pipeline[T]) Execute ΒΆ
func (p *Pipeline[T]) Execute(item T) T
Execute runs the pipeline on a single item
func (*Pipeline[T]) ExecuteAsync ΒΆ
ExecuteAsync processes items through the pipeline concurrently
type PreflightFetcher ΒΆ
type PreflightFetcher[T any] struct { // contains filtered or unexported fields }
PreflightFetcher represents a data source with preflight cache check
func NewPreflightFetcher ΒΆ
func NewPreflightFetcher[T any]( fetchFunc func(ctx context.Context, key string) (T, error), control *CacheControl, ) *PreflightFetcher[T]
NewPreflightFetcher creates a new preflight fetcher with caching
func (*PreflightFetcher[T]) ClearCache ΒΆ
func (pf *PreflightFetcher[T]) ClearCache()
ClearCache clears all cached entries
func (*PreflightFetcher[T]) Fetch ΒΆ
func (pf *PreflightFetcher[T]) Fetch(ctx context.Context, key string) (T, error)
Fetch retrieves data with preflight cache check This implements the cache-first pattern to reduce downstream load
func (*PreflightFetcher[T]) FetchStaleWhileRevalidate ΒΆ
func (pf *PreflightFetcher[T]) FetchStaleWhileRevalidate(ctx context.Context, key string) (T, error)
FetchStaleWhileRevalidate returns stale data immediately while revalidating in background
func (*PreflightFetcher[T]) SetCacheControl ΒΆ
func (pf *PreflightFetcher[T]) SetCacheControl(control *CacheControl)
SetCacheControl updates the cache control directives
type RateLimiter ΒΆ
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter controls the rate of operations
func NewRateLimiter ΒΆ
func NewRateLimiter(rate int) *RateLimiter
NewRateLimiter creates a rate limiter with specified rate (operations per second)
func (*RateLimiter) TryAcquire ΒΆ
func (rl *RateLimiter) TryAcquire() bool
TryAcquire attempts to acquire a token without blocking
type RecastOptions ΒΆ
type RecastOptions struct {
// TransformFuncs maps field names to transformation functions
TransformFuncs map[string]TransformFunc
// OmitFields lists fields to exclude from output (supports negation)
OmitFields []string
// RenameFields maps source field names to destination field names
RenameFields map[string]string
}
RecastOptions holds configuration for recasting operations
type RetryPolicy ΒΆ
type RetryPolicy struct {
MaxRetries int
RetryDelay time.Duration
BackoffFactor float64
MaxRetryDelay time.Duration
}
RetryPolicy defines how tasks should be retried on failure
func DefaultRetryPolicy ΒΆ
func DefaultRetryPolicy() *RetryPolicy
DefaultRetryPolicy returns a default retry policy
type RolloutConfig ΒΆ
type RolloutConfig struct {
// Policy is the rollout strategy to use
Policy RolloutPolicy `json:"policy"`
// Percentage is the percentage of users to enable (0-100) for gradual rollout
Percentage int `json:"percentage,omitempty"`
// TargetUserIDs is the list of specific user IDs for targeted rollout
TargetUserIDs []string `json:"target_user_ids,omitempty"`
// CanarySegments is the list of segments for canary rollout (e.g., "beta_users", "internal")
CanarySegments []string `json:"canary_segments,omitempty"`
}
RolloutConfig contains rollout policy configuration
type RolloutPolicy ΒΆ
type RolloutPolicy string
RolloutPolicy defines how a feature is rolled out
const ( // RolloutAllAtOnce enables the feature for all users immediately RolloutAllAtOnce RolloutPolicy = "all_at_once" // RolloutGradual enables the feature gradually using percentage RolloutGradual RolloutPolicy = "gradual" // RolloutCanary enables the feature for specific user segments first RolloutCanary RolloutPolicy = "canary" // RolloutTargeted enables the feature for specific user IDs only RolloutTargeted RolloutPolicy = "targeted" )
type SafeChannel ΒΆ
type SafeChannel[T any] struct { // contains filtered or unexported fields }
SafeChannel provides a thread-safe wrapper around a channel with timeout capabilities
func NewSafeChannel ΒΆ
func NewSafeChannel[T any](bufferSize int, defaultTimeout time.Duration) *SafeChannel[T]
NewSafeChannel creates a new SafeChannel with the specified buffer size and default timeout
func (*SafeChannel[T]) Cap ΒΆ
func (sc *SafeChannel[T]) Cap() int
Cap returns the capacity of the channel
func (*SafeChannel[T]) IsClosed ΒΆ
func (sc *SafeChannel[T]) IsClosed() bool
IsClosed checks if the channel is closed
func (*SafeChannel[T]) Len ΒΆ
func (sc *SafeChannel[T]) Len() int
Len returns the current number of items in the channel
func (*SafeChannel[T]) Receive ΒΆ
func (sc *SafeChannel[T]) Receive(ctx context.Context) (T, error)
Receive attempts to receive a value from the channel with timeout
func (*SafeChannel[T]) Send ΒΆ
func (sc *SafeChannel[T]) Send(ctx context.Context, value T) error
Send attempts to send a value to the channel with timeout
func (*SafeChannel[T]) TryReceive ΒΆ
func (sc *SafeChannel[T]) TryReceive() (T, error)
TryReceive attempts to receive a value without blocking
func (*SafeChannel[T]) TrySend ΒΆ
func (sc *SafeChannel[T]) TrySend(value T) error
TrySend attempts to send a value without blocking
type Semaphore ΒΆ
type Semaphore struct {
// contains filtered or unexported fields
}
Semaphore controls concurrent access to a resource
func NewSemaphore ΒΆ
NewSemaphore creates a semaphore with specified number of permits
func (*Semaphore) AcquireAll ΒΆ
AcquireAll acquires all permits
func (*Semaphore) ReleaseAll ΒΆ
func (s *Semaphore) ReleaseAll()
ReleaseAll releases all acquired permits
func (*Semaphore) TryAcquire ΒΆ
TryAcquire attempts to acquire a permit without blocking
type Stream ΒΆ
type Stream[T any] struct { // contains filtered or unexported fields }
Stream represents a sequence of values with chainable operations. We need to add U to the type parameter list of Stream itself
func Map ΒΆ
Map converts a Stream[T] to a Stream[U] Instead of being a method with a type parameter, this is now a function
func (*Stream[T]) Collect ΒΆ
func (s *Stream[T]) Collect() []T
Collect accumulates all elements into a slice
type SuperSlice ΒΆ
type SuperSlice[T any] struct { // contains filtered or unexported fields }
SuperSlice provides efficient slice processing with configurable parallelization
func NewSuperSlice ΒΆ
func NewSuperSlice[T any](data []T) *SuperSlice[T]
NewSuperSlice creates a new SuperSlice with default configuration
func NewSuperSliceWithConfig ΒΆ
func NewSuperSliceWithConfig[T any](data []T, config *SuperSliceConfig) *SuperSlice[T]
NewSuperSliceWithConfig creates a new SuperSlice with custom configuration
func (*SuperSlice[T]) FilterSlice ΒΆ
func (ss *SuperSlice[T]) FilterSlice(predicate func(index int, item T) bool) []T
Filter returns a new slice with elements that satisfy the predicate
func (*SuperSlice[T]) ForEach ΒΆ
func (ss *SuperSlice[T]) ForEach(callback func(index int, item T))
ForEach applies a callback function to each element without returning results
func (*SuperSlice[T]) GetData ΒΆ
func (ss *SuperSlice[T]) GetData() []T
GetData returns the underlying slice
func (*SuperSlice[T]) Process ΒΆ
func (ss *SuperSlice[T]) Process(callback func(index int, item T) T) []T
Process applies a callback function to each element Returns a new slice with the results (or modifies in place if configured)
func (*SuperSlice[T]) ProcessWithError ΒΆ
func (ss *SuperSlice[T]) ProcessWithError(callback func(index int, item T) (T, error)) ([]T, error)
ProcessWithError applies a callback that can return an error
func (*SuperSlice[T]) WithConfig ΒΆ
func (ss *SuperSlice[T]) WithConfig(config *SuperSliceConfig) *SuperSlice[T]
WithConfig sets a custom configuration
func (*SuperSlice[T]) WithInPlace ΒΆ
func (ss *SuperSlice[T]) WithInPlace() *SuperSlice[T]
WithInPlace enables in-place updates
func (*SuperSlice[T]) WithIterable ΒΆ
func (ss *SuperSlice[T]) WithIterable() *SuperSlice[T]
WithIterable enables iterable processing
func (*SuperSlice[T]) WithThreshold ΒΆ
func (ss *SuperSlice[T]) WithThreshold(threshold int) *SuperSlice[T]
WithThreshold sets a custom threshold
func (*SuperSlice[T]) WithWorkers ΒΆ
func (ss *SuperSlice[T]) WithWorkers(numWorkers int) *SuperSlice[T]
WithWorkers sets the number of workers
type SuperSliceConfig ΒΆ
type SuperSliceConfig struct {
// Threshold determines when to switch to parallel processing (default: 1000)
Threshold int
// NumWorkers specifies the number of worker goroutines (default: NumCPU)
NumWorkers int
// UseIterable indicates whether to convert slice to iterable first (default: false)
UseIterable bool
// InPlace indicates whether to update the slice in place (default: false)
InPlace bool
}
SuperSliceConfig holds configuration for SuperSlice processing
func DefaultSuperSliceConfig ΒΆ
func DefaultSuperSliceConfig() *SuperSliceConfig
DefaultSuperSliceConfig returns a configuration with sensible defaults
type SwissMap ΒΆ
type SwissMap[K comparable, V any] struct { // contains filtered or unexported fields }
SwissMap is a high-performance, thread-safe generic map using sharded architecture for optimal concurrent access. It divides the map into multiple shards, each protected by its own mutex, reducing lock contention in high-concurrency scenarios.
func NewSwissMap ΒΆ
func NewSwissMap[K comparable, V any]() *SwissMap[K, V]
NewSwissMap creates a new SwissMap with default shard count (32) This provides excellent performance for most concurrent workloads
func NewSwissMapWithShards ΒΆ
func NewSwissMapWithShards[K comparable, V any](shardCount uint32) *SwissMap[K, V]
NewSwissMapWithShards creates a new SwissMap with specified shard count The shard count must be a power of 2 for optimal performance Higher shard counts reduce lock contention but increase memory overhead
func (*SwissMap[K, V]) Clear ΒΆ
func (sm *SwissMap[K, V]) Clear()
Clear removes all entries from the map
func (*SwissMap[K, V]) Delete ΒΆ
func (sm *SwissMap[K, V]) Delete(key K)
Delete removes a key from the map
func (*SwissMap[K, V]) Get ΒΆ
Get retrieves a value from the map Returns the value and true if found, zero value and false otherwise
func (*SwissMap[K, V]) GetOrCompute ΒΆ
func (sm *SwissMap[K, V]) GetOrCompute(key K, compute func() V) V
GetOrCompute retrieves a value or computes and sets it if not present The compute function is only called if the key doesn't exist
func (*SwissMap[K, V]) GetOrSet ΒΆ
GetOrSet retrieves a value or sets it if not present Returns the value (existing or newly set) and true if it was already present
func (*SwissMap[K, V]) Keys ΒΆ
func (sm *SwissMap[K, V]) Keys() []K
Keys returns all keys in the map
func (*SwissMap[K, V]) Range ΒΆ
Range iterates over all key-value pairs in the map The function f is called for each entry. If f returns false, iteration stops
func (*SwissMap[K, V]) Set ΒΆ
func (sm *SwissMap[K, V]) Set(key K, value V)
Set stores a key-value pair in the map
type TaskHandler ΒΆ
TaskHandler is a registered function that can be retrieved by name
func GetTaskHandler ΒΆ
func GetTaskHandler(name string) (TaskHandler, error)
GetTaskHandler retrieves a registered task handler by name
type TaskPriority ΒΆ
type TaskPriority int
TaskPriority defines task priority levels
const ( PriorityLow TaskPriority = iota PriorityNormal PriorityHigh PriorityCritical )
type TaskResult ΒΆ
type TaskResult struct {
TaskID string
Status TaskStatus
Result interface{}
Error error
StartTime time.Time
EndTime time.Time
Attempts int
}
TaskResult stores the result of task execution
type TaskStatus ΒΆ
type TaskStatus string
TaskStatus represents the execution status of a task
const ( StatusPending TaskStatus = "pending" StatusRunning TaskStatus = "running" StatusSuccess TaskStatus = "success" StatusFailed TaskStatus = "failed" StatusRetrying TaskStatus = "retrying" StatusExpired TaskStatus = "expired" )
type TransformFunc ΒΆ
type TransformFunc func(interface{}) interface{}
TransformFunc is a function type for custom field transformations
type WorkerPool ΒΆ
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool manages a pool of workers that execute tasks
func NewWorkerPool ΒΆ
func NewWorkerPool(numWorkers int) *WorkerPool
NewWorkerPool creates a new worker pool with the specified number of workers
func NewWorkerPoolWithConfig ΒΆ
func NewWorkerPoolWithConfig(config *WorkerPoolConfig) *WorkerPool
NewWorkerPoolWithConfig creates a new worker pool with custom configuration
func (*WorkerPool) AckTask ΒΆ
func (wp *WorkerPool) AckTask(taskID string)
AckTask acknowledges task completion (Celery mode)
func (*WorkerPool) CancelCron ΒΆ
func (wp *WorkerPool) CancelCron(taskID string)
CancelCron cancels a recurring cron task
func (*WorkerPool) ClearResults ΒΆ
func (wp *WorkerPool) ClearResults()
ClearResults clears all stored task results
func (*WorkerPool) GetDeadLetterTasks ΒΆ
func (wp *WorkerPool) GetDeadLetterTasks() []*WorkerTask
GetDeadLetterTasks returns tasks that failed after max retries (Resque mode)
func (*WorkerPool) GetQueueLength ΒΆ
func (wp *WorkerPool) GetQueueLength(queueName string) int
GetQueueLength returns the number of pending tasks in a queue (Celery mode)
func (*WorkerPool) GetResult ΒΆ
func (wp *WorkerPool) GetResult(taskID string) (*TaskResult, bool)
GetResult retrieves the result of a task (both modes)
func (*WorkerPool) IsRunning ΒΆ
func (wp *WorkerPool) IsRunning() bool
IsRunning returns whether the worker pool is running
func (*WorkerPool) RejectTask ΒΆ
func (wp *WorkerPool) RejectTask(taskID string, requeue bool) error
RejectTask rejects a task and optionally requeues it (Celery mode)
func (*WorkerPool) RequeueDeadLetter ΒΆ
func (wp *WorkerPool) RequeueDeadLetter(taskID string) error
RequeueDeadLetter requeues a task from dead letter queue (Resque mode)
func (*WorkerPool) Stop ΒΆ
func (wp *WorkerPool) Stop()
Stop stops the worker pool and waits for all workers to finish
func (*WorkerPool) Submit ΒΆ
func (wp *WorkerPool) Submit(task *WorkerTask) error
Submit submits a task for immediate execution
func (*WorkerPool) SubmitCron ΒΆ
func (wp *WorkerPool) SubmitCron(task *WorkerTask, cronExpr string) error
SubmitCron submits a recurring task with a cron schedule
func (*WorkerPool) SubmitDelayed ΒΆ
func (wp *WorkerPool) SubmitDelayed(task *WorkerTask, delay time.Duration) error
SubmitDelayed submits a task for delayed execution
func (*WorkerPool) SubmitToQueue ΒΆ
func (wp *WorkerPool) SubmitToQueue(queueName string, task *WorkerTask) error
SubmitToQueue submits a task to a named queue (Celery mode)
func (*WorkerPool) WorkerCount ΒΆ
func (wp *WorkerPool) WorkerCount() int
WorkerCount returns the number of workers in the pool
type WorkerPoolConfig ΒΆ
type WorkerPoolConfig struct {
NumWorkers int
MaxDeadLetter int
EnableQueues bool // Enable named queues (Celery mode)
EnableResults bool // Enable result storage (both modes)
OnTaskStart func(task *WorkerTask)
OnTaskComplete func(task *WorkerTask, result *TaskResult)
OnTaskFailed func(task *WorkerTask, err error)
}
WorkerPoolConfig configures the worker pool
func DefaultWorkerPoolConfig ΒΆ
func DefaultWorkerPoolConfig() *WorkerPoolConfig
DefaultWorkerPoolConfig returns default configuration
type WorkerTask ΒΆ
type WorkerTask struct {
ID string
Func TaskFunc
Delay time.Duration
CronExpr string
IsRecurring bool
// Resque/Celery compatibility fields
Priority TaskPriority
Queue string
RetryPolicy *RetryPolicy
Timeout time.Duration
ExpiresAt *time.Time
Args map[string]interface{}
// Broker serialization fields
HandlerName string // Name of registered handler for broker storage
// contains filtered or unexported fields
}
WorkerTask represents a unit of work with optional delay and cron schedule
func DecodeFromBroker ΒΆ
func DecodeFromBroker(data []byte) (*WorkerTask, error)
DecodeFromBroker decodes a task from broker JSON format
func (*WorkerTask) EncodeToBroker ΒΆ
func (wt *WorkerTask) EncodeToBroker() ([]byte, error)
EncodeToBroker encodes the task to broker-compatible JSON format
func (*WorkerTask) GetAttempts ΒΆ
func (wt *WorkerTask) GetAttempts() int
GetAttempts returns the number of execution attempts
func (*WorkerTask) GetStatus ΒΆ
func (wt *WorkerTask) GetStatus() TaskStatus
GetStatus returns the current task status
func (*WorkerTask) MarshalJSON ΒΆ
func (wt *WorkerTask) MarshalJSON() ([]byte, error)
MarshalJSON serializes task metadata (Celery compatibility)
Source Files
ΒΆ
Directories
ΒΆ
| Path | Synopsis |
|---|---|
|
featureflag_demo
command
|
|
|
swissmap_demo
command
|
|
|
worker_broker
command
|
|
|
worker_demo
command
|
|
|
worker_resque_celery
command
|