Documentation
¶
Overview ¶
Package workflow defines the Workflow domain entities for automation orchestration. Unlike Pipelines (scan execution), Workflows handle general automation: notifications, ticket creation, assignments, escalations, and triggering pipelines.
Index ¶
- type ActionType
- type Edge
- type EdgeRepository
- type Node
- func (n *Node) Clone() *Node
- func (n *Node) SetActionConfig(actionType ActionType, config map[string]any) error
- func (n *Node) SetConditionConfig(expr string) error
- func (n *Node) SetDescription(desc string)
- func (n *Node) SetNotificationConfig(notifType NotificationType, config map[string]any) error
- func (n *Node) SetTriggerConfig(triggerType TriggerType, config map[string]any) error
- func (n *Node) SetUIPosition(x, y float64)
- type NodeConfig
- type NodeRepository
- type NodeRun
- func (nr *NodeRun) Complete(output map[string]any)
- func (nr *NodeRun) Duration() time.Duration
- func (nr *NodeRun) Fail(errorMessage, errorCode string)
- func (nr *NodeRun) SetConditionResult(result bool)
- func (nr *NodeRun) SetInput(input map[string]any)
- func (nr *NodeRun) Skip(reason string)
- func (nr *NodeRun) Start()
- type NodeRunFilter
- type NodeRunRepository
- type NodeRunStatus
- type NodeType
- type NotificationType
- type Run
- func (r *Run) AddToContext(key string, value any)
- func (r *Run) Cancel()
- func (r *Run) Complete()
- func (r *Run) Duration() time.Duration
- func (r *Run) Fail(errorMessage string)
- func (r *Run) SetContext(ctx map[string]any)
- func (r *Run) SetStats(total, completed, failed int)
- func (r *Run) SetTriggeredBy(userID shared.ID)
- func (r *Run) Start()
- type RunFilter
- type RunRepository
- type RunStatus
- type TriggerType
- type UIPosition
- type Workflow
- func (w *Workflow) Activate()
- func (w *Workflow) AddEdge(edge *Edge)
- func (w *Workflow) AddNode(node *Node)
- func (w *Workflow) Clone(newName string) *Workflow
- func (w *Workflow) Deactivate()
- func (w *Workflow) GetDownstreamNodes(nodeKey string) []*Node
- func (w *Workflow) GetNodeByKey(key string) *Node
- func (w *Workflow) GetTriggerNodes() []*Node
- func (w *Workflow) RecordRun(runID shared.ID, status string)
- func (w *Workflow) SetCreatedBy(userID shared.ID)
- func (w *Workflow) SuccessRate() int
- func (w *Workflow) ValidateGraph() error
- type WorkflowFilter
- type WorkflowRepository
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ActionType ¶
type ActionType string
ActionType represents the type of action.
const (
ActionTypeAssignUser ActionType = "assign_user"
ActionTypeAssignTeam ActionType = "assign_team"
ActionTypeUpdatePriority ActionType = "update_priority"
ActionTypeUpdateStatus ActionType = "update_status"
ActionTypeAddTags ActionType = "add_tags"
ActionTypeRemoveTags ActionType = "remove_tags"
ActionTypeCreateTicket ActionType = "create_ticket"
ActionTypeUpdateTicket ActionType = "update_ticket"
ActionTypeTriggerPipeline ActionType = "trigger_pipeline"
ActionTypeTriggerScan ActionType = "trigger_scan"
ActionTypeHTTPRequest ActionType = "http_request"
ActionTypeRunScript ActionType = "run_script"
// AI Triage action
ActionTypeTriggerAITriage ActionType = "trigger_ai_triage"
)
func (ActionType) IsValid ¶
func (t ActionType) IsValid() bool
IsValid checks if the action type is valid.
type Edge ¶
type Edge struct {
ID shared.ID
WorkflowID shared.ID
// Connection definition
SourceNodeKey string
TargetNodeKey string
// For condition nodes, specify which output handle
// "yes" or "no" for condition nodes, empty for other types
SourceHandle string
// Optional label for display
Label string
// Timestamps
CreatedAt time.Time
}
Edge represents a connection between two nodes in a workflow.
func (*Edge) SetSourceHandle ¶
SetSourceHandle sets the source handle (for condition nodes).
type EdgeRepository ¶
type EdgeRepository interface {
// Create creates a new edge.
Create(ctx context.Context, edge *Edge) error
// CreateBatch creates multiple edges.
CreateBatch(ctx context.Context, edges []*Edge) error
// GetByID retrieves an edge by ID.
GetByID(ctx context.Context, id shared.ID) (*Edge, error)
// GetByWorkflowID retrieves all edges for a workflow.
GetByWorkflowID(ctx context.Context, workflowID shared.ID) ([]*Edge, error)
// Delete deletes an edge.
Delete(ctx context.Context, id shared.ID) error
// DeleteByWorkflowID deletes all edges for a workflow.
DeleteByWorkflowID(ctx context.Context, workflowID shared.ID) error
}
EdgeRepository defines the interface for workflow edge persistence.
type Node ¶
type Node struct {
ID shared.ID
WorkflowID shared.ID
// Node definition
NodeKey string
NodeType NodeType
Name string
Description string
// Visual workflow builder
UIPosition UIPosition
// Configuration (depends on node type)
Config NodeConfig
// Timestamps
CreatedAt time.Time
}
Node represents a node in a workflow graph.
func NewNode ¶
func NewNode(workflowID shared.ID, nodeKey string, nodeType NodeType, name string) (*Node, error)
NewNode creates a new workflow node.
func (*Node) SetActionConfig ¶
func (n *Node) SetActionConfig(actionType ActionType, config map[string]any) error
SetActionConfig sets the action configuration.
func (*Node) SetConditionConfig ¶
func (n *Node) SetConditionConfig(expr string) error
SetConditionConfig sets the condition configuration.
func (*Node) SetDescription ¶
func (n *Node) SetDescription(desc string)
SetDescription sets the node description.
func (*Node) SetNotificationConfig ¶
func (n *Node) SetNotificationConfig(notifType NotificationType, config map[string]any) error
SetNotificationConfig sets the notification configuration.
func (*Node) SetTriggerConfig ¶
func (n *Node) SetTriggerConfig(triggerType TriggerType, config map[string]any) error
SetTriggerConfig sets the trigger configuration.
func (*Node) SetUIPosition ¶
func (n *Node) SetUIPosition(x, y float64)
SetUIPosition sets the visual position for the workflow builder.
type NodeConfig ¶
type NodeConfig struct {
// Trigger config
TriggerType TriggerType `json:"trigger_type,omitempty"`
TriggerConfig map[string]any `json:"trigger_config,omitempty"`
// Condition config
ConditionExpr string `json:"condition_expr,omitempty"`
// Action config
ActionType ActionType `json:"action_type,omitempty"`
ActionConfig map[string]any `json:"action_config,omitempty"`
// Notification config
NotificationType NotificationType `json:"notification_type,omitempty"`
NotificationConfig map[string]any `json:"notification_config,omitempty"`
}
NodeConfig contains the configuration for a workflow node. Different node types use different fields.
type NodeRepository ¶
type NodeRepository interface {
// Create creates a new node.
Create(ctx context.Context, node *Node) error
// CreateBatch creates multiple nodes.
CreateBatch(ctx context.Context, nodes []*Node) error
// GetByID retrieves a node by ID.
GetByID(ctx context.Context, id shared.ID) (*Node, error)
// GetByWorkflowID retrieves all nodes for a workflow.
GetByWorkflowID(ctx context.Context, workflowID shared.ID) ([]*Node, error)
// GetByKey retrieves a node by workflow ID and node key.
GetByKey(ctx context.Context, workflowID shared.ID, nodeKey string) (*Node, error)
// Update updates a node.
Update(ctx context.Context, node *Node) error
// Delete deletes a node.
Delete(ctx context.Context, id shared.ID) error
// DeleteByWorkflowID deletes all nodes for a workflow.
DeleteByWorkflowID(ctx context.Context, workflowID shared.ID) error
}
NodeRepository defines the interface for workflow node persistence.
type NodeRun ¶
type NodeRun struct {
ID shared.ID
WorkflowRunID shared.ID
NodeID shared.ID
NodeKey string
NodeType NodeType
// Status
Status NodeRunStatus
ErrorMessage string
ErrorCode string
// Input/Output
Input map[string]any
Output map[string]any
// For condition nodes, which branch was taken
ConditionResult *bool
// Timing
StartedAt *time.Time
CompletedAt *time.Time
// Audit
CreatedAt time.Time
}
NodeRun represents the execution of a single node in a workflow run.
func NewNodeRun ¶
func NewNodeRun(workflowRunID shared.ID, nodeID shared.ID, nodeKey string, nodeType NodeType) (*NodeRun, error)
NewNodeRun creates a new node run.
func (*NodeRun) SetConditionResult ¶
func (nr *NodeRun) SetConditionResult(result bool)
SetConditionResult sets the result of a condition evaluation.
type NodeRunFilter ¶
type NodeRunFilter struct {
WorkflowRunID *shared.ID
NodeID *shared.ID
Status *NodeRunStatus
}
NodeRunFilter represents filter options for listing node runs.
type NodeRunRepository ¶
type NodeRunRepository interface {
// Create creates a new node run.
Create(ctx context.Context, nodeRun *NodeRun) error
// CreateBatch creates multiple node runs.
CreateBatch(ctx context.Context, nodeRuns []*NodeRun) error
// GetByID retrieves a node run by ID.
GetByID(ctx context.Context, id shared.ID) (*NodeRun, error)
// GetByWorkflowRunID retrieves all node runs for a workflow run.
GetByWorkflowRunID(ctx context.Context, workflowRunID shared.ID) ([]*NodeRun, error)
// GetByNodeKey retrieves a node run by workflow run ID and node key.
GetByNodeKey(ctx context.Context, workflowRunID shared.ID, nodeKey string) (*NodeRun, error)
// List lists node runs with filters.
List(ctx context.Context, filter NodeRunFilter) ([]*NodeRun, error)
// Update updates a node run.
Update(ctx context.Context, nodeRun *NodeRun) error
// Delete deletes a node run.
Delete(ctx context.Context, id shared.ID) error
// UpdateStatus updates node run status.
UpdateStatus(ctx context.Context, id shared.ID, status NodeRunStatus, errorMessage, errorCode string) error
// Complete marks a node run as completed.
Complete(ctx context.Context, id shared.ID, output map[string]any) error
// GetPendingByDependencies gets node runs that are pending and have their dependencies completed.
GetPendingByDependencies(ctx context.Context, workflowRunID shared.ID, completedNodeKeys []string) ([]*NodeRun, error)
}
NodeRunRepository defines the interface for node run persistence.
type NodeRunStatus ¶
type NodeRunStatus string
NodeRunStatus represents the status of a node execution.
const (
NodeRunStatusPending NodeRunStatus = "pending"
NodeRunStatusRunning NodeRunStatus = "running"
NodeRunStatusCompleted NodeRunStatus = "completed"
NodeRunStatusFailed NodeRunStatus = "failed"
NodeRunStatusSkipped NodeRunStatus = "skipped"
)
func (NodeRunStatus) IsTerminal ¶
func (s NodeRunStatus) IsTerminal() bool
IsTerminal checks if the status is a terminal state.
func (NodeRunStatus) IsValid ¶
func (s NodeRunStatus) IsValid() bool
IsValid checks if the node run status is valid.
type NotificationType ¶
type NotificationType string
NotificationType represents the type of notification.
const (
NotificationTypeSlack NotificationType = "slack"
NotificationTypeEmail NotificationType = "email"
NotificationTypeTeams NotificationType = "teams"
NotificationTypeWebhook NotificationType = "webhook"
NotificationTypePagerDuty NotificationType = "pagerduty"
)
func (NotificationType) IsValid ¶
func (t NotificationType) IsValid() bool
IsValid checks if the notification type is valid.
type Run ¶
type Run struct {
ID shared.ID
WorkflowID shared.ID
TenantID shared.ID
// Trigger information
TriggerType TriggerType
TriggerData map[string]any
// Status
Status RunStatus
ErrorMessage string
// Context data passed through the workflow
Context map[string]any
// Statistics
TotalNodes int
CompletedNodes int
FailedNodes int
// Node runs (loaded separately)
NodeRuns []*NodeRun
// Timing
StartedAt *time.Time
CompletedAt *time.Time
// Audit
TriggeredBy *shared.ID
CreatedAt time.Time
}
Run represents an execution of a workflow.
func NewRun ¶
func NewRun(workflowID shared.ID, tenantID shared.ID, triggerType TriggerType, triggerData map[string]any) (*Run, error)
NewRun creates a new workflow run.
func (*Run) AddToContext ¶
func (r *Run) AddToContext(key string, value any)
AddToContext adds a value to the workflow context.
func (*Run) SetContext ¶
func (r *Run) SetContext(ctx map[string]any)
SetContext sets the workflow context data.
func (*Run) SetTriggeredBy ¶
func (r *Run) SetTriggeredBy(userID shared.ID)
SetTriggeredBy sets the user who triggered the run.
type RunFilter ¶
type RunFilter struct {
TenantID *shared.ID
WorkflowID *shared.ID
Status *RunStatus
TriggerType *TriggerType
TriggeredBy *shared.ID
StartedFrom *time.Time
StartedTo *time.Time
}
RunFilter represents filter options for listing runs.
type RunRepository ¶
type RunRepository interface {
// Create creates a new run.
Create(ctx context.Context, run *Run) error
// GetByID retrieves a run by ID.
GetByID(ctx context.Context, id shared.ID) (*Run, error)
// GetByTenantAndID retrieves a run by tenant and ID.
GetByTenantAndID(ctx context.Context, tenantID, id shared.ID) (*Run, error)
// List lists runs with filters and pagination.
List(ctx context.Context, filter RunFilter, page pagination.Pagination) (pagination.Result[*Run], error)
// ListByWorkflowID lists runs for a specific workflow with pagination.
ListByWorkflowID(ctx context.Context, workflowID shared.ID, page, perPage int) ([]*Run, int64, error)
// Update updates a run.
Update(ctx context.Context, run *Run) error
// Delete deletes a run.
Delete(ctx context.Context, id shared.ID) error
// GetWithNodeRuns retrieves a run with its node runs.
GetWithNodeRuns(ctx context.Context, id shared.ID) (*Run, error)
// GetActiveByWorkflowID retrieves active runs for a workflow.
GetActiveByWorkflowID(ctx context.Context, workflowID shared.ID) ([]*Run, error)
// CountActiveByWorkflowID counts active runs (pending/running) for a workflow.
CountActiveByWorkflowID(ctx context.Context, workflowID shared.ID) (int, error)
// CountActiveByTenantID counts active runs (pending/running) for a tenant.
CountActiveByTenantID(ctx context.Context, tenantID shared.ID) (int, error)
// UpdateStats updates run statistics.
UpdateStats(ctx context.Context, id shared.ID, completed, failed int) error
// UpdateStatus updates run status.
UpdateStatus(ctx context.Context, id shared.ID, status RunStatus, errorMessage string) error
// CreateRunIfUnderLimit atomically checks concurrent run limits and creates run if under limit.
// Uses a transaction with row-level locking to prevent race conditions.
// This prevents TOCTOU (time-of-check to time-of-use) races in which multiple concurrent
// triggers could bypass the limits.
CreateRunIfUnderLimit(ctx context.Context, run *Run, maxPerWorkflow, maxPerTenant int) error
}
RunRepository defines the interface for workflow run persistence.
type RunStatus ¶
type RunStatus string
RunStatus represents the status of a workflow run.
func (RunStatus) IsTerminal ¶
IsTerminal checks if the status is a terminal state.
type TriggerType ¶
type TriggerType string
TriggerType represents the type of trigger.
const (
TriggerTypeManual TriggerType = "manual"
TriggerTypeSchedule TriggerType = "schedule"
TriggerTypeFindingCreated TriggerType = "finding_created"
TriggerTypeFindingUpdated TriggerType = "finding_updated"
TriggerTypeFindingAge TriggerType = "finding_age"
TriggerTypeAssetDiscovered TriggerType = "asset_discovered"
TriggerTypeScanCompleted TriggerType = "scan_completed"
TriggerTypeWebhook TriggerType = "webhook"
// AI Triage triggers
TriggerTypeAITriageCompleted TriggerType = "ai_triage_completed"
TriggerTypeAITriageFailed TriggerType = "ai_triage_failed"
)
func (TriggerType) IsValid ¶
func (t TriggerType) IsValid() bool
IsValid checks if the trigger type is valid.
type UIPosition ¶
UIPosition represents the visual position in the workflow builder.
type Workflow ¶
type Workflow struct {
ID shared.ID
TenantID shared.ID
Name string
Description string
// Status
IsActive bool
// Metadata
Tags []string
// Nodes and edges (loaded separately)
Nodes []*Node
Edges []*Edge
// Execution statistics
TotalRuns int
SuccessfulRuns int
FailedRuns int
LastRunID *shared.ID
LastRunAt *time.Time
LastRunStatus string
// Audit
CreatedBy *shared.ID
CreatedAt time.Time
UpdatedAt time.Time
}
Workflow represents an automation workflow definition.
func NewWorkflow ¶
NewWorkflow creates a new workflow.
func (*Workflow) GetDownstreamNodes ¶
func (w *Workflow) GetDownstreamNodes(nodeKey string) []*Node
GetDownstreamNodes returns nodes that depend on the given node.
func (*Workflow) GetNodeByKey ¶
func (w *Workflow) GetNodeByKey(key string) *Node
GetNodeByKey returns a node by its key.
func (*Workflow) GetTriggerNodes ¶
func (w *Workflow) GetTriggerNodes() []*Node
GetTriggerNodes returns all trigger nodes.
func (*Workflow) SetCreatedBy ¶
func (w *Workflow) SetCreatedBy(userID shared.ID)
SetCreatedBy sets the user who created the workflow.
func (*Workflow) SuccessRate ¶
func (w *Workflow) SuccessRate() int
SuccessRate returns the success rate as a percentage.
func (*Workflow) ValidateGraph ¶
func (w *Workflow) ValidateGraph() error
ValidateGraph validates the workflow graph structure.
type WorkflowFilter ¶
WorkflowFilter represents filter options for listing workflows.
type WorkflowRepository ¶
type WorkflowRepository interface {
// Create creates a new workflow.
Create(ctx context.Context, workflow *Workflow) error
// GetByID retrieves a workflow by ID.
GetByID(ctx context.Context, id shared.ID) (*Workflow, error)
// GetByTenantAndID retrieves a workflow by tenant and ID.
GetByTenantAndID(ctx context.Context, tenantID, id shared.ID) (*Workflow, error)
// GetByName retrieves a workflow by name.
GetByName(ctx context.Context, tenantID shared.ID, name string) (*Workflow, error)
// List lists workflows with filters and pagination.
List(ctx context.Context, filter WorkflowFilter, page pagination.Pagination) (pagination.Result[*Workflow], error)
// Update updates a workflow.
Update(ctx context.Context, workflow *Workflow) error
// Delete deletes a workflow.
Delete(ctx context.Context, id shared.ID) error
// GetWithGraph retrieves a workflow with its nodes and edges.
GetWithGraph(ctx context.Context, id shared.ID) (*Workflow, error)
// ListActiveWithTriggerType lists active workflows that have a trigger node
// with the specified trigger type. Returns workflows with their full graph
// (nodes and edges) in a single efficient query.
// This method is optimized to avoid N+1 query issues when processing events.
ListActiveWithTriggerType(ctx context.Context, tenantID shared.ID, triggerType TriggerType) ([]*Workflow, error)
}
WorkflowRepository defines the interface for workflow persistence.