Documentation
¶
Index ¶
- func AddTask(task ITask, logger *zerolog.Logger)
- func DelTask(taskID string, interruptFn func(task ITask, server string) error, ...) string
- func InitRedisTaskQueueManager(redisAddr string, password string, dbIndex int, runnerID string, ...) error
- func InitRedisTaskQueueManagerReader(redisAddr string, password string, dbIndex int, runnerID string, ...) error
- func InitTaskQueueManager(dbConnString string, runnerID string, providers []ProviderConfig, ...) error
- func InitTaskQueueManagerReader(dbConnString string, runnerID string, logger *zerolog.Logger, ...) error
- func RedisAddTask(task ITask, logger *zerolog.Logger)
- func RedisDelTask(taskID string, interruptFn func(task ITask, server string) error, ...) string
- func RedisRequeueTaskIfNeeded(logger *zerolog.Logger, tasks []ITask)
- func RequeueTaskIfNeeded(logger *zerolog.Logger, tasks []ITask)
- type CommandProvider
- type CommandTask
- func (ct *CommandTask) GetCallbackName() string
- func (ct *CommandTask) GetCreatedAt() time.Time
- func (ct *CommandTask) GetID() string
- func (ct *CommandTask) GetMaxRetries() int
- func (ct *CommandTask) GetPriority() int
- func (ct *CommandTask) GetProvider() IProvider
- func (ct *CommandTask) GetRetries() int
- func (ct *CommandTask) GetTaskGroup() ITaskGroup
- func (ct *CommandTask) GetTimeout() time.Duration
- func (ct *CommandTask) MarkAsFailed(t int64, err error)
- func (ct *CommandTask) MarkAsSuccess(t int64)
- func (ct *CommandTask) OnComplete()
- func (ct *CommandTask) OnStart()
- func (ct *CommandTask) UpdateLastError(error string) error
- func (ct *CommandTask) UpdateRetries(retries int) error
- type ConfigCache
- type IProvider
- type IScheduledCallback
- type ITask
- type ITaskGroup
- type ProviderConfig
- type RedisScheduledTaskScheduler
- func (s *RedisScheduledTaskScheduler) CancelScheduled(taskID string) error
- func (s *RedisScheduledTaskScheduler) DeleteScheduled(taskID string) error
- func (s *RedisScheduledTaskScheduler) RegisterCallback(callback IScheduledCallback)
- func (s *RedisScheduledTaskScheduler) RunEvery(interval time.Duration, callbackName string, payload map[string]interface{}) (string, error)
- func (s *RedisScheduledTaskScheduler) RunIn(delay time.Duration, callbackName string, payload map[string]interface{}) (string, error)
- func (s *RedisScheduledTaskScheduler) RunInWithRetries(delay time.Duration, callbackName string, payload map[string]interface{}, ...) (string, error)
- func (s *RedisScheduledTaskScheduler) Shutdown()
- func (s *RedisScheduledTaskScheduler) Start()
- type RedisTaskManager
- func (tm *RedisTaskManager) AddTask(task ITask) bool
- func (tm *RedisTaskManager) AddTasks(tasks []ITask) (count int, err error)
- func (tm *RedisTaskManager) DelTask(taskID string, interruptFn func(task ITask, server string) error) string
- func (tm *RedisTaskManager) SetServerMaxParallel(serverPrefix string, maxParallel int) error
- func (tm *RedisTaskManager) Shutdown()
- func (tm *RedisTaskManager) Start()
- func (tm *RedisTaskManager) UpdateProviderServers(name string, servers []string) error
- type RunningTaskInfo
- type ScheduledTaskScheduler
- func (s *ScheduledTaskScheduler) CancelScheduled(taskID string) error
- func (s *ScheduledTaskScheduler) DeleteScheduled(taskID string) error
- func (s *ScheduledTaskScheduler) RegisterCallback(callback IScheduledCallback)
- func (s *ScheduledTaskScheduler) RunEvery(interval time.Duration, callbackName string, payload map[string]interface{}) (string, error)
- func (s *ScheduledTaskScheduler) RunIn(delay time.Duration, callbackName string, payload map[string]interface{}) (string, error)
- func (s *ScheduledTaskScheduler) RunInWithRetries(delay time.Duration, callbackName string, payload map[string]interface{}, ...) (string, error)
- func (s *ScheduledTaskScheduler) Shutdown()
- func (s *ScheduledTaskScheduler) Start()
- type StupidTaskManager
- func (tm *StupidTaskManager) AddTask(task ITask) bool
- func (tm *StupidTaskManager) AddTasks(tasks []ITask) (count int, err error)
- func (tm *StupidTaskManager) DelTask(taskID string, interruptFn func(task ITask, server string) error) string
- func (tm *StupidTaskManager) SetServerMaxParallel(serverPrefix string, maxParallel int) error
- func (tm *StupidTaskManager) Shutdown()
- func (tm *StupidTaskManager) Start()
- func (tm *StupidTaskManager) UpdateProviderServers(name string, servers []string) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AddTask ¶
AddTask is a global helper for adding tasks to the singleton. This is for backward compatibility with SMT v1 code. New code should call tm.AddTask() directly on the instance.
func DelTask ¶
func DelTask(taskID string, interruptFn func(task ITask, server string) error, logger *zerolog.Logger) string
DelTask is a global helper for deleting/cancelling tasks from the singleton. This is for backward compatibility with SMT v1 code. New code should call tm.DelTask() directly on the instance.
func InitRedisTaskQueueManager ¶ added in v0.0.9
func InitRedisTaskQueueManager( redisAddr string, password string, dbIndex int, runnerID string, providers []ProviderConfig, logger *zerolog.Logger, getTimeout func(string, string) time.Duration, ) error
InitRedisTaskQueueManager creates the global singleton instance (leader mode). This is for backward compatibility with SMT v1 code. Caller must call RedisTaskQueueManagerInstance.Start() explicitly after init. New code should use NewRedisTaskManager + Start() directly.
func InitRedisTaskQueueManagerReader ¶ added in v0.0.9
func InitRedisTaskQueueManagerReader( redisAddr string, password string, dbIndex int, runnerID string, logger *zerolog.Logger, getTimeout func(string, string) time.Duration, ) error
InitRedisTaskQueueManagerReader creates the global singleton instance (reader mode). This is for backward compatibility with SMT v1 code. Reader connects to existing config initialized by leader instance. Caller must call RedisTaskQueueManagerInstance.Start() explicitly after init. New code should use NewRedisTaskManagerReader + Start() directly.
func InitTaskQueueManager ¶
func InitTaskQueueManager( dbConnString string, runnerID string, providers []ProviderConfig, logger *zerolog.Logger, getTimeout func(string, string) time.Duration, ) error
InitTaskQueueManager creates the global singleton instance (leader mode). This is for backward compatibility with SMT v1 code. Caller must call TaskQueueManagerInstance.Start() explicitly after init. New code should use NewStupidTaskManager + Start() directly.
func InitTaskQueueManagerReader ¶
func InitTaskQueueManagerReader( dbConnString string, runnerID string, logger *zerolog.Logger, getTimeout func(string, string) time.Duration, ) error
InitTaskQueueManagerReader creates the global singleton instance (reader mode). This is for backward compatibility with SMT v1 code. Reader connects to existing schema initialized by leader instance. Caller must call TaskQueueManagerInstance.Start() explicitly after init. New code should use NewStupidTaskManagerReader + Start() directly.
func RedisAddTask ¶ added in v0.0.9
RedisAddTask is a global helper for adding tasks to the Redis singleton. This is for backward compatibility with SMT v1 code. New code should call tm.AddTask() directly on the instance.
func RedisDelTask ¶ added in v0.0.9
func RedisDelTask(taskID string, interruptFn func(task ITask, server string) error, logger *zerolog.Logger) string
RedisDelTask is a global helper for deleting/cancelling tasks from the Redis singleton. This is for backward compatibility with SMT v1 code. New code should call tm.DelTask() directly on the instance.
func RedisRequeueTaskIfNeeded ¶ added in v0.0.9
RedisRequeueTaskIfNeeded re-adds tasks to the global Redis singleton. This is for backward compatibility with SMT v1 code.
func RequeueTaskIfNeeded ¶
RequeueTaskIfNeeded re-adds tasks to the global singleton. This is for backward compatibility with SMT v1 code.
Types ¶
type CommandProvider ¶
type CommandProvider struct {
// contains filtered or unexported fields
}
CommandProvider wraps command execution to implement IProvider interface
func NewCommandProvider ¶
func NewCommandProvider(name string) *CommandProvider
NewCommandProvider creates a new CommandProvider
type CommandTask ¶
type CommandTask struct {
// contains filtered or unexported fields
}
CommandTask wraps a command function to implement ITask interface
func NewCommandTask ¶
func NewCommandTask(providerName string, provider IProvider, commandFunc func(server string) error, priority int) *CommandTask
NewCommandTask creates a new CommandTask
func (*CommandTask) GetCallbackName ¶
func (ct *CommandTask) GetCallbackName() string
GetCallbackName implements ITask
func (*CommandTask) GetCreatedAt ¶
func (ct *CommandTask) GetCreatedAt() time.Time
GetCreatedAt implements ITask
func (*CommandTask) GetMaxRetries ¶
func (ct *CommandTask) GetMaxRetries() int
GetMaxRetries implements ITask
func (*CommandTask) GetPriority ¶
func (ct *CommandTask) GetPriority() int
GetPriority implements ITask
func (*CommandTask) GetProvider ¶
func (ct *CommandTask) GetProvider() IProvider
GetProvider implements ITask
func (*CommandTask) GetRetries ¶
func (ct *CommandTask) GetRetries() int
GetRetries implements ITask
func (*CommandTask) GetTaskGroup ¶
func (ct *CommandTask) GetTaskGroup() ITaskGroup
GetTaskGroup implements ITask
func (*CommandTask) GetTimeout ¶
func (ct *CommandTask) GetTimeout() time.Duration
GetTimeout implements ITask
func (*CommandTask) MarkAsFailed ¶
func (ct *CommandTask) MarkAsFailed(t int64, err error)
MarkAsFailed implements ITask
func (*CommandTask) MarkAsSuccess ¶
func (ct *CommandTask) MarkAsSuccess(t int64)
MarkAsSuccess implements ITask
func (*CommandTask) UpdateLastError ¶
func (ct *CommandTask) UpdateLastError(error string) error
UpdateLastError implements ITask
func (*CommandTask) UpdateRetries ¶
func (ct *CommandTask) UpdateRetries(retries int) error
UpdateRetries implements ITask
type ConfigCache ¶
type ConfigCache struct {
// contains filtered or unexported fields
}
ConfigCache holds provider/server/limit configuration in memory
type IScheduledCallback ¶
type IScheduledCallback interface {
Name() string // Unique identifier for callback registration
Handle(payload map[string]interface{}) error // Execute the scheduled task
OnComplete(payload map[string]interface{}) // Called after successful execution
OnFail(payload map[string]interface{}, err error) // Called after max retries exhausted
}
IScheduledCallback is the interface for scheduled task handlers
type ITask ¶
type ITask interface {
MarkAsSuccess(t int64)
MarkAsFailed(t int64, err error)
GetPriority() int
GetID() string
GetMaxRetries() int
GetRetries() int
GetCreatedAt() time.Time
GetTaskGroup() ITaskGroup
GetProvider() IProvider
UpdateRetries(int) error
GetTimeout() time.Duration
UpdateLastError(string) error
GetCallbackName() string
OnComplete()
OnStart()
}
Task interfaces (task.go)
type ITaskGroup ¶
type ProviderConfig ¶
ProviderConfig for leader initialization
type RedisScheduledTaskScheduler ¶ added in v0.0.8
type RedisScheduledTaskScheduler struct {
// contains filtered or unexported fields
}
RedisScheduledTaskScheduler manages delayed and recurring task execution using Redis
func NewRedisScheduledTaskScheduler ¶ added in v0.0.8
func NewRedisScheduledTaskScheduler(rdb *redis.Client, logger *zerolog.Logger) *RedisScheduledTaskScheduler
NewRedisScheduledTaskScheduler creates a scheduler instance
func (*RedisScheduledTaskScheduler) CancelScheduled ¶ added in v0.0.8
func (s *RedisScheduledTaskScheduler) CancelScheduled(taskID string) error
CancelScheduled cancels a scheduled task
func (*RedisScheduledTaskScheduler) DeleteScheduled ¶ added in v0.0.8
func (s *RedisScheduledTaskScheduler) DeleteScheduled(taskID string) error
DeleteScheduled permanently deletes a scheduled task
func (*RedisScheduledTaskScheduler) RegisterCallback ¶ added in v0.0.8
func (s *RedisScheduledTaskScheduler) RegisterCallback(callback IScheduledCallback)
RegisterCallback registers a scheduled task callback
func (*RedisScheduledTaskScheduler) RunEvery ¶ added in v0.0.8
func (s *RedisScheduledTaskScheduler) RunEvery(interval time.Duration, callbackName string, payload map[string]interface{}) (string, error)
RunEvery schedules a recurring task to run at intervals
func (*RedisScheduledTaskScheduler) RunIn ¶ added in v0.0.8
func (s *RedisScheduledTaskScheduler) RunIn(delay time.Duration, callbackName string, payload map[string]interface{}) (string, error)
RunIn schedules a one-shot task to run after a delay
func (*RedisScheduledTaskScheduler) RunInWithRetries ¶ added in v0.0.8
func (s *RedisScheduledTaskScheduler) RunInWithRetries(delay time.Duration, callbackName string, payload map[string]interface{}, maxRetries int) (string, error)
RunInWithRetries schedules a one-shot task with custom retry limit
func (*RedisScheduledTaskScheduler) Shutdown ¶ added in v0.0.8
func (s *RedisScheduledTaskScheduler) Shutdown()
Shutdown stops the scheduler
func (*RedisScheduledTaskScheduler) Start ¶ added in v0.0.8
func (s *RedisScheduledTaskScheduler) Start()
Start begins the scheduler polling loop
type RedisTaskManager ¶ added in v0.0.8
type RedisTaskManager struct {
// contains filtered or unexported fields
}
RedisTaskManager - STM with Redis backend (simpler + faster than PostgreSQL)
var (
	// RedisTaskQueueManagerInstance is the global singleton instance.
	// This exists for backward compatibility with SMT v1 code.
	// New code should create instances directly via NewRedisTaskManager.
	RedisTaskQueueManagerInstance *RedisTaskManager
)
func NewRedisTaskManager ¶ added in v0.0.8
func NewRedisTaskManager( redisAddr string, password string, dbIndex int, runnerID string, providers []ProviderConfig, logger *zerolog.Logger, getTimeout func(string, string) time.Duration, ) (*RedisTaskManager, error)
NewRedisTaskManager creates Redis-backed STM (leader mode). Leader initializes config keys in Redis.
Parameters:
- redisAddr: Redis address, e.g. "localhost:6379"
- password: Redis password (empty string for no auth)
- dbIndex: Redis database index (0-15, use different index per environment)
func NewRedisTaskManagerReader ¶ added in v0.0.8
func NewRedisTaskManagerReader( redisAddr string, password string, dbIndex int, runnerID string, logger *zerolog.Logger, getTimeout func(string, string) time.Duration, ) (*RedisTaskManager, error)
NewRedisTaskManagerReader creates Redis-backed STM (reader mode). Reader connects to existing config.
func (*RedisTaskManager) AddTask ¶ added in v0.0.8
func (tm *RedisTaskManager) AddTask(task ITask) bool
AddTask enqueues a task
func (*RedisTaskManager) AddTasks ¶ added in v0.0.8
func (tm *RedisTaskManager) AddTasks(tasks []ITask) (count int, err error)
AddTasks enqueues multiple tasks
func (*RedisTaskManager) DelTask ¶ added in v0.0.8
func (tm *RedisTaskManager) DelTask(taskID string, interruptFn func(task ITask, server string) error) string
DelTask removes a task from queue or cancels if running
func (*RedisTaskManager) SetServerMaxParallel ¶ added in v0.0.8
func (tm *RedisTaskManager) SetServerMaxParallel(serverPrefix string, maxParallel int) error
SetServerMaxParallel updates server concurrency limit
func (*RedisTaskManager) Shutdown ¶ added in v0.0.8
func (tm *RedisTaskManager) Shutdown()
Shutdown gracefully stops the task manager
func (*RedisTaskManager) Start ¶ added in v0.0.8
func (tm *RedisTaskManager) Start()
Start begins worker goroutines
func (*RedisTaskManager) UpdateProviderServers ¶ added in v0.0.8
func (tm *RedisTaskManager) UpdateProviderServers(name string, servers []string) error
UpdateProviderServers updates provider's server list
type RunningTaskInfo ¶
type RunningTaskInfo struct {
// contains filtered or unexported fields
}
RunningTaskInfo holds information about a currently executing task (copied from SMT v1 for compatibility)
type ScheduledTaskScheduler ¶
type ScheduledTaskScheduler struct {
// contains filtered or unexported fields
}
ScheduledTaskScheduler manages delayed and recurring task execution
func NewScheduledTaskScheduler ¶
func NewScheduledTaskScheduler(db *pgxpool.Pool, logger *zerolog.Logger) *ScheduledTaskScheduler
NewScheduledTaskScheduler creates a scheduler instance
func (*ScheduledTaskScheduler) CancelScheduled ¶
func (s *ScheduledTaskScheduler) CancelScheduled(taskID string) error
CancelScheduled cancels a scheduled task
func (*ScheduledTaskScheduler) DeleteScheduled ¶
func (s *ScheduledTaskScheduler) DeleteScheduled(taskID string) error
DeleteScheduled permanently deletes a scheduled task from database
func (*ScheduledTaskScheduler) RegisterCallback ¶
func (s *ScheduledTaskScheduler) RegisterCallback(callback IScheduledCallback)
RegisterCallback registers a scheduled task callback
func (*ScheduledTaskScheduler) RunEvery ¶
func (s *ScheduledTaskScheduler) RunEvery(interval time.Duration, callbackName string, payload map[string]interface{}) (string, error)
RunEvery schedules a recurring task to run at intervals
func (*ScheduledTaskScheduler) RunIn ¶
func (s *ScheduledTaskScheduler) RunIn(delay time.Duration, callbackName string, payload map[string]interface{}) (string, error)
RunIn schedules a one-shot task to run after a delay
func (*ScheduledTaskScheduler) RunInWithRetries ¶
func (s *ScheduledTaskScheduler) RunInWithRetries(delay time.Duration, callbackName string, payload map[string]interface{}, maxRetries int) (string, error)
RunInWithRetries schedules a one-shot task with custom retry limit
func (*ScheduledTaskScheduler) Shutdown ¶
func (s *ScheduledTaskScheduler) Shutdown()
Shutdown stops the scheduler
func (*ScheduledTaskScheduler) Start ¶
func (s *ScheduledTaskScheduler) Start()
Start begins the scheduler polling loop
type StupidTaskManager ¶
type StupidTaskManager struct {
// contains filtered or unexported fields
}
StupidTaskManager - STM v2 with PostgreSQL backend
var (
	// TaskQueueManagerInstance is the global singleton instance.
	// This exists for backward compatibility with SMT v1 code.
	// New code should create instances directly via NewStupidTaskManager.
	TaskQueueManagerInstance *StupidTaskManager
)
func NewStupidTaskManager ¶
func NewStupidTaskManager( dbConnString string, runnerID string, providers []ProviderConfig, logger *zerolog.Logger, getTimeout func(string, string) time.Duration, maxOpenConns int, maxIdleConns int, ) (*StupidTaskManager, error)
NewStupidTaskManager creates STM v2 instance (leader mode). Leader is responsible for:
- Creating database schema if not exists
- Initializing provider configuration
- Starting config watcher
Parameters:
- runnerID: Unique identifier for this instance (e.g. "soulkyn-stm", "pixi-stm")
- maxOpenConns: Maximum number of open connections to database (0 = unlimited, recommended: 25)
- maxIdleConns: Maximum number of idle connections (recommended: 10)
Uses pgx/v5 with simple protocol (no prepared statements) for pgbouncer transaction mode compatibility.
func NewStupidTaskManagerReader ¶
func NewStupidTaskManagerReader( dbConnString string, runnerID string, logger *zerolog.Logger, getTimeout func(string, string) time.Duration, maxOpenConns int, maxIdleConns int, ) (*StupidTaskManager, error)
NewStupidTaskManagerReader creates STM v2 instance (reader mode). Reader is responsible for:
- Verifying schema exists (fail fast if missing)
- Loading existing configuration from database
- Starting config watcher
Parameters:
- runnerID: Unique identifier for this instance (e.g. "soulkyn-stm", "pixi-stm")
- maxOpenConns: Maximum number of open connections to database (0 = unlimited, recommended: 25)
- maxIdleConns: Maximum number of idle connections (recommended: 10)
Uses pgx/v5 with simple protocol (no prepared statements) for pgbouncer transaction mode compatibility.
func (*StupidTaskManager) AddTask ¶
func (tm *StupidTaskManager) AddTask(task ITask) bool
AddTask enqueues a task if not already known. Returns true if successfully enqueued.
func (*StupidTaskManager) AddTasks ¶
func (tm *StupidTaskManager) AddTasks(tasks []ITask) (count int, err error)
AddTasks enqueues multiple tasks. Returns count of successfully enqueued tasks.
func (*StupidTaskManager) DelTask ¶
func (tm *StupidTaskManager) DelTask(taskID string, interruptFn func(task ITask, server string) error) string
DelTask removes a task from queue or cancels it if running. Returns: "removed_from_queue", "interrupted_running", "not_found", or "error: <msg>".
func (*StupidTaskManager) SetServerMaxParallel ¶
func (tm *StupidTaskManager) SetServerMaxParallel(serverPrefix string, maxParallel int) error
SetServerMaxParallel updates server concurrency limit
func (*StupidTaskManager) Shutdown ¶
func (tm *StupidTaskManager) Shutdown()
Shutdown gracefully stops the task manager
func (*StupidTaskManager) Start ¶
func (tm *StupidTaskManager) Start()
Start begins worker goroutines for task processing
func (*StupidTaskManager) UpdateProviderServers ¶
func (tm *StupidTaskManager) UpdateProviderServers(name string, servers []string) error
UpdateProviderServers updates provider's server list