Documentation
¶
Overview ¶
Package coordinator provides WebSocket-based coordination between services and when-v3. It handles phase management, health tracking, and real-time communication for distributed workflow execution.
Index ¶
- Variables
- func PayloadToStruct(payload map[string]interface{}, target interface{}) error
- type CancelPayload
- type CheckpointPayload
- type Config
- func DefaultConfig() Config
- type Coordinator
- func (c *Coordinator) Close() error
- func (c *Coordinator) Connect() error
- func (c *Coordinator) IsConnected() bool
- func (c *Coordinator) LogDebug(message string, fields map[string]interface{})
- func (c *Coordinator) LogError(message string, fields map[string]interface{})
- func (c *Coordinator) LogInfo(message string, fields map[string]interface{})
- func (c *Coordinator) LogWarn(message string, fields map[string]interface{})
- func (c *Coordinator) LogWithContext(level LogLevel, message string, workflowID, actionID string, fields map[string]interface{})
- func (c *Coordinator) OnConnected(fn func())
- func (c *Coordinator) OnDisconnected(fn func(error))
- func (c *Coordinator) OnMessage(msgType MessageType, handler MessageHandler)
- func (c *Coordinator) OnRegistered(fn func(serviceID string))
- func (c *Coordinator) Phases() *PhaseManager
- func (c *Coordinator) Send(msg *WSMessage)
- func (c *Coordinator) SendCheckpoint(workflowID, checkpointID, reason string)
- func (c *Coordinator) SendError(workflowID, actionID, errorMsg string, recoverable bool)
- func (c *Coordinator) SendLog(entry LogEntry)
- func (c *Coordinator) SendLogBatch(entries []LogEntry)
- func (c *Coordinator) SendProgress(workflowID, actionID string, percent float64, stage, message string)
- func (c *Coordinator) SendWorkflowCreated(workflowID, parentWorkflowID, rootWorkflowID, actionID, actionType string)
- type ErrorPayload
- type LogBatchPayload
- type LogEntry
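- type LogForwarder
- func NewLogForwarder(coordinator *Coordinator, bufferSize int, flushInterval time.Duration) *LogForwarder
- func (lf *LogForwarder) Flush()
- func (lf *LogForwarder) Log(entry LogEntry)
- func (lf *LogForwarder) Stop()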
- type LogLevel
- type LogPayload
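- type LogrusHook
- func NewLogrusHook(coordinator *Coordinator, minLevel logrus.Level) *LogrusHook
- func (h *LogrusHook) Fire(entry *logrus.Entry) error
- func (h *LogrusHook) Levels() []logrus.Level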
- type MessageHandler
- type MessageType
- type PausePayload
- type Phase
- type PhaseChangedPayload
- type PhaseManager
- func NewPhaseManager() *PhaseManager
- func (pm *PhaseManager) Cancel(workflowID, reason string) error
- func (pm *PhaseManager) Complete(workflowID string) error
- func (pm *PhaseManager) CompleteCancellation(workflowID string) error
- func (pm *PhaseManager) CompletePause(workflowID, checkpointID string) error
- func (pm *PhaseManager) CompleteResume(workflowID string) error
- func (pm *PhaseManager) CreateCheckpoint(workflowID, checkpointID, reason string, state map[string]interface{}) error
- func (pm *PhaseManager) Fail(workflowID, reason string) error
- func (pm *PhaseManager) GetActiveWorkflows() []*PhaseState
- func (pm *PhaseManager) GetAllWorkflows() []*PhaseState
- func (pm *PhaseManager) GetPhase(workflowID string) (Phase, bool)
- func (pm *PhaseManager) GetState(workflowID string) (*PhaseState, bool)
- func (pm *PhaseManager) OnCheckpoint(fn func(workflowID, checkpointID string, state map[string]interface{}))
- func (pm *PhaseManager) OnPhaseChanged(fn func(state *PhaseState))
- func (pm *PhaseManager) Pause(workflowID, reason string) error
- func (pm *PhaseManager) RegisterWorkflow(workflowID, parentWorkflowID, rootWorkflowID string) *PhaseState
- func (pm *PhaseManager) RemoveWorkflow(workflowID string)
- func (pm *PhaseManager) Resume(workflowID, fromCheckpoint string) error
- func (pm *PhaseManager) SetProgress(workflowID string, progress float64, currentAction string) error
- func (pm *PhaseManager) TransitionTo(workflowID string, newPhase Phase, reason string) error
- type PhaseState
- type ProgressPayload
- type RegisterPayload
- type RegisteredPayload
- type ResumePayload
- type StatusPayload
- type StatusResponsePayload
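- type WSMessage
- func NewMessage(msgType MessageType) *WSMessage
- func NewMessageWithWorkflow(msgType MessageType, workflowID string) *WSMessage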
- func (m *WSMessage) GetCancelPayload() (*CancelPayload, error)
- func (m *WSMessage) GetPausePayload() (*PausePayload, error)
- func (m *WSMessage) GetRegisterPayload() (*RegisterPayload, error)
- func (m *WSMessage) GetResumePayload() (*ResumePayload, error)
- func (m *WSMessage) JSON() ([]byte, error)
- func (m *WSMessage) SetPayload(payload interface{}) error
- type WorkflowCreatedPayload
Constants ¶
This section is empty.
Variables ¶
var ValidTransitions = map[Phase][]Phase{
	PhasePending:    {PhasePreFlight, PhaseFailed},
	PhasePreFlight:  {PhasePlanning, PhaseFailed},
	PhasePlanning:   {PhaseExecution, PhaseFailed},
	PhaseExecution:  {PhasePausing, PhaseCancelling, PhaseCompleting, PhaseFailed},
	PhasePausing:    {PhasePaused, PhaseCancelling, PhaseFailed},
	PhasePaused:     {PhaseResuming, PhaseCancelling, PhaseFailed},
	PhaseResuming:   {PhaseExecution, PhaseCancelling, PhaseFailed},
	PhaseCancelling: {PhaseCancelled, PhaseFailed},
	PhaseCompleting: {PhaseCompleted, PhaseFailed},
}
ValidTransitions defines which phase transitions are allowed.
Functions ¶
func PayloadToStruct ¶
func PayloadToStruct(payload map[string]interface{}, target interface{}) error
PayloadToStruct converts a message payload to a typed struct.
Types ¶
type CancelPayload ¶
type CancelPayload struct {
WorkflowID string `json:"workflow_id"`
Reason string `json:"reason,omitempty"`
Force bool `json:"force,omitempty"`
}
CancelPayload is the payload for the cancel command.
type CheckpointPayload ¶
type CheckpointPayload struct {
WorkflowID string `json:"workflow_id"`
CheckpointID string `json:"checkpoint_id"`
Reason string `json:"reason"`
State map[string]interface{} `json:"state,omitempty"`
}
CheckpointPayload is the payload for the checkpoint message.
type Config ¶
type Config struct {
// WhenURL is the WebSocket URL to connect to (e.g., "ws://localhost:8080/v1/coordination")
WhenURL string
// ServiceName is the name of this service (e.g., "containerservice")
ServiceName string
// ServiceID is a unique identifier for this service instance
ServiceID string
// InstanceID is a unique identifier for this specific instance (for multi-instance support)
InstanceID string
// Capabilities lists what this service can do
Capabilities []string
// Version is the service software version
Version string
// ProtocolVersion is the coordination protocol version (e.g., "1.0")
// If empty, defaults to "1.0"
ProtocolVersion string
// SchemaVersion is the database schema version this service expects
SchemaVersion int
// Reconnect settings
ReconnectInitialDelay time.Duration
ReconnectMaxDelay time.Duration
ReconnectBackoffFactor float64
ReconnectMaxAttempts int // 0 = infinite
// PingInterval is how often to send pings
PingInterval time.Duration
// Logger for coordinator messages
Logger *logrus.Entry
}
Config holds configuration for the Coordinator.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a Config with sensible defaults.
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
Coordinator manages WebSocket communication with when-v3.
func (*Coordinator) Connect ¶
func (c *Coordinator) Connect() error
Connect establishes the WebSocket connection and starts processing.
func (*Coordinator) IsConnected ¶
func (c *Coordinator) IsConnected() bool
IsConnected returns whether the WebSocket is connected.
func (*Coordinator) LogDebug ¶ added in v0.0.72
func (c *Coordinator) LogDebug(message string, fields map[string]interface{})
LogDebug sends a debug log message to when-v3.
func (*Coordinator) LogError ¶ added in v0.0.72
func (c *Coordinator) LogError(message string, fields map[string]interface{})
LogError sends an error log message to when-v3.
func (*Coordinator) LogInfo ¶ added in v0.0.72
func (c *Coordinator) LogInfo(message string, fields map[string]interface{})
LogInfo sends an info log message to when-v3.
func (*Coordinator) LogWarn ¶ added in v0.0.72
func (c *Coordinator) LogWarn(message string, fields map[string]interface{})
LogWarn sends a warning log message to when-v3.
func (*Coordinator) LogWithContext ¶ added in v0.0.72
func (c *Coordinator) LogWithContext(level LogLevel, message string, workflowID, actionID string, fields map[string]interface{})
LogWithContext sends a log message with workflow/action context.
func (*Coordinator) OnConnected ¶
func (c *Coordinator) OnConnected(fn func())
OnConnected sets a callback for when the connection is established.
func (*Coordinator) OnDisconnected ¶
func (c *Coordinator) OnDisconnected(fn func(error))
OnDisconnected sets a callback for when the connection is lost.
func (*Coordinator) OnMessage ¶
func (c *Coordinator) OnMessage(msgType MessageType, handler MessageHandler)
OnMessage registers a custom handler for a message type.
func (*Coordinator) OnRegistered ¶
func (c *Coordinator) OnRegistered(fn func(serviceID string))
OnRegistered sets a callback for when registration completes.
func (*Coordinator) Phases ¶
func (c *Coordinator) Phases() *PhaseManager
Phases returns the phase manager for direct access.
func (*Coordinator) Send ¶
func (c *Coordinator) Send(msg *WSMessage)
Send queues a message for sending.
func (*Coordinator) SendCheckpoint ¶
func (c *Coordinator) SendCheckpoint(workflowID, checkpointID, reason string)
SendCheckpoint notifies when-v3 of a checkpoint.
func (*Coordinator) SendError ¶
func (c *Coordinator) SendError(workflowID, actionID, errorMsg string, recoverable bool)
SendError notifies when-v3 of an error.
func (*Coordinator) SendLog ¶ added in v0.0.72
func (c *Coordinator) SendLog(entry LogEntry)
SendLog sends a single log entry to when-v3 for centralized aggregation.
func (*Coordinator) SendLogBatch ¶ added in v0.0.72
func (c *Coordinator) SendLogBatch(entries []LogEntry)
SendLogBatch sends multiple log entries to when-v3 for centralized aggregation. This is more efficient than sending individual logs when you have multiple entries.
func (*Coordinator) SendProgress ¶
func (c *Coordinator) SendProgress(workflowID, actionID string, percent float64, stage, message string)
SendProgress notifies when-v3 of progress.
func (*Coordinator) SendWorkflowCreated ¶
func (c *Coordinator) SendWorkflowCreated(workflowID, parentWorkflowID, rootWorkflowID, actionID, actionType string)
SendWorkflowCreated notifies when-v3 of a new child workflow.
type ErrorPayload ¶
type ErrorPayload struct {
WorkflowID string `json:"workflow_id"`
Error string `json:"error"`
Recoverable bool `json:"recoverable"`
ActionID string `json:"action_id,omitempty"`
}
ErrorPayload is the payload for the error message.
type LogBatchPayload ¶ added in v0.0.72
type LogBatchPayload struct {
Logs []LogEntry `json:"logs"`
}
LogBatchPayload is the payload for a batch of log messages.
type LogEntry ¶ added in v0.0.72
type LogEntry struct {
Timestamp time.Time `json:"timestamp"`
Level string `json:"level"` // debug, info, warn, error, fatal
Message string `json:"message"`
TraceID string `json:"trace_id,omitempty"`
SpanID string `json:"span_id,omitempty"`
WorkflowID string `json:"workflow_id,omitempty"`
ActionID string `json:"action_id,omitempty"`
CorrelationID string `json:"correlation_id,omitempty"`
Fields map[string]interface{} `json:"fields,omitempty"` // Additional structured fields
SourceFile string `json:"source_file,omitempty"`
SourceLine int `json:"source_line,omitempty"`
}
LogEntry represents a single log entry to be forwarded to when-v3.
type LogForwarder ¶ added in v0.0.72
type LogForwarder struct {
// contains filtered or unexported fields
}
LogForwarder provides batched log forwarding for high-volume logging. It collects logs and sends them in batches to reduce WebSocket overhead.
func NewLogForwarder ¶ added in v0.0.72
func NewLogForwarder(coordinator *Coordinator, bufferSize int, flushInterval time.Duration) *LogForwarder
NewLogForwarder creates a new batched log forwarder. bufferSize is the maximum number of logs to buffer before flushing; flushInterval is how often to flush even if the buffer isn't full.
func (*LogForwarder) Flush ¶ added in v0.0.72
func (lf *LogForwarder) Flush()
Flush immediately sends all buffered logs.
func (*LogForwarder) Log ¶ added in v0.0.72
func (lf *LogForwarder) Log(entry LogEntry)
Log adds a log entry to the buffer.
func (*LogForwarder) Stop ¶ added in v0.0.72
func (lf *LogForwarder) Stop()
Stop stops the log forwarder and flushes remaining logs.
type LogPayload ¶ added in v0.0.72
type LogPayload struct {
LogEntry
}
LogPayload is the payload for a single log message.
type LogrusHook ¶ added in v0.0.72
type LogrusHook struct {
// contains filtered or unexported fields
}
LogrusHook is a logrus hook that forwards log entries to when-v3. Use this to automatically forward all log messages from your service to the centralized log aggregation system in when-v3.
func NewLogrusHook ¶ added in v0.0.72
func NewLogrusHook(coordinator *Coordinator, minLevel logrus.Level) *LogrusHook
NewLogrusHook creates a new logrus hook for forwarding logs to when-v3. The minLevel parameter specifies the minimum log level to forward (default: Info).
func (*LogrusHook) Fire ¶ added in v0.0.72
func (h *LogrusHook) Fire(entry *logrus.Entry) error
Fire is called when a log entry is made.
func (*LogrusHook) Levels ¶ added in v0.0.72
func (h *LogrusHook) Levels() []logrus.Level
Levels returns the log levels this hook fires for.
type MessageHandler ¶
MessageHandler is a function that handles incoming messages.
type MessageType ¶
type MessageType string
MessageType defines the types of WebSocket messages exchanged between services and when-v3.
const (
	// Service → when-v3 messages
	MessageTypeRegister        MessageType = "register"
	MessageTypeWorkflowCreated MessageType = "workflow_created"
	MessageTypePhaseChanged    MessageType = "phase_changed"
	MessageTypeCheckpoint      MessageType = "checkpoint"
	MessageTypeError           MessageType = "error"
	MessageTypeStatusResponse  MessageType = "status_response"
	MessageTypePong            MessageType = "pong"
	MessageTypeProgress        MessageType = "progress"
	MessageTypeLog             MessageType = "log"       // Single log entry
	MessageTypeLogBatch        MessageType = "log_batch" // Batch of log entries

	// when-v3 → Service messages
	MessageTypeRegistered MessageType = "registered"
	MessageTypePause      MessageType = "pause"
	MessageTypeResume     MessageType = "resume"
	MessageTypeCancel     MessageType = "cancel"
	MessageTypeStatus     MessageType = "status"
	MessageTypePing       MessageType = "ping"
)
type PausePayload ¶
type PausePayload struct {
WorkflowID string `json:"workflow_id"`
Reason string `json:"reason,omitempty"`
}
PausePayload is the payload for the pause command.
type Phase ¶
type Phase string
Phase represents the current phase of a workflow execution.
const (
	PhasePending    Phase = "pending"
	PhasePreFlight  Phase = "pre-flight"
	PhasePlanning   Phase = "planning"
	PhaseExecution  Phase = "execution"
	PhasePausing    Phase = "pausing"
	PhasePaused     Phase = "paused"
	PhaseResuming   Phase = "resuming"
	PhaseCancelling Phase = "cancelling"
	PhaseCancelled  Phase = "cancelled"
	PhaseCompleting Phase = "completing"
	PhaseCompleted  Phase = "completed"
	PhaseFailed     Phase = "failed"
)
func (Phase) CanTransitionTo ¶
CanTransitionTo checks if a transition to the target phase is valid.
func (Phase) IsPausable ¶
IsPausable returns true if the workflow can be paused from this phase.
func (Phase) IsResumable ¶
IsResumable returns true if the workflow can be resumed from this phase.
func (Phase) IsTerminal ¶
IsTerminal returns true if the phase is a terminal state.
type PhaseChangedPayload ¶
type PhaseChangedPayload struct {
WorkflowID string `json:"workflow_id"`
FromPhase Phase `json:"from"`
ToPhase Phase `json:"to"`
CheckpointID string `json:"checkpoint_id,omitempty"`
Reason string `json:"reason,omitempty"`
}
PhaseChangedPayload is the payload for the phase_changed message.
type PhaseManager ¶
type PhaseManager struct {
// contains filtered or unexported fields
}
PhaseManager manages phase states for multiple workflows.
func NewPhaseManager ¶
func NewPhaseManager() *PhaseManager
NewPhaseManager creates a new PhaseManager.
func (*PhaseManager) Cancel ¶
func (pm *PhaseManager) Cancel(workflowID, reason string) error
Cancel initiates cancellation of a workflow.
func (*PhaseManager) Complete ¶
func (pm *PhaseManager) Complete(workflowID string) error
Complete marks a workflow as completed.
func (*PhaseManager) CompleteCancellation ¶
func (pm *PhaseManager) CompleteCancellation(workflowID string) error
CompleteCancellation finishes the cancellation.
func (*PhaseManager) CompletePause ¶
func (pm *PhaseManager) CompletePause(workflowID, checkpointID string) error
CompletePause finishes the pause transition.
func (*PhaseManager) CompleteResume ¶
func (pm *PhaseManager) CompleteResume(workflowID string) error
CompleteResume finishes the resume transition.
func (*PhaseManager) CreateCheckpoint ¶
func (pm *PhaseManager) CreateCheckpoint(workflowID, checkpointID, reason string, state map[string]interface{}) error
CreateCheckpoint creates a checkpoint for a workflow.
func (*PhaseManager) Fail ¶
func (pm *PhaseManager) Fail(workflowID, reason string) error
Fail marks a workflow as failed.
func (*PhaseManager) GetActiveWorkflows ¶
func (pm *PhaseManager) GetActiveWorkflows() []*PhaseState
GetActiveWorkflows returns all workflows that are not in terminal states.
func (*PhaseManager) GetAllWorkflows ¶
func (pm *PhaseManager) GetAllWorkflows() []*PhaseState
GetAllWorkflows returns all tracked workflows.
func (*PhaseManager) GetPhase ¶
func (pm *PhaseManager) GetPhase(workflowID string) (Phase, bool)
GetPhase returns just the current phase of a workflow.
func (*PhaseManager) GetState ¶
func (pm *PhaseManager) GetState(workflowID string) (*PhaseState, bool)
GetState returns the current state of a workflow.
func (*PhaseManager) OnCheckpoint ¶
func (pm *PhaseManager) OnCheckpoint(fn func(workflowID, checkpointID string, state map[string]interface{}))
OnCheckpoint sets a callback for checkpoint creation.
func (*PhaseManager) OnPhaseChanged ¶
func (pm *PhaseManager) OnPhaseChanged(fn func(state *PhaseState))
OnPhaseChanged sets a callback for phase changes.
func (*PhaseManager) Pause ¶
func (pm *PhaseManager) Pause(workflowID, reason string) error
Pause initiates pausing of a workflow.
func (*PhaseManager) RegisterWorkflow ¶
func (pm *PhaseManager) RegisterWorkflow(workflowID, parentWorkflowID, rootWorkflowID string) *PhaseState
RegisterWorkflow registers a new workflow with initial pending state.
func (*PhaseManager) RemoveWorkflow ¶
func (pm *PhaseManager) RemoveWorkflow(workflowID string)
RemoveWorkflow removes a workflow from tracking.
func (*PhaseManager) Resume ¶
func (pm *PhaseManager) Resume(workflowID, fromCheckpoint string) error
Resume initiates resuming of a workflow.
func (*PhaseManager) SetProgress ¶
func (pm *PhaseManager) SetProgress(workflowID string, progress float64, currentAction string) error
SetProgress updates the progress of a workflow.
func (*PhaseManager) TransitionTo ¶
func (pm *PhaseManager) TransitionTo(workflowID string, newPhase Phase, reason string) error
TransitionTo attempts to transition a workflow to a new phase.
type PhaseState ¶
type PhaseState struct {
WorkflowID string
Phase Phase
PreviousPhase Phase
ChangedAt time.Time
Reason string
CheckpointID string
Progress float64
CurrentAction string
ParentWorkflowID string
RootWorkflowID string
}
PhaseState represents the state of a single workflow's phase.
type ProgressPayload ¶
type ProgressPayload struct {
WorkflowID string `json:"workflow_id"`
ActionID string `json:"action_id,omitempty"`
Percent float64 `json:"percent"`
Stage string `json:"stage,omitempty"`
Message string `json:"message,omitempty"`
CurrentItem int `json:"current_item,omitempty"`
TotalItems int `json:"total_items,omitempty"`
}
ProgressPayload is the payload for the progress message.
type RegisterPayload ¶
type RegisterPayload struct {
ServiceName string `json:"service_name"`
ServiceID string `json:"service_id,omitempty"`
InstanceID string `json:"instance_id,omitempty"`
Capabilities []string `json:"capabilities"`
Version string `json:"version,omitempty"` // Service software version
ProtocolVersion string `json:"protocol_version,omitempty"` // Coordination protocol version (e.g., "1.0")
SchemaVersion int `json:"schema_version,omitempty"` // Database schema version service expects
}
RegisterPayload is the payload for a register message.
type RegisteredPayload ¶
type RegisteredPayload struct {
ServiceID string `json:"service_id"`
InstanceID string `json:"instance_id,omitempty"`
Message string `json:"message,omitempty"`
ProtocolVersion string `json:"protocol_version,omitempty"` // Negotiated protocol version
HubProtocolVersion string `json:"hub_protocol_version,omitempty"` // Hub's protocol version
}
RegisteredPayload is the payload for a registered response.
type ResumePayload ¶
type ResumePayload struct {
WorkflowID string `json:"workflow_id"`
FromCheckpoint string `json:"from_checkpoint,omitempty"`
}
ResumePayload is the payload for the resume command.
type StatusPayload ¶
type StatusPayload struct {
WorkflowID string `json:"workflow_id"`
}
StatusPayload is the payload for the status request.
type StatusResponsePayload ¶
type StatusResponsePayload struct {
WorkflowID string `json:"workflow_id"`
Phase Phase `json:"phase"`
Progress float64 `json:"progress"`
CurrentAction string `json:"current_action,omitempty"`
Message string `json:"message,omitempty"`
}
StatusResponsePayload is the payload for the status_response message.
type WSMessage ¶
type WSMessage struct {
ID string `json:"id"` // For request/response correlation
Type MessageType `json:"type"` // Message type
WorkflowID string `json:"workflow_id,omitempty"` // Associated workflow
Timestamp time.Time `json:"timestamp"` // Message timestamp
Payload map[string]interface{} `json:"payload,omitempty"` // Message-specific data
}
WSMessage is the base message structure for all WebSocket communication.
func NewMessage ¶
func NewMessage(msgType MessageType) *WSMessage
NewMessage creates a new WSMessage with the given type.
func NewMessageWithWorkflow ¶
func NewMessageWithWorkflow(msgType MessageType, workflowID string) *WSMessage
NewMessageWithWorkflow creates a new WSMessage for a specific workflow.
func ParseMessage ¶
ParseMessage deserializes a JSON message.
func (*WSMessage) GetCancelPayload ¶
func (m *WSMessage) GetCancelPayload() (*CancelPayload, error)
GetCancelPayload extracts the CancelPayload from the message.
func (*WSMessage) GetPausePayload ¶
func (m *WSMessage) GetPausePayload() (*PausePayload, error)
GetPausePayload extracts the PausePayload from the message.
func (*WSMessage) GetRegisterPayload ¶
func (m *WSMessage) GetRegisterPayload() (*RegisterPayload, error)
GetRegisterPayload extracts the RegisterPayload from the message.
func (*WSMessage) GetResumePayload ¶
func (m *WSMessage) GetResumePayload() (*ResumePayload, error)
GetResumePayload extracts the ResumePayload from the message.
func (*WSMessage) SetPayload ¶
func (m *WSMessage) SetPayload(payload interface{}) error
SetPayload sets the payload from a typed struct.
type WorkflowCreatedPayload ¶
type WorkflowCreatedPayload struct {
WorkflowID string `json:"workflow_id"`
ParentWorkflowID string `json:"parent_workflow_id,omitempty"`
RootWorkflowID string `json:"root_workflow_id,omitempty"`
ActionID string `json:"action_id,omitempty"`
ActionType string `json:"action_type,omitempty"`
}
WorkflowCreatedPayload is the payload for the workflow_created message.