Documentation ¶
Index ¶
- type ControlPlane
- func (s *ControlPlane) GetClusterStats(ctx context.Context, req *cpb.GetClusterStatsRequest) (*cpb.GetClusterStatsResponse, error)
- func (s *ControlPlane) GetTask(ctx context.Context, req *cpb.GetTaskRequest) (*cpb.GetTaskResponse, error)
- func (s *ControlPlane) Init(ctx context.Context) error
- func (s *ControlPlane) ListTasks(ctx context.Context, req *cpb.ListTasksRequest) (*cpb.ListTasksResponse, error)
- func (s *ControlPlane) PublishTaskEvent(ctx context.Context, eventType cpb.TaskEventType, previousState domain.TaskState, task *domain.Task, reason string)
- func (s *ControlPlane) PublishWorkerEvent(ctx context.Context, workerID string, connected bool)
- func (s *ControlPlane) StreamTaskEvents(req *cpb.StreamTaskEventsRequest, stream cpb.ControlPlaneService_StreamTaskEventsServer) error
- type Dispatcher
- type Orchestrator
- func (s *Orchestrator) CancelTask(ctx context.Context, req *pb.CancelTaskRequest) (*pb.CancelTaskResponse, error)
- func (s *Orchestrator) CompleteTask(ctx context.Context, req *pb.CompleteTaskRequest) (*pb.CompleteTaskResponse, error)
- func (s *Orchestrator) StreamTasks(req *pb.StreamTasksRequest, stream pb.Orchestrator_StreamTasksServer) error
- func (s *Orchestrator) SubmitTask(ctx context.Context, req *pb.SubmitTaskRequest) (*pb.SubmitTaskResponse, error)
- type SafeStream
- type StateManager
- type TaskStatePublisher
- type WorkerManager
- func (w *WorkerManager) ActiveWorkerCount() int
- func (w *WorkerManager) Add(id string, stream pb.Orchestrator_StreamTasksServer) error
- func (w *WorkerManager) CancelTask(workerID string, taskID string) error
- func (w *WorkerManager) Count() int
- func (w *WorkerManager) DecrementActiveTasks(workerID string)
- func (w *WorkerManager) GetNextWorker() (*SafeStream, string, error)
- func (w *WorkerManager) Remove(id string) error
- func (w *WorkerManager) SetPublisher(publisher TaskStatePublisher)
- type WorkerState
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ControlPlane ¶
type ControlPlane struct {
cpb.UnimplementedControlPlaneServiceServer
// contains filtered or unexported fields
}
func NewControlPlane ¶
func NewControlPlane(repo port.TaskRepository, logger port.Logger, workerManager *WorkerManager, pool *pgxpool.Pool, defaultTenant string, defaultNamespace string) *ControlPlane
func (*ControlPlane) GetClusterStats ¶
func (s *ControlPlane) GetClusterStats(ctx context.Context, req *cpb.GetClusterStatsRequest) (*cpb.GetClusterStatsResponse, error)
func (*ControlPlane) GetTask ¶
func (s *ControlPlane) GetTask(ctx context.Context, req *cpb.GetTaskRequest) (*cpb.GetTaskResponse, error)
func (*ControlPlane) ListTasks ¶
func (s *ControlPlane) ListTasks(ctx context.Context, req *cpb.ListTasksRequest) (*cpb.ListTasksResponse, error)
func (*ControlPlane) PublishTaskEvent ¶
func (s *ControlPlane) PublishTaskEvent(ctx context.Context, eventType cpb.TaskEventType, previousState domain.TaskState, task *domain.Task, reason string)
func (*ControlPlane) PublishWorkerEvent ¶
func (s *ControlPlane) PublishWorkerEvent(ctx context.Context, workerID string, connected bool)
func (*ControlPlane) StreamTaskEvents ¶
func (s *ControlPlane) StreamTaskEvents(req *cpb.StreamTaskEventsRequest, stream cpb.ControlPlaneService_StreamTaskEventsServer) error
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
func NewDispatcher ¶
func NewDispatcher(wm *WorkerManager, taskQueue <-chan *domain.Task, logger port.Logger, repo port.TaskRepository, publisher TaskStatePublisher) *Dispatcher
func (*Dispatcher) Run ¶
func (d *Dispatcher) Run(ctx context.Context)
type Orchestrator ¶
type Orchestrator struct {
pb.UnimplementedOrchestratorServer
// contains filtered or unexported fields
}
func New ¶
func New(repo port.TaskRepository, logger port.Logger, wm *WorkerManager, publisher TaskStatePublisher) *Orchestrator
func (*Orchestrator) CancelTask ¶
func (s *Orchestrator) CancelTask(ctx context.Context, req *pb.CancelTaskRequest) (*pb.CancelTaskResponse, error)
func (*Orchestrator) CompleteTask ¶
func (s *Orchestrator) CompleteTask(ctx context.Context, req *pb.CompleteTaskRequest) (*pb.CompleteTaskResponse, error)
func (*Orchestrator) StreamTasks ¶
func (s *Orchestrator) StreamTasks(req *pb.StreamTasksRequest, stream pb.Orchestrator_StreamTasksServer) error
func (*Orchestrator) SubmitTask ¶
func (s *Orchestrator) SubmitTask(ctx context.Context, req *pb.SubmitTaskRequest) (*pb.SubmitTaskResponse, error)
type SafeStream ¶
type SafeStream struct {
// contains filtered or unexported fields
}
type StateManager ¶
type StateManager struct {
// contains filtered or unexported fields
}
func NewStateManager ¶
func NewStateManager(repo port.TaskRepository, logger port.Logger, batchSize int, taskQueue chan<- *domain.Task, publisher TaskStatePublisher) *StateManager
func (*StateManager) Run ¶
func (s *StateManager) Run(ctx context.Context)
type TaskStatePublisher ¶
type WorkerManager ¶
type WorkerManager struct {
// contains filtered or unexported fields
}
func NewWorkerManager ¶
func NewWorkerManager(logger port.Logger) *WorkerManager
func (*WorkerManager) ActiveWorkerCount ¶
func (w *WorkerManager) ActiveWorkerCount() int
func (*WorkerManager) Add ¶
func (w *WorkerManager) Add(id string, stream pb.Orchestrator_StreamTasksServer) error
func (*WorkerManager) CancelTask ¶
func (w *WorkerManager) CancelTask(workerID string, taskID string) error
func (*WorkerManager) Count ¶
func (w *WorkerManager) Count() int
func (*WorkerManager) DecrementActiveTasks ¶
func (w *WorkerManager) DecrementActiveTasks(workerID string)
func (*WorkerManager) GetNextWorker ¶
func (w *WorkerManager) GetNextWorker() (*SafeStream, string, error)
func (*WorkerManager) Remove ¶
func (w *WorkerManager) Remove(id string) error
func (*WorkerManager) SetPublisher ¶
func (w *WorkerManager) SetPublisher(publisher TaskStatePublisher)
type WorkerState ¶
type WorkerState struct {
Stream *SafeStream
ActiveTasks int
}
Click to show internal directories.
Click to hide internal directories.