service

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ControlPlane

type ControlPlane struct {
	cpb.UnimplementedControlPlaneServiceServer
	// contains filtered or unexported fields
}

func NewControlPlane

func NewControlPlane(
	repo port.TaskRepository,
	logger port.Logger,
	workerManager *WorkerManager,
	pool *pgxpool.Pool,
	defaultTenant string,
	defaultNamespace string,
) *ControlPlane

func (*ControlPlane) GetClusterStats

func (*ControlPlane) GetTask

func (*ControlPlane) Init

func (s *ControlPlane) Init(ctx context.Context) error

func (*ControlPlane) ListTasks

func (*ControlPlane) PublishTaskEvent

func (s *ControlPlane) PublishTaskEvent(
	ctx context.Context,
	eventType cpb.TaskEventType,
	previousState domain.TaskState,
	task *domain.Task,
	reason string,
)

func (*ControlPlane) PublishWorkerEvent

func (s *ControlPlane) PublishWorkerEvent(ctx context.Context, workerID string, connected bool)

func (*ControlPlane) StreamTaskEvents

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

func NewDispatcher

func NewDispatcher(wm *WorkerManager, taskQueue <-chan *domain.Task, logger port.Logger, repo port.TaskRepository, publisher TaskStatePublisher) *Dispatcher

func (*Dispatcher) Run

func (d *Dispatcher) Run(ctx context.Context)

type Orchestrator

type Orchestrator struct {
	pb.UnimplementedOrchestratorServer
	// contains filtered or unexported fields
}

func New

func New(repo port.TaskRepository, logger port.Logger, wm *WorkerManager, publisher TaskStatePublisher) *Orchestrator

func (*Orchestrator) CancelTask

func (*Orchestrator) CompleteTask

func (*Orchestrator) StreamTasks

func (*Orchestrator) SubmitTask

type SafeStream

type SafeStream struct {
	// contains filtered or unexported fields
}

func (*SafeStream) Send

func (s *SafeStream) Send(event *pb.TaskEvent) error

type StateManager

type StateManager struct {
	// contains filtered or unexported fields
}

func NewStateManager

func NewStateManager(repo port.TaskRepository, logger port.Logger, batchSize int, taskQueue chan<- *domain.Task, publisher TaskStatePublisher) *StateManager

func (*StateManager) Run

func (s *StateManager) Run(ctx context.Context)

type TaskStatePublisher

type TaskStatePublisher interface {
	PublishTaskEvent(ctx context.Context, eventType cpb.TaskEventType, previousState domain.TaskState, task *domain.Task, reason string)
	PublishWorkerEvent(ctx context.Context, workerID string, connected bool)
}

type WorkerManager

type WorkerManager struct {
	// contains filtered or unexported fields
}

func NewWorkerManager

func NewWorkerManager(logger port.Logger) *WorkerManager

func (*WorkerManager) ActiveWorkerCount

func (w *WorkerManager) ActiveWorkerCount() int

func (*WorkerManager) Add

func (*WorkerManager) CancelTask

func (w *WorkerManager) CancelTask(workerID string, taskID string) error

func (*WorkerManager) Count

func (w *WorkerManager) Count() int

func (*WorkerManager) DecrementActiveTasks

func (w *WorkerManager) DecrementActiveTasks(workerID string)

func (*WorkerManager) GetNextWorker

func (w *WorkerManager) GetNextWorker() (*SafeStream, string, error)

func (*WorkerManager) Remove

func (w *WorkerManager) Remove(id string) error

func (*WorkerManager) SetPublisher

func (w *WorkerManager) SetPublisher(publisher TaskStatePublisher)

type WorkerState

type WorkerState struct {
	Stream      *SafeStream
	ActiveTasks int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL