coordinator

package
v0.0.78 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 30, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package coordinator provides WebSocket-based coordination between services and when-v3. It handles phase management, health tracking, and real-time communication for distributed workflow execution.

Constants

This section is empty.

Variables

ValidTransitions defines which phase transitions are allowed.

Functions

func PayloadToStruct

func PayloadToStruct(payload map[string]interface{}, target interface{}) error

PayloadToStruct converts a message payload to a typed struct.
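The conversion described above can be sketched with a JSON round-trip. This is only an illustration under assumptions: the lowercase `payloadToStruct` and `cancelPayload` are local stand-ins written for this example, and the page does not say how the package's own PayloadToStruct is implemented.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// cancelPayload is a local mirror of the documented CancelPayload fields
// and JSON tags, so the example compiles without the package itself.
type cancelPayload struct {
	WorkflowID string `json:"workflow_id"`
	Reason     string `json:"reason,omitempty"`
	Force      bool   `json:"force,omitempty"`
}

// payloadToStruct is a hypothetical stand-in for PayloadToStruct. It
// marshals the generic map to JSON and unmarshals it into target, so the
// struct's json tags decide which keys fill which fields.
func payloadToStruct(payload map[string]interface{}, target interface{}) error {
	data, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("marshal payload: %w", err)
	}
	if err := json.Unmarshal(data, target); err != nil {
		return fmt.Errorf("unmarshal payload: %w", err)
	}
	return nil
}

func main() {
	payload := map[string]interface{}{
		"workflow_id": "wf-123",
		"reason":      "operator request",
		"force":       true,
	}

	var cancel cancelPayload
	if err := payloadToStruct(payload, &cancel); err != nil {
		fmt.Println("conversion failed:", err)
		return
	}
	fmt.Printf("%+v\n", cancel)
}
```

With this approach a value of the wrong type (for example a number under "workflow_id") surfaces as an error rather than a silently empty field.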

Types

type CancelPayload

type CancelPayload struct {
	WorkflowID string `json:"workflow_id"`
	Reason     string `json:"reason,omitempty"`
	Force      bool   `json:"force,omitempty"`
}

CancelPayload is the payload for the cancel command.

type CheckpointPayload

type CheckpointPayload struct {
	WorkflowID   string                 `json:"workflow_id"`
	CheckpointID string                 `json:"checkpoint_id"`
	Reason       string                 `json:"reason"`
	State        map[string]interface{} `json:"state,omitempty"`
}

CheckpointPayload is the payload for the checkpoint message.

type Config

type Config struct {
	// WhenURL is the WebSocket URL to connect to (e.g., "ws://localhost:8080/v1/coordination")
	WhenURL string

	// ServiceName is the name of this service (e.g., "containerservice")
	ServiceName string

	// ServiceID is a unique identifier for this service instance
	ServiceID string

	// InstanceID is a unique identifier for this specific instance (for multi-instance support)
	InstanceID string

	// Capabilities lists what this service can do
	Capabilities []string

	// Version is the service software version
	Version string

	// ProtocolVersion is the coordination protocol version (e.g., "1.0")
	// If empty, defaults to "1.0"
	ProtocolVersion string

	// SchemaVersion is the database schema version this service expects
	SchemaVersion int

	// Reconnect settings
	ReconnectInitialDelay  time.Duration
	ReconnectMaxDelay      time.Duration
	ReconnectBackoffFactor float64
	ReconnectMaxAttempts   int // 0 = infinite

	// PingInterval is how often to send pings
	PingInterval time.Duration

	// Logger for coordinator messages
	Logger *logrus.Entry
}

Config holds configuration for the Coordinator.
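The four reconnect fields suggest an exponential backoff with a cap. The sketch below shows one plausible way they could interact; the page does not document the actual reconnect algorithm, so `reconnectSettings`, `exampleSettings`, and `delayFor` are hypothetical names, and the numeric values are illustrative rather than the values returned by DefaultConfig.

```go
package main

import (
	"fmt"
	"time"
)

// reconnectSettings is a local mirror of the four reconnect fields of Config.
type reconnectSettings struct {
	InitialDelay  time.Duration
	MaxDelay      time.Duration
	BackoffFactor float64
	MaxAttempts   int // 0 = infinite, as documented for ReconnectMaxAttempts
}

// exampleSettings returns illustrative values (assumed, not the package defaults).
func exampleSettings() reconnectSettings {
	return reconnectSettings{
		InitialDelay:  time.Second,
		MaxDelay:      30 * time.Second,
		BackoffFactor: 2.0,
		MaxAttempts:   0,
	}
}

// delayFor returns the assumed wait before the given 1-based attempt, and
// false when the attempt number is invalid or exceeds MaxAttempts.
func (r reconnectSettings) delayFor(attempt int) (time.Duration, bool) {
	if attempt < 1 || (r.MaxAttempts > 0 && attempt > r.MaxAttempts) {
		return 0, false
	}
	delay := float64(r.InitialDelay)
	// Multiply once per earlier attempt, stopping as soon as the cap is reached.
	for i := 1; i < attempt && delay < float64(r.MaxDelay); i++ {
		delay *= r.BackoffFactor
	}
	if delay > float64(r.MaxDelay) {
		return r.MaxDelay, true
	}
	return time.Duration(delay), true
}

func main() {
	r := exampleSettings()
	for attempt := 1; attempt <= 7; attempt++ {
		d, _ := r.delayFor(attempt)
		fmt.Printf("attempt %d: wait %v\n", attempt, d)
	}
}
```

Capping at the maximum delay keeps a long outage from producing ever-growing waits, while MaxAttempts of 0 lets the loop retry indefinitely.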

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a Config with sensible defaults.

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator manages WebSocket communication with when-v3.

func New

func New(config Config) *Coordinator

New creates a new Coordinator.

func (*Coordinator) Close

func (c *Coordinator) Close() error

Close shuts down the coordinator.

func (*Coordinator) Connect

func (c *Coordinator) Connect() error

Connect establishes the WebSocket connection and starts processing.

func (*Coordinator) IsConnected

func (c *Coordinator) IsConnected() bool

IsConnected returns whether the WebSocket is connected.

func (*Coordinator) LogDebug added in v0.0.72

func (c *Coordinator) LogDebug(message string, fields map[string]interface{})

LogDebug sends a debug log message to when-v3.

func (*Coordinator) LogError added in v0.0.72

func (c *Coordinator) LogError(message string, fields map[string]interface{})

LogError sends an error log message to when-v3.

func (*Coordinator) LogInfo added in v0.0.72

func (c *Coordinator) LogInfo(message string, fields map[string]interface{})

LogInfo sends an info log message to when-v3.

func (*Coordinator) LogWarn added in v0.0.72

func (c *Coordinator) LogWarn(message string, fields map[string]interface{})

LogWarn sends a warning log message to when-v3.

func (*Coordinator) LogWithContext added in v0.0.72

func (c *Coordinator) LogWithContext(level LogLevel, message string, workflowID, actionID string, fields map[string]interface{})

LogWithContext sends a log message with workflow/action context.

func (*Coordinator) OnConnected

func (c *Coordinator) OnConnected(fn func())

OnConnected sets a callback that is invoked when the connection is established.

func (*Coordinator) OnDisconnected

func (c *Coordinator) OnDisconnected(fn func(error))

OnDisconnected sets a callback that is invoked when the connection is lost.

func (*Coordinator) OnMessage

func (c *Coordinator) OnMessage(msgType MessageType, handler MessageHandler)

OnMessage registers a custom handler for a message type.

func (*Coordinator) OnRegistered

func (c *Coordinator) OnRegistered(fn func(serviceID string))

OnRegistered sets a callback for when registration completes.

func (*Coordinator) Phases

func (c *Coordinator) Phases() *PhaseManager

Phases returns the phase manager for direct access.

func (*Coordinator) Send

func (c *Coordinator) Send(msg *WSMessage)

Send queues a message for sending.

func (*Coordinator) SendCheckpoint

func (c *Coordinator) SendCheckpoint(workflowID, checkpointID, reason string)

SendCheckpoint notifies when-v3 of a checkpoint.

func (*Coordinator) SendError

func (c *Coordinator) SendError(workflowID, actionID, errorMsg string, recoverable bool)

SendError notifies when-v3 of an error.

func (*Coordinator) SendLog added in v0.0.72

func (c *Coordinator) SendLog(entry LogEntry)

SendLog sends a single log entry to when-v3 for centralized aggregation.

func (*Coordinator) SendLogBatch added in v0.0.72

func (c *Coordinator) SendLogBatch(entries []LogEntry)

SendLogBatch sends multiple log entries to when-v3 for centralized aggregation. This is more efficient than sending individual logs when you have multiple entries.

func (*Coordinator) SendProgress

func (c *Coordinator) SendProgress(workflowID, actionID string, percent float64, stage, message string)

SendProgress notifies when-v3 of progress.

func (*Coordinator) SendWorkflowCreated

func (c *Coordinator) SendWorkflowCreated(workflowID, parentWorkflowID, rootWorkflowID, actionID, actionType string)

SendWorkflowCreated notifies when-v3 of a new child workflow.

type ErrorPayload

type ErrorPayload struct {
	WorkflowID  string `json:"workflow_id"`
	Error       string `json:"error"`
	Recoverable bool   `json:"recoverable"`
	ActionID    string `json:"action_id,omitempty"`
}

ErrorPayload is the payload for the error message.

type LogBatchPayload added in v0.0.72

type LogBatchPayload struct {
	Logs []LogEntry `json:"logs"`
}

LogBatchPayload is the payload for a batch of log messages.

type LogEntry added in v0.0.72

type LogEntry struct {
	Timestamp     time.Time              `json:"timestamp"`
	Level         string                 `json:"level"` // debug, info, warn, error, fatal
	Message       string                 `json:"message"`
	TraceID       string                 `json:"trace_id,omitempty"`
	SpanID        string                 `json:"span_id,omitempty"`
	WorkflowID    string                 `json:"workflow_id,omitempty"`
	ActionID      string                 `json:"action_id,omitempty"`
	CorrelationID string                 `json:"correlation_id,omitempty"`
	Fields        map[string]interface{} `json:"fields,omitempty"` // Additional structured fields
	SourceFile    string                 `json:"source_file,omitempty"`
	SourceLine    int                    `json:"source_line,omitempty"`
}

LogEntry represents a single log entry to be forwarded to when-v3.

type LogForwarder added in v0.0.72

type LogForwarder struct {
	// contains filtered or unexported fields
}

LogForwarder provides batched log forwarding for high-volume logging. It collects logs and sends them in batches to reduce WebSocket overhead.

func NewLogForwarder added in v0.0.72

func NewLogForwarder(coordinator *Coordinator, bufferSize int, flushInterval time.Duration) *LogForwarder

NewLogForwarder creates a new batched log forwarder. bufferSize is the maximum number of logs to buffer before flushing. flushInterval is how often to flush even if the buffer isn't full.

func (*LogForwarder) Flush added in v0.0.72

func (lf *LogForwarder) Flush()

Flush immediately sends all buffered logs.

func (*LogForwarder) Log added in v0.0.72

func (lf *LogForwarder) Log(entry LogEntry)

Log adds a log entry to the buffer.

func (*LogForwarder) Stop added in v0.0.72

func (lf *LogForwarder) Stop()

Stop stops the log forwarder and flushes remaining logs.
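The size-or-interval batching that LogForwarder describes can be sketched as follows. This is a minimal illustration, not the package's implementation: `batcher`, `newBatcher`, and the `send` callback are names invented for the example, and `logEntry` is a trimmed local mirror of LogEntry.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// logEntry is a trimmed local mirror of the documented LogEntry.
type logEntry struct {
	Level   string
	Message string
}

// batcher buffers entries and hands them to send in batches, either when
// bufferSize entries have accumulated or when the flush interval elapses.
type batcher struct {
	mu         sync.Mutex
	buf        []logEntry
	bufferSize int
	send       func([]logEntry)
	stop       chan struct{}
	done       chan struct{}
}

// newBatcher starts the periodic flush loop. flushInterval must be positive,
// because time.NewTicker panics otherwise.
func newBatcher(bufferSize int, flushInterval time.Duration, send func([]logEntry)) *batcher {
	b := &batcher{
		bufferSize: bufferSize,
		send:       send,
		stop:       make(chan struct{}),
		done:       make(chan struct{}),
	}
	go func() {
		defer close(b.done)
		ticker := time.NewTicker(flushInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				b.Flush()
			case <-b.stop:
				return
			}
		}
	}()
	return b
}

// Log appends an entry and flushes once the buffer is full.
func (b *batcher) Log(e logEntry) {
	b.mu.Lock()
	b.buf = append(b.buf, e)
	full := len(b.buf) >= b.bufferSize
	b.mu.Unlock()
	if full {
		b.Flush()
	}
}

// Flush sends whatever is buffered; send runs outside the lock.
func (b *batcher) Flush() {
	b.mu.Lock()
	batch := b.buf
	b.buf = nil
	b.mu.Unlock()
	if len(batch) > 0 {
		b.send(batch)
	}
}

// Stop ends the flush loop and sends any remaining entries. Call it once.
func (b *batcher) Stop() {
	close(b.stop)
	<-b.done
	b.Flush()
}

func main() {
	b := newBatcher(3, time.Hour, func(batch []logEntry) {
		fmt.Printf("sent batch of %d\n", len(batch))
	})
	for i := 0; i < 7; i++ {
		b.Log(logEntry{Level: "info", Message: fmt.Sprintf("event %d", i)})
	}
	b.Stop()
}
```

In the real package the batch would presumably go out through something like SendLogBatch; here the callback only prints the batch size.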

type LogLevel added in v0.0.72

type LogLevel string

LogLevel represents log severity levels.

const (
	LogLevelDebug LogLevel = "debug"
	LogLevelInfo  LogLevel = "info"
	LogLevelWarn  LogLevel = "warn"
	LogLevelError LogLevel = "error"
	LogLevelFatal LogLevel = "fatal"
)

type LogPayload added in v0.0.72

type LogPayload struct {
	LogEntry
}

LogPayload is the payload for a single log message.

type LogrusHook added in v0.0.72

type LogrusHook struct {
	// contains filtered or unexported fields
}

LogrusHook is a logrus hook that forwards log entries to when-v3. Use this to automatically forward all log messages from your service to the centralized log aggregation system in when-v3.

func NewLogrusHook added in v0.0.72

func NewLogrusHook(coordinator *Coordinator, minLevel logrus.Level) *LogrusHook

NewLogrusHook creates a new logrus hook for forwarding logs to when-v3. The minLevel parameter specifies the minimum log level to forward (default: Info).

func (*LogrusHook) Fire added in v0.0.72

func (h *LogrusHook) Fire(entry *logrus.Entry) error

Fire is called when a log entry is made.

func (*LogrusHook) Levels added in v0.0.72

func (h *LogrusHook) Levels() []logrus.Level

Levels returns the log levels this hook fires for.
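A minimum-level threshold such as minLevel is commonly turned into the list of levels a hook fires for. The sketch below assumes that approach; it does not reproduce the package's Levels method. The `level` type is a local stand-in that mirrors logrus's ordering, in which a smaller number means a more severe level, and `levelsAtOrAbove` is a hypothetical helper.

```go
package main

import "fmt"

// level is a local stand-in for logrus.Level. As in logrus, smaller values
// are more severe (panic is 0, trace is 6).
type level int

const (
	panicLevel level = iota
	fatalLevel
	errorLevel
	warnLevel
	infoLevel
	debugLevel
	traceLevel
)

var levelNames = [...]string{"panic", "fatal", "error", "warn", "info", "debug", "trace"}

// String returns the lowercase level name, or "unknown" when out of range.
func (l level) String() string {
	if l < panicLevel || l > traceLevel {
		return "unknown"
	}
	return levelNames[l]
}

// levelsAtOrAbove returns every level that is at least as severe as min,
// ordered from most to least severe.
func levelsAtOrAbove(min level) []level {
	var out []level
	for l := panicLevel; l <= traceLevel; l++ {
		if l <= min {
			out = append(out, l)
		}
	}
	return out
}

func main() {
	// With a minimum of info, debug and trace entries would not be forwarded.
	fmt.Println(levelsAtOrAbove(infoLevel))
}
```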

type MessageHandler

type MessageHandler func(msg *WSMessage) error

MessageHandler is a function that handles incoming messages.

type MessageType

type MessageType string

MessageType defines the types of WebSocket messages exchanged between services and when-v3.

const (
	// Service → when-v3 messages
	MessageTypeRegister        MessageType = "register"
	MessageTypeWorkflowCreated MessageType = "workflow_created"
	MessageTypePhaseChanged    MessageType = "phase_changed"
	MessageTypeCheckpoint      MessageType = "checkpoint"
	MessageTypeError           MessageType = "error"
	MessageTypeStatusResponse  MessageType = "status_response"
	MessageTypePong            MessageType = "pong"
	MessageTypeProgress        MessageType = "progress"
	MessageTypeLog             MessageType = "log"       // Single log entry
	MessageTypeLogBatch        MessageType = "log_batch" // Batch of log entries

	// when-v3 → Service messages
	MessageTypeRegistered MessageType = "registered"
	MessageTypePause      MessageType = "pause"
	MessageTypeResume     MessageType = "resume"
	MessageTypeCancel     MessageType = "cancel"
	MessageTypeStatus     MessageType = "status"
	MessageTypePing       MessageType = "ping"
)
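Registering a handler per message type, as OnMessage and MessageHandler imply, amounts to a lookup table keyed by type. The following is a rough sketch of that pattern with local stand-in types; `router`, `onMessage`, and `dispatch` are hypothetical names, and how the Coordinator treats unhandled or built-in message types is not stated on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// messageType and wsMessage are trimmed local mirrors of the documented types.
type messageType string

const (
	messageTypePause  messageType = "pause"
	messageTypeCancel messageType = "cancel"
)

type wsMessage struct {
	Type       messageType
	WorkflowID string
}

// messageHandler has the same shape as the documented MessageHandler.
type messageHandler func(msg *wsMessage) error

var errNoHandler = errors.New("no handler registered")

// router maps message types to handlers. It is not safe for concurrent
// registration and dispatch; a real implementation would need locking.
type router struct {
	handlers map[messageType]messageHandler
}

func newRouter() *router {
	return &router{handlers: make(map[messageType]messageHandler)}
}

// onMessage registers (or replaces) the handler for a message type.
func (r *router) onMessage(t messageType, h messageHandler) {
	r.handlers[t] = h
}

// dispatch calls the handler registered for msg.Type, if any.
func (r *router) dispatch(msg *wsMessage) error {
	h, ok := r.handlers[msg.Type]
	if !ok {
		return fmt.Errorf("%w for type %q", errNoHandler, msg.Type)
	}
	return h(msg)
}

func main() {
	r := newRouter()
	r.onMessage(messageTypePause, func(msg *wsMessage) error {
		fmt.Println("pausing workflow", msg.WorkflowID)
		return nil
	})

	fmt.Println(r.dispatch(&wsMessage{Type: messageTypePause, WorkflowID: "wf-123"}))
	fmt.Println(r.dispatch(&wsMessage{Type: messageTypeCancel, WorkflowID: "wf-123"}))
}
```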

type PausePayload

type PausePayload struct {
	WorkflowID string `json:"workflow_id"`
	Reason     string `json:"reason,omitempty"`
}

PausePayload is the payload for the pause command.

type Phase

type Phase string

Phase represents the current phase of a workflow execution.

const (
	PhasePending    Phase = "pending"
	PhasePreFlight  Phase = "pre-flight"
	PhasePlanning   Phase = "planning"
	PhaseExecution  Phase = "execution"
	PhasePausing    Phase = "pausing"
	PhasePaused     Phase = "paused"
	PhaseResuming   Phase = "resuming"
	PhaseCancelling Phase = "cancelling"
	PhaseCancelled  Phase = "cancelled"
	PhaseCompleting Phase = "completing"
	PhaseCompleted  Phase = "completed"
	PhaseFailed     Phase = "failed"
)

func (Phase) CanTransitionTo

func (p Phase) CanTransitionTo(target Phase) bool

CanTransitionTo checks if a transition to the target phase is valid.

func (Phase) IsActive

func (p Phase) IsActive() bool

IsActive returns true if the phase indicates active processing.

func (Phase) IsPausable

func (p Phase) IsPausable() bool

IsPausable returns true if the workflow can be paused from this phase.

func (Phase) IsResumable

func (p Phase) IsResumable() bool

IsResumable returns true if the workflow can be resumed from this phase.

func (Phase) IsTerminal

func (p Phase) IsTerminal() bool

IsTerminal returns true if the phase is a terminal state.
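Checks such as CanTransitionTo and IsTerminal are typically backed by a transition table like the ValidTransitions variable mentioned earlier. Since this page does not show the contents of ValidTransitions, the table below is an assumed one, chosen only to make the example concrete; the phase names and string values are the documented ones.

```go
package main

import "fmt"

// phase is a local mirror of the documented Phase type and constants.
type phase string

const (
	phasePending    phase = "pending"
	phasePreFlight  phase = "pre-flight"
	phasePlanning   phase = "planning"
	phaseExecution  phase = "execution"
	phasePausing    phase = "pausing"
	phasePaused     phase = "paused"
	phaseResuming   phase = "resuming"
	phaseCancelling phase = "cancelling"
	phaseCancelled  phase = "cancelled"
	phaseCompleting phase = "completing"
	phaseCompleted  phase = "completed"
	phaseFailed     phase = "failed"
)

// assumedTransitions is a HYPOTHETICAL table for illustration only; the
// package's real ValidTransitions may allow different moves.
var assumedTransitions = map[phase][]phase{
	phasePending:    {phasePreFlight, phaseCancelling, phaseFailed},
	phasePreFlight:  {phasePlanning, phaseCancelling, phaseFailed},
	phasePlanning:   {phaseExecution, phaseCancelling, phaseFailed},
	phaseExecution:  {phasePausing, phaseCancelling, phaseCompleting, phaseFailed},
	phasePausing:    {phasePaused, phaseFailed},
	phasePaused:     {phaseResuming, phaseCancelling},
	phaseResuming:   {phaseExecution, phaseFailed},
	phaseCancelling: {phaseCancelled, phaseFailed},
	phaseCompleting: {phaseCompleted, phaseFailed},
}

// canTransitionTo reports whether target is listed for p in the table.
func (p phase) canTransitionTo(target phase) bool {
	for _, next := range assumedTransitions[p] {
		if next == target {
			return true
		}
	}
	return false
}

// isTerminal treats completed, cancelled, and failed as terminal (an assumption).
func (p phase) isTerminal() bool {
	switch p {
	case phaseCompleted, phaseCancelled, phaseFailed:
		return true
	}
	return false
}

func main() {
	fmt.Println(phaseExecution.canTransitionTo(phasePausing))   // allowed in the assumed table
	fmt.Println(phaseCompleted.canTransitionTo(phaseExecution)) // terminal phases have no exits
	fmt.Println(phaseFailed.isTerminal())
}
```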

type PhaseChangedPayload

type PhaseChangedPayload struct {
	WorkflowID   string `json:"workflow_id"`
	FromPhase    Phase  `json:"from"`
	ToPhase      Phase  `json:"to"`
	CheckpointID string `json:"checkpoint_id,omitempty"`
	Reason       string `json:"reason,omitempty"`
}

PhaseChangedPayload is the payload for the phase_changed message.

type PhaseManager

type PhaseManager struct {
	// contains filtered or unexported fields
}

PhaseManager manages phase states for multiple workflows.

func NewPhaseManager

func NewPhaseManager() *PhaseManager

NewPhaseManager creates a new PhaseManager.

func (*PhaseManager) Cancel

func (pm *PhaseManager) Cancel(workflowID, reason string) error

Cancel initiates cancellation of a workflow.

func (*PhaseManager) Complete

func (pm *PhaseManager) Complete(workflowID string) error

Complete marks a workflow as completed.

func (*PhaseManager) CompleteCancellation

func (pm *PhaseManager) CompleteCancellation(workflowID string) error

CompleteCancellation finishes the cancellation.

func (*PhaseManager) CompletePause

func (pm *PhaseManager) CompletePause(workflowID, checkpointID string) error

CompletePause finishes the pause transition.

func (*PhaseManager) CompleteResume

func (pm *PhaseManager) CompleteResume(workflowID string) error

CompleteResume finishes the resume transition.

func (*PhaseManager) CreateCheckpoint

func (pm *PhaseManager) CreateCheckpoint(workflowID, checkpointID, reason string, state map[string]interface{}) error

CreateCheckpoint creates a checkpoint for a workflow.

func (*PhaseManager) Fail

func (pm *PhaseManager) Fail(workflowID, reason string) error

Fail marks a workflow as failed.

func (*PhaseManager) GetActiveWorkflows

func (pm *PhaseManager) GetActiveWorkflows() []*PhaseState

GetActiveWorkflows returns all workflows that are not in terminal states.

func (*PhaseManager) GetAllWorkflows

func (pm *PhaseManager) GetAllWorkflows() []*PhaseState

GetAllWorkflows returns all tracked workflows.

func (*PhaseManager) GetPhase

func (pm *PhaseManager) GetPhase(workflowID string) (Phase, bool)

GetPhase returns just the current phase of a workflow.

func (*PhaseManager) GetState

func (pm *PhaseManager) GetState(workflowID string) (*PhaseState, bool)

GetState returns the current state of a workflow.

func (*PhaseManager) OnCheckpoint

func (pm *PhaseManager) OnCheckpoint(fn func(workflowID, checkpointID string, state map[string]interface{}))

OnCheckpoint sets a callback for checkpoint creation.

func (*PhaseManager) OnPhaseChanged

func (pm *PhaseManager) OnPhaseChanged(fn func(state *PhaseState))

OnPhaseChanged sets a callback for phase changes.

func (*PhaseManager) Pause

func (pm *PhaseManager) Pause(workflowID, reason string) error

Pause initiates pausing of a workflow.

func (*PhaseManager) RegisterWorkflow

func (pm *PhaseManager) RegisterWorkflow(workflowID, parentWorkflowID, rootWorkflowID string) *PhaseState

RegisterWorkflow registers a new workflow with initial pending state.

func (*PhaseManager) RemoveWorkflow

func (pm *PhaseManager) RemoveWorkflow(workflowID string)

RemoveWorkflow removes a workflow from tracking.

func (*PhaseManager) Resume

func (pm *PhaseManager) Resume(workflowID, fromCheckpoint string) error

Resume initiates resuming of a workflow.

func (*PhaseManager) SetProgress

func (pm *PhaseManager) SetProgress(workflowID string, progress float64, currentAction string) error

SetProgress updates the progress of a workflow.

func (*PhaseManager) TransitionTo

func (pm *PhaseManager) TransitionTo(workflowID string, newPhase Phase, reason string) error

TransitionTo attempts to transition a workflow to a new phase.
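The method pairs above (Pause and CompletePause, Resume and CompleteResume) read as a two-step cycle through the pausing, paused, and resuming phases. Below is a minimal sketch of a manager that validates each step, assuming a simplified transition subset; `manager`, `register`, `transitionTo`, and `phaseOf` are hypothetical names, and the direct pending to execution move is a shortcut made for the example.

```go
package main

import (
	"fmt"
	"sync"
)

type phase string

const (
	phasePending   phase = "pending"
	phaseExecution phase = "execution"
	phasePausing   phase = "pausing"
	phasePaused    phase = "paused"
	phaseResuming  phase = "resuming"
)

// allowed is an ASSUMED subset of transitions, just enough for a pause cycle.
var allowed = map[phase][]phase{
	phasePending:   {phaseExecution},
	phaseExecution: {phasePausing},
	phasePausing:   {phasePaused},
	phasePaused:    {phaseResuming},
	phaseResuming:  {phaseExecution},
}

// phaseState is a trimmed local mirror of the documented PhaseState.
type phaseState struct {
	WorkflowID    string
	Phase         phase
	PreviousPhase phase
	Reason        string
}

// manager tracks one phaseState per workflow behind a mutex.
type manager struct {
	mu     sync.Mutex
	states map[string]*phaseState
}

func newManager() *manager {
	return &manager{states: make(map[string]*phaseState)}
}

// register starts tracking a workflow in the pending phase.
func (m *manager) register(id string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.states[id] = &phaseState{WorkflowID: id, Phase: phasePending}
}

// transitionTo moves a workflow to next only if the table allows it.
func (m *manager) transitionTo(id string, next phase, reason string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	st, ok := m.states[id]
	if !ok {
		return fmt.Errorf("workflow %q is not registered", id)
	}
	for _, p := range allowed[st.Phase] {
		if p == next {
			st.PreviousPhase, st.Phase, st.Reason = st.Phase, next, reason
			return nil
		}
	}
	return fmt.Errorf("invalid transition %s -> %s", st.Phase, next)
}

// phaseOf returns the current phase and whether the workflow is tracked.
func (m *manager) phaseOf(id string) (phase, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	st, ok := m.states[id]
	if !ok {
		return "", false
	}
	return st.Phase, true
}

func main() {
	m := newManager()
	m.register("wf-1")
	steps := []phase{phaseExecution, phasePausing, phasePaused, phaseResuming, phaseExecution}
	for _, next := range steps {
		if err := m.transitionTo("wf-1", next, "demo"); err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println("now in", next)
	}
	// Skipping the intermediate pausing phase is rejected.
	fmt.Println(m.transitionTo("wf-1", phasePaused, "skip pausing"))
}
```

Splitting a pause into "pausing" and "paused" gives the service time to reach a safe point (for example, writing a checkpoint) before it reports the pause as complete.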

type PhaseState

type PhaseState struct {
	WorkflowID       string
	Phase            Phase
	PreviousPhase    Phase
	ChangedAt        time.Time
	Reason           string
	CheckpointID     string
	Progress         float64
	CurrentAction    string
	ParentWorkflowID string
	RootWorkflowID   string
}

PhaseState represents the state of a single workflow's phase.

type ProgressPayload

type ProgressPayload struct {
	WorkflowID  string  `json:"workflow_id"`
	ActionID    string  `json:"action_id,omitempty"`
	Percent     float64 `json:"percent"`
	Stage       string  `json:"stage,omitempty"`
	Message     string  `json:"message,omitempty"`
	CurrentItem int     `json:"current_item,omitempty"`
	TotalItems  int     `json:"total_items,omitempty"`
}

ProgressPayload is the payload for the progress message.

type RegisterPayload

type RegisterPayload struct {
	ServiceName     string   `json:"service_name"`
	ServiceID       string   `json:"service_id,omitempty"`
	InstanceID      string   `json:"instance_id,omitempty"`
	Capabilities    []string `json:"capabilities"`
	Version         string   `json:"version,omitempty"`          // Service software version
	ProtocolVersion string   `json:"protocol_version,omitempty"` // Coordination protocol version (e.g., "1.0")
	SchemaVersion   int      `json:"schema_version,omitempty"`   // Database schema version service expects
}

RegisterPayload is the payload for a register message.

type RegisteredPayload

type RegisteredPayload struct {
	ServiceID          string `json:"service_id"`
	InstanceID         string `json:"instance_id,omitempty"`
	Message            string `json:"message,omitempty"`
	ProtocolVersion    string `json:"protocol_version,omitempty"`     // Negotiated protocol version
	HubProtocolVersion string `json:"hub_protocol_version,omitempty"` // Hub's protocol version
}

RegisteredPayload is the payload for a registered response.

type ResumePayload

type ResumePayload struct {
	WorkflowID     string `json:"workflow_id"`
	FromCheckpoint string `json:"from_checkpoint,omitempty"`
}

ResumePayload is the payload for the resume command.

type StatusPayload

type StatusPayload struct {
	WorkflowID string `json:"workflow_id"`
}

StatusPayload is the payload for the status request.

type StatusResponsePayload

type StatusResponsePayload struct {
	WorkflowID    string  `json:"workflow_id"`
	Phase         Phase   `json:"phase"`
	Progress      float64 `json:"progress"`
	CurrentAction string  `json:"current_action,omitempty"`
	Message       string  `json:"message,omitempty"`
}

StatusResponsePayload is the payload for the status_response message.

type WSMessage

type WSMessage struct {
	ID         string                 `json:"id"`                    // For request/response correlation
	Type       MessageType            `json:"type"`                  // Message type
	WorkflowID string                 `json:"workflow_id,omitempty"` // Associated workflow
	Timestamp  time.Time              `json:"timestamp"`             // Message timestamp
	Payload    map[string]interface{} `json:"payload,omitempty"`     // Message-specific data
}

WSMessage is the base message structure for all WebSocket communication.

func NewMessage

func NewMessage(msgType MessageType) *WSMessage

NewMessage creates a new WSMessage with the given type.

func NewMessageWithWorkflow

func NewMessageWithWorkflow(msgType MessageType, workflowID string) *WSMessage

NewMessageWithWorkflow creates a new WSMessage for a specific workflow.

func ParseMessage

func ParseMessage(data []byte) (*WSMessage, error)

ParseMessage deserializes a JSON message.

func (*WSMessage) GetCancelPayload

func (m *WSMessage) GetCancelPayload() (*CancelPayload, error)

GetCancelPayload extracts a CancelPayload from the message.

func (*WSMessage) GetPausePayload

func (m *WSMessage) GetPausePayload() (*PausePayload, error)

GetPausePayload extracts a PausePayload from the message.

func (*WSMessage) GetRegisterPayload

func (m *WSMessage) GetRegisterPayload() (*RegisterPayload, error)

GetRegisterPayload extracts a RegisterPayload from the message.

func (*WSMessage) GetResumePayload

func (m *WSMessage) GetResumePayload() (*ResumePayload, error)

GetResumePayload extracts a ResumePayload from the message.

func (*WSMessage) JSON

func (m *WSMessage) JSON() ([]byte, error)

JSON serializes the message to JSON bytes.

func (*WSMessage) SetPayload

func (m *WSMessage) SetPayload(payload interface{}) error

SetPayload sets the payload from a typed struct.
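To make the envelope concrete, the sketch below builds a message, fills its payload from a typed struct, and round-trips it through JSON. The struct fields and JSON tags are copied from the documented WSMessage and PausePayload; the lowercase types and the `setPayload` and `roundTrip` helpers are local stand-ins, and the JSON round-trip used to build the payload map is an assumption about how SetPayload might work.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// wsMessage mirrors the documented WSMessage fields and JSON tags.
type wsMessage struct {
	ID         string                 `json:"id"`
	Type       string                 `json:"type"`
	WorkflowID string                 `json:"workflow_id,omitempty"`
	Timestamp  time.Time              `json:"timestamp"`
	Payload    map[string]interface{} `json:"payload,omitempty"`
}

// pausePayload mirrors the documented PausePayload.
type pausePayload struct {
	WorkflowID string `json:"workflow_id"`
	Reason     string `json:"reason,omitempty"`
}

// setPayload converts a typed struct into the generic payload map by
// marshalling it to JSON and unmarshalling into a map (assumed approach).
func (m *wsMessage) setPayload(payload interface{}) error {
	data, err := json.Marshal(payload)
	if err != nil {
		return err
	}
	var asMap map[string]interface{}
	if err := json.Unmarshal(data, &asMap); err != nil {
		return err
	}
	m.Payload = asMap
	return nil
}

// roundTrip serializes a message and parses it back, roughly what a
// sender and receiver would do on either end of the WebSocket.
func roundTrip(m *wsMessage) (*wsMessage, error) {
	data, err := json.Marshal(m)
	if err != nil {
		return nil, err
	}
	var out wsMessage
	if err := json.Unmarshal(data, &out); err != nil {
		return nil, err
	}
	return &out, nil
}

func main() {
	msg := &wsMessage{
		ID:         "msg-1", // illustrative ID; the package's ID scheme is not documented here
		Type:       "pause",
		WorkflowID: "wf-123",
		Timestamp:  time.Date(2025, 11, 30, 0, 0, 0, 0, time.UTC),
	}
	if err := msg.setPayload(pausePayload{WorkflowID: "wf-123", Reason: "maintenance"}); err != nil {
		fmt.Println("set payload:", err)
		return
	}

	data, err := json.Marshal(msg)
	if err != nil {
		fmt.Println("marshal:", err)
		return
	}
	fmt.Println(string(data))

	parsed, err := roundTrip(msg)
	if err != nil {
		fmt.Println("round trip:", err)
		return
	}
	fmt.Println(parsed.Type, parsed.Payload["reason"])
}
```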

type WorkflowCreatedPayload

type WorkflowCreatedPayload struct {
	WorkflowID       string `json:"workflow_id"`
	ParentWorkflowID string `json:"parent_workflow_id,omitempty"`
	RootWorkflowID   string `json:"root_workflow_id,omitempty"`
	ActionID         string `json:"action_id,omitempty"`
	ActionType       string `json:"action_type,omitempty"`
}

WorkflowCreatedPayload is the payload for the workflow_created message.
