Documentation
¶
Index ¶
- type AssignResult
- type ColoniesController
- func (controller *ColoniesController) AddChild(processGraphID string, parentProcessID string, childProcessID string, ...) (*core.Process, error)
- func (controller *ColoniesController) AddCron(cron *core.Cron) (*core.Cron, error)
- func (controller *ColoniesController) AddGenerator(generator *core.Generator) (*core.Generator, error)
- func (controller *ColoniesController) AddProcess(process *core.Process) (*core.Process, error)
- func (controller *ColoniesController) AddProcessToDB(process *core.Process) (*core.Process, error)
- func (controller *ColoniesController) AreColonyAssignmentsPaused(colonyName string) (bool, error)
- func (controller *ColoniesController) Assign(executorID string, colonyName string, cpu int64, mem int64) (*AssignResult, error)
- func (controller *ColoniesController) BlockingCmdQueueWorker()
- func (controller *ColoniesController) CalcNextRun(cron *core.Cron) time.Time
- func (controller *ColoniesController) CleanupWorker()
- func (controller *ColoniesController) CloseFailed(processID string, errs []string) error
- func (controller *ColoniesController) CloseSuccessful(processID string, executorID string, output []interface{}) error
- func (controller *ColoniesController) CmdQueueWorker()
- func (controller *ColoniesController) CreateProcessGraph(workflowSpec *core.WorkflowSpec, args []interface{}, ...) (*core.ProcessGraph, error)
- func (controller *ColoniesController) CreateResumeChannel(colonyName string) <-chan bool
- func (controller *ColoniesController) CronTriggerLoop()
- func (controller *ColoniesController) DistributedAssign(executor *core.Executor, colonyName string, cpu int64, memory int64, ...) (*AssignResult, error)
- func (controller *ColoniesController) FindFailedProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
- func (controller *ColoniesController) FindRunningProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
- func (controller *ColoniesController) FindSuccessfulProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
- func (controller *ColoniesController) FindWaitingProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
- func (controller *ColoniesController) GeneratorTriggerLoop()
- func (controller *ColoniesController) GetChannelRouter() *channel.Router
- func (controller *ColoniesController) GetCron(cronID string) (*core.Cron, error)
- func (controller *ColoniesController) GetCronByName(colonyName string, cronName string) (*core.Cron, error)
- func (controller *ColoniesController) GetCronPeriod() int
- func (controller *ColoniesController) GetCrons(colonyName string, count int) ([]*core.Cron, error)
- func (controller *ColoniesController) GetEtcdServer() *cluster.EtcdServer
- func (controller *ColoniesController) GetEventHandler() backends.RealtimeEventHandler
- func (controller *ColoniesController) GetGenerator(generatorID string) (*core.Generator, error)
- func (controller *ColoniesController) GetGeneratorPeriod() int
- func (controller *ColoniesController) GetGenerators(colonyName string, count int) ([]*core.Generator, error)
- func (controller *ColoniesController) GetProcessGraphByID(processGraphID string) (*core.ProcessGraph, error)
- func (controller *ColoniesController) GetProcessGraphStorage() *processGraphStorageAdapter
- func (controller *ColoniesController) GetThisNode() cluster.Node
- func (controller *ColoniesController) HandleDefunctProcessgraph(processGraphID string, processID string, err error) error
- func (controller *ColoniesController) IsLeader() bool
- func (controller *ColoniesController) NotifyChildren(process *core.Process) error
- func (controller *ColoniesController) PackGenerator(generatorID string, colonyName, arg string) error
- func (controller *ColoniesController) PauseColonyAssignments(colonyName string) error
- func (controller *ColoniesController) RemoveCron(cronID string) error
- func (controller *ColoniesController) RemoveGenerator(generatorID string) error
- func (controller *ColoniesController) ResetDatabase() error
- func (controller *ColoniesController) ResetProcess(processID string) error
- func (controller *ColoniesController) ResolveGenerator(colonyName string, generatorName string) (*core.Generator, error)
- func (controller *ColoniesController) ResumeColonyAssignments(colonyName string) error
- func (controller *ColoniesController) RetentionWorker()
- func (controller *ColoniesController) RunCron(cronID string) (*core.Cron, error)
- func (controller *ColoniesController) StartCron(cron *core.Cron)
- func (controller *ColoniesController) Stop()
- func (controller *ColoniesController) SubmitWorkflow(generator *core.Generator, counter int, recoveredID string)
- func (controller *ColoniesController) SubmitWorkflowSpec(workflowSpec *core.WorkflowSpec, recoveredID string) (*core.ProcessGraph, error)
- func (controller *ColoniesController) SubscribeProcess(executorID string, subscription *backends.RealtimeSubscription) error
- func (controller *ColoniesController) SubscribeProcesses(executorID string, subscription *backends.RealtimeSubscription) error
- func (controller *ColoniesController) TimeoutLoop()
- func (controller *ColoniesController) TriggerCrons()
- func (controller *ColoniesController) TriggerGenerators()
- func (controller *ColoniesController) TryBecomeLeader() bool
- func (controller *ColoniesController) UnassignExecutor(processID string) error
- func (controller *ColoniesController) UpdateProcessGraph(graph *core.ProcessGraph) error
- func (controller *ColoniesController) WakeupPausedAssignments(colonyName string)
- type Controller
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AssignResult ¶
type AssignResult struct {
Process *core.Process
IsPaused bool
ResumeChannel <-chan bool // Only set when IsPaused=true
}
AssignResult contains the result of a process assignment attempt.
type ColoniesController ¶
type ColoniesController struct {
// contains filtered or unexported fields
}
func (*ColoniesController) AddGenerator ¶
func (*ColoniesController) AddProcess ¶
func (*ColoniesController) AddProcessToDB ¶
func (*ColoniesController) AreColonyAssignmentsPaused ¶
func (controller *ColoniesController) AreColonyAssignmentsPaused(colonyName string) (bool, error)
func (*ColoniesController) Assign ¶
func (controller *ColoniesController) Assign(executorID string, colonyName string, cpu int64, mem int64) (*AssignResult, error)
func (*ColoniesController) BlockingCmdQueueWorker ¶
func (controller *ColoniesController) BlockingCmdQueueWorker()
func (*ColoniesController) CalcNextRun ¶
func (controller *ColoniesController) CalcNextRun(cron *core.Cron) time.Time
func (*ColoniesController) CleanupWorker ¶ added in v1.9.3
func (controller *ColoniesController) CleanupWorker()
func (*ColoniesController) CloseFailed ¶
func (controller *ColoniesController) CloseFailed(processID string, errs []string) error
func (*ColoniesController) CloseSuccessful ¶
func (controller *ColoniesController) CloseSuccessful(processID string, executorID string, output []interface{}) error
func (*ColoniesController) CmdQueueWorker ¶
func (controller *ColoniesController) CmdQueueWorker()
func (*ColoniesController) CreateProcessGraph ¶
func (controller *ColoniesController) CreateProcessGraph(workflowSpec *core.WorkflowSpec, args []interface{}, kwargs map[string]interface{}, rootInput []interface{}, recoveredID string) (*core.ProcessGraph, error)
func (*ColoniesController) CreateResumeChannel ¶
func (controller *ColoniesController) CreateResumeChannel(colonyName string) <-chan bool
CreateResumeChannel creates a channel that will be signaled when assignments are resumed for a colony.
func (*ColoniesController) CronTriggerLoop ¶
func (controller *ColoniesController) CronTriggerLoop()
func (*ColoniesController) DistributedAssign ¶ added in v1.9.6
func (controller *ColoniesController) DistributedAssign(executor *core.Executor, colonyName string, cpu int64, memory int64, storage int64) (*AssignResult, error)
DistributedAssign performs atomic process assignment using database-level locking. This method bypasses the scheduler and blocking queue, enabling horizontal scaling of the assign operation across multiple server replicas. It uses SELECT FOR UPDATE SKIP LOCKED to handle concurrent access without race conditions.
func (*ColoniesController) FindFailedProcessGraphs ¶
func (controller *ColoniesController) FindFailedProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
func (*ColoniesController) FindRunningProcessGraphs ¶
func (controller *ColoniesController) FindRunningProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
func (*ColoniesController) FindSuccessfulProcessGraphs ¶
func (controller *ColoniesController) FindSuccessfulProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
func (*ColoniesController) FindWaitingProcessGraphs ¶
func (controller *ColoniesController) FindWaitingProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
func (*ColoniesController) GeneratorTriggerLoop ¶
func (controller *ColoniesController) GeneratorTriggerLoop()
func (*ColoniesController) GetChannelRouter ¶ added in v1.9.6
func (controller *ColoniesController) GetChannelRouter() *channel.Router
GetChannelRouter returns the channel router.
func (*ColoniesController) GetCron ¶
func (controller *ColoniesController) GetCron(cronID string) (*core.Cron, error)
func (*ColoniesController) GetCronByName ¶ added in v1.9.6
func (*ColoniesController) GetCronPeriod ¶
func (controller *ColoniesController) GetCronPeriod() int
func (*ColoniesController) GetEtcdServer ¶
func (controller *ColoniesController) GetEtcdServer() *cluster.EtcdServer
func (*ColoniesController) GetEventHandler ¶
func (controller *ColoniesController) GetEventHandler() backends.RealtimeEventHandler
func (*ColoniesController) GetGenerator ¶
func (controller *ColoniesController) GetGenerator(generatorID string) (*core.Generator, error)
func (*ColoniesController) GetGeneratorPeriod ¶
func (controller *ColoniesController) GetGeneratorPeriod() int
func (*ColoniesController) GetGenerators ¶
func (*ColoniesController) GetProcessGraphByID ¶
func (controller *ColoniesController) GetProcessGraphByID(processGraphID string) (*core.ProcessGraph, error)
func (*ColoniesController) GetProcessGraphStorage ¶
func (controller *ColoniesController) GetProcessGraphStorage() *processGraphStorageAdapter
GetProcessGraphStorage creates a storage adapter for ProcessGraph.
func (*ColoniesController) GetThisNode ¶
func (controller *ColoniesController) GetThisNode() cluster.Node
func (*ColoniesController) HandleDefunctProcessgraph ¶
func (controller *ColoniesController) HandleDefunctProcessgraph(processGraphID string, processID string, err error) error
func (*ColoniesController) IsLeader ¶
func (controller *ColoniesController) IsLeader() bool
func (*ColoniesController) NotifyChildren ¶
func (controller *ColoniesController) NotifyChildren(process *core.Process) error
func (*ColoniesController) PackGenerator ¶
func (controller *ColoniesController) PackGenerator(generatorID string, colonyName, arg string) error
func (*ColoniesController) PauseColonyAssignments ¶
func (controller *ColoniesController) PauseColonyAssignments(colonyName string) error
func (*ColoniesController) RemoveCron ¶
func (controller *ColoniesController) RemoveCron(cronID string) error
func (*ColoniesController) RemoveGenerator ¶
func (controller *ColoniesController) RemoveGenerator(generatorID string) error
func (*ColoniesController) ResetDatabase ¶
func (controller *ColoniesController) ResetDatabase() error
TODO: This function is incomplete for clustered deployments. When the database is reset, in-memory state (channels, subscriptions) becomes orphaned and can cause memory leaks. Each server replica should have a periodic cleanup routine that: (1) iterates through the channels in channelRouter; (2) checks whether the referenced processes still exist in the DB; and (3) removes orphaned channels and cancels stale subscriptions. This cleanup should also be integrated with the error-handling paths (CloseFailed, TimeoutLoop, etc.).
func (*ColoniesController) ResetProcess ¶
func (controller *ColoniesController) ResetProcess(processID string) error
func (*ColoniesController) ResolveGenerator ¶
func (*ColoniesController) ResumeColonyAssignments ¶
func (controller *ColoniesController) ResumeColonyAssignments(colonyName string) error
func (*ColoniesController) RetentionWorker ¶
func (controller *ColoniesController) RetentionWorker()
func (*ColoniesController) RunCron ¶
func (controller *ColoniesController) RunCron(cronID string) (*core.Cron, error)
func (*ColoniesController) StartCron ¶
func (controller *ColoniesController) StartCron(cron *core.Cron)
func (*ColoniesController) Stop ¶
func (controller *ColoniesController) Stop()
func (*ColoniesController) SubmitWorkflow ¶
func (controller *ColoniesController) SubmitWorkflow(generator *core.Generator, counter int, recoveredID string)
func (*ColoniesController) SubmitWorkflowSpec ¶
func (controller *ColoniesController) SubmitWorkflowSpec(workflowSpec *core.WorkflowSpec, recoveredID string) (*core.ProcessGraph, error)
func (*ColoniesController) SubscribeProcess ¶
func (controller *ColoniesController) SubscribeProcess(executorID string, subscription *backends.RealtimeSubscription) error
func (*ColoniesController) SubscribeProcesses ¶
func (controller *ColoniesController) SubscribeProcesses(executorID string, subscription *backends.RealtimeSubscription) error
func (*ColoniesController) TimeoutLoop ¶
func (controller *ColoniesController) TimeoutLoop()
func (*ColoniesController) TriggerCrons ¶
func (controller *ColoniesController) TriggerCrons()
func (*ColoniesController) TriggerGenerators ¶
func (controller *ColoniesController) TriggerGenerators()
func (*ColoniesController) TryBecomeLeader ¶
func (controller *ColoniesController) TryBecomeLeader() bool
func (*ColoniesController) UnassignExecutor ¶
func (controller *ColoniesController) UnassignExecutor(processID string) error
func (*ColoniesController) UpdateProcessGraph ¶
func (controller *ColoniesController) UpdateProcessGraph(graph *core.ProcessGraph) error
func (*ColoniesController) WakeupPausedAssignments ¶
func (controller *ColoniesController) WakeupPausedAssignments(colonyName string)
WakeupPausedAssignments signals all waiting executors that assignments have been resumed.
type Controller ¶
type Controller interface {
GetCronPeriod() int
GetGeneratorPeriod() int
GetEtcdServer() *cluster.EtcdServer
GetEventHandler() backends.RealtimeEventHandler
GetThisNode() cluster.Node
SubscribeProcesses(executorID string, subscription *backends.RealtimeSubscription) error
SubscribeProcess(executorID string, subscription *backends.RealtimeSubscription) error
AddProcessToDB(process *core.Process) (*core.Process, error)
AddProcess(process *core.Process) (*core.Process, error)
AddChild(processGraphID string, parentProcessID string, childProcessID string, process *core.Process, executorID string, insert bool) (*core.Process, error)
UpdateProcessGraph(graph *core.ProcessGraph) error
CreateProcessGraph(workflowSpec *core.WorkflowSpec, args []interface{}, kwargs map[string]interface{}, rootInput []interface{}, recoveredID string) (*core.ProcessGraph, error)
SubmitWorkflowSpec(workflowSpec *core.WorkflowSpec, recoveredID string) (*core.ProcessGraph, error)
GetProcessGraphByID(processGraphID string) (*core.ProcessGraph, error)
FindWaitingProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
FindRunningProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
FindSuccessfulProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
FindFailedProcessGraphs(colonyName string, count int) ([]*core.ProcessGraph, error)
CloseSuccessful(processID string, executorID string, output []interface{}) error
NotifyChildren(process *core.Process) error
CloseFailed(processID string, errs []string) error
HandleDefunctProcessgraph(processGraphID string, processID string, err error) error
Assign(executorID string, colonyName string, cpu int64, memory int64) (*AssignResult, error)
DistributedAssign(executor *core.Executor, colonyName string, cpu int64, memory int64, storage int64) (*AssignResult, error)
UnassignExecutor(processID string) error
ResetProcess(processID string) error
AddGenerator(generator *core.Generator) (*core.Generator, error)
PackGenerator(generatorID string, colonyName, arg string) error
GeneratorTriggerLoop()
TriggerGenerators()
SubmitWorkflow(generator *core.Generator, counter int, recoveredID string) // TODO: change name, there is also a submitWorkflowSpec()
AddCron(cron *core.Cron) (*core.Cron, error)
RemoveGenerator(generatorID string) error
RunCron(cronID string) (*core.Cron, error)
RemoveCron(cronID string) error
CalcNextRun(cron *core.Cron) time.Time
StartCron(cron *core.Cron)
TriggerCrons()
CronTriggerLoop()
ResetDatabase() error
PauseColonyAssignments(colonyName string) error
ResumeColonyAssignments(colonyName string) error
AreColonyAssignmentsPaused(colonyName string) (bool, error)
Stop()
IsLeader() bool
TryBecomeLeader() bool
TimeoutLoop()
BlockingCmdQueueWorker()
RetentionWorker()
CmdQueueWorker()
GetChannelRouter() *channel.Router
}