workflow

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2026 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Overview

Package workflow defines the Workflow domain entities for automation orchestration. Unlike Pipelines (scan execution), Workflows handle general automation: notifications, ticket creation, assignments, escalations, and triggering pipelines.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ActionType

// ActionType is a string-backed identifier for the kind of action a node
// performs; the defined values are the ActionType* constants.
type ActionType string

ActionType represents the type of action.

// Defined ActionType values. The quoted string is the value the ActionType
// itself carries, e.g. in NodeConfig.ActionType (JSON key "action_type").
const (
	ActionTypeAssignUser      ActionType = "assign_user"
	ActionTypeAssignTeam      ActionType = "assign_team"
	ActionTypeUpdatePriority  ActionType = "update_priority"
	ActionTypeUpdateStatus    ActionType = "update_status"
	ActionTypeAddTags         ActionType = "add_tags"
	ActionTypeRemoveTags      ActionType = "remove_tags"
	ActionTypeCreateTicket    ActionType = "create_ticket"
	ActionTypeUpdateTicket    ActionType = "update_ticket"
	ActionTypeTriggerPipeline ActionType = "trigger_pipeline"
	ActionTypeTriggerScan     ActionType = "trigger_scan"
	ActionTypeHTTPRequest     ActionType = "http_request"
	ActionTypeRunScript       ActionType = "run_script"
	// AI Triage action
	ActionTypeTriggerAITriage ActionType = "trigger_ai_triage"
)

func (ActionType) IsValid

func (t ActionType) IsValid() bool

IsValid checks if the action type is valid.

type Edge

// Edge links two nodes of one workflow: it runs from the node identified by
// SourceNodeKey to the node identified by TargetNodeKey.
type Edge struct {
	// ID identifies the edge; WorkflowID identifies the workflow it is part of.
	ID         shared.ID
	WorkflowID shared.ID

	// Connection definition: keys of the two endpoint nodes.
	// NOTE(review): presumably these match Node.NodeKey values in the same
	// workflow — confirm against ValidateGraph.
	SourceNodeKey string
	TargetNodeKey string

	// For condition nodes, specify which output handle
	// "yes" or "no" for condition nodes, empty for other types
	SourceHandle string

	// Optional label for display
	Label string

	// Timestamps
	// Only a creation time is kept; the struct has no UpdatedAt field.
	CreatedAt time.Time
}

Edge represents a connection between two nodes in a workflow.

func NewEdge

func NewEdge(
	workflowID shared.ID,
	sourceNodeKey string,
	targetNodeKey string,
) (*Edge, error)

NewEdge creates a new workflow edge.

func (*Edge) Clone

func (e *Edge) Clone() *Edge

Clone creates a copy of the edge with a new ID.

func (*Edge) SetLabel

func (e *Edge) SetLabel(label string)

SetLabel sets the display label.

func (*Edge) SetSourceHandle

func (e *Edge) SetSourceHandle(handle string)

SetSourceHandle sets the source handle (for condition nodes).

type EdgeRepository

// EdgeRepository is the persistence contract for workflow edges. Every method
// takes a context.Context as its first argument. The interface exposes no
// Update method: edges can only be created, read, and deleted through it.
type EdgeRepository interface {
	// Create creates a new edge.
	Create(ctx context.Context, edge *Edge) error

	// CreateBatch creates multiple edges.
	// NOTE(review): whether the batch is applied atomically is not specified
	// here — confirm with the implementation.
	CreateBatch(ctx context.Context, edges []*Edge) error

	// GetByID retrieves an edge by ID.
	// NOTE(review): the not-found behaviour (nil result vs. error) is not
	// specified here — confirm with the implementation.
	GetByID(ctx context.Context, id shared.ID) (*Edge, error)

	// GetByWorkflowID retrieves all edges for a workflow.
	GetByWorkflowID(ctx context.Context, workflowID shared.ID) ([]*Edge, error)

	// Delete deletes an edge.
	Delete(ctx context.Context, id shared.ID) error

	// DeleteByWorkflowID deletes all edges for a workflow.
	DeleteByWorkflowID(ctx context.Context, workflowID shared.ID) error
}

EdgeRepository defines the interface for workflow edge persistence.

type Node

// Node is a single vertex of a workflow graph. NodeType selects the kind of
// node and Config holds the settings for it.
type Node struct {
	// ID identifies the node; WorkflowID identifies the workflow it is part of.
	ID         shared.ID
	WorkflowID shared.ID

	// Node definition
	// NOTE(review): NodeKey is presumably unique within a workflow, since
	// NodeRepository.GetByKey looks a node up by (workflowID, nodeKey) — confirm.
	NodeKey     string
	NodeType    NodeType
	Name        string
	Description string

	// Visual workflow builder
	UIPosition UIPosition

	// Configuration (depends on node type)
	Config NodeConfig

	// Timestamps
	// Only a creation time is kept; the struct has no UpdatedAt field.
	CreatedAt time.Time
}

Node represents a node in a workflow graph.

func NewNode

func NewNode(
	workflowID shared.ID,
	nodeKey string,
	nodeType NodeType,
	name string,
) (*Node, error)

NewNode creates a new workflow node.

func (*Node) Clone

func (n *Node) Clone() *Node

Clone creates a copy of the node with a new ID.

func (*Node) SetActionConfig

func (n *Node) SetActionConfig(actionType ActionType, config map[string]any) error

SetActionConfig sets the action configuration.

func (*Node) SetConditionConfig

func (n *Node) SetConditionConfig(expr string) error

SetConditionConfig sets the condition configuration.

func (*Node) SetDescription

func (n *Node) SetDescription(desc string)

SetDescription sets the node description.

func (*Node) SetNotificationConfig

func (n *Node) SetNotificationConfig(notifType NotificationType, config map[string]any) error

SetNotificationConfig sets the notification configuration.

func (*Node) SetTriggerConfig

func (n *Node) SetTriggerConfig(triggerType TriggerType, config map[string]any) error

SetTriggerConfig sets the trigger configuration.

func (*Node) SetUIPosition

func (n *Node) SetUIPosition(x, y float64)

SetUIPosition sets the visual position for the workflow builder.

type NodeConfig

// NodeConfig bundles the per-type settings of a node in one struct, with one
// group of fields each for trigger, condition, action, and notification
// settings. Every field is tagged omitempty, so groups left at their zero
// value are dropped when the struct is marshalled with encoding/json.
type NodeConfig struct {
	// Trigger config
	TriggerType   TriggerType    `json:"trigger_type,omitempty"`
	TriggerConfig map[string]any `json:"trigger_config,omitempty"`

	// Condition config
	// NOTE(review): the expression language of ConditionExpr is not visible
	// here — confirm with the evaluator.
	ConditionExpr string `json:"condition_expr,omitempty"`

	// Action config
	ActionType   ActionType     `json:"action_type,omitempty"`
	ActionConfig map[string]any `json:"action_config,omitempty"`

	// Notification config
	NotificationType   NotificationType `json:"notification_type,omitempty"`
	NotificationConfig map[string]any   `json:"notification_config,omitempty"`
}

NodeConfig contains the configuration for a workflow node. Different node types use different fields.

type NodeRepository

// NodeRepository is the persistence contract for workflow nodes. Every method
// takes a context.Context as its first argument.
type NodeRepository interface {
	// Create creates a new node.
	Create(ctx context.Context, node *Node) error

	// CreateBatch creates multiple nodes.
	// NOTE(review): whether the batch is applied atomically is not specified
	// here — confirm with the implementation.
	CreateBatch(ctx context.Context, nodes []*Node) error

	// GetByID retrieves a node by ID.
	// NOTE(review): the not-found behaviour (nil result vs. error) is not
	// specified here — confirm with the implementation.
	GetByID(ctx context.Context, id shared.ID) (*Node, error)

	// GetByWorkflowID retrieves all nodes for a workflow.
	GetByWorkflowID(ctx context.Context, workflowID shared.ID) ([]*Node, error)

	// GetByKey retrieves a node by workflow ID and node key.
	GetByKey(ctx context.Context, workflowID shared.ID, nodeKey string) (*Node, error)

	// Update updates a node.
	Update(ctx context.Context, node *Node) error

	// Delete deletes a node.
	Delete(ctx context.Context, id shared.ID) error

	// DeleteByWorkflowID deletes all nodes for a workflow.
	DeleteByWorkflowID(ctx context.Context, workflowID shared.ID) error
}

NodeRepository defines the interface for workflow node persistence.

type NodeRun

// NodeRun records one node's execution inside a workflow run. It carries the
// node's identity (NodeID, NodeKey, NodeType) alongside the run it belongs to
// (WorkflowRunID), plus status, input/output data, and timing.
type NodeRun struct {
	// Identity of this record, of the parent workflow run, and of the node
	// being executed.
	ID            shared.ID
	WorkflowRunID shared.ID
	NodeID        shared.ID
	NodeKey       string
	NodeType      NodeType

	// Status
	// NOTE(review): ErrorMessage and ErrorCode are presumably only populated
	// for failed runs (see Fail) — confirm.
	Status       NodeRunStatus
	ErrorMessage string
	ErrorCode    string

	// Input/Output
	Input  map[string]any
	Output map[string]any

	// For condition nodes, which branch was taken
	// A pointer so that "no result" (nil) is distinguishable from false.
	ConditionResult *bool

	// Timing
	// Both are pointers and may therefore be nil.
	// NOTE(review): presumably nil until Start / a terminal transition — confirm.
	StartedAt   *time.Time
	CompletedAt *time.Time

	// Audit
	CreatedAt time.Time
}

NodeRun represents the execution of a single node in a workflow run.

func NewNodeRun

func NewNodeRun(
	workflowRunID shared.ID,
	nodeID shared.ID,
	nodeKey string,
	nodeType NodeType,
) (*NodeRun, error)

NewNodeRun creates a new node run.

func (*NodeRun) Complete

func (nr *NodeRun) Complete(output map[string]any)

Complete marks the node run as completed.

func (*NodeRun) Duration

func (nr *NodeRun) Duration() time.Duration

Duration returns the node run duration.

func (*NodeRun) Fail

func (nr *NodeRun) Fail(errorMessage, errorCode string)

Fail marks the node run as failed.

func (*NodeRun) SetConditionResult

func (nr *NodeRun) SetConditionResult(result bool)

SetConditionResult sets the result of a condition evaluation.

func (*NodeRun) SetInput

func (nr *NodeRun) SetInput(input map[string]any)

SetInput sets the node input data.

func (*NodeRun) Skip

func (nr *NodeRun) Skip(reason string)

Skip marks the node run as skipped.

func (*NodeRun) Start

func (nr *NodeRun) Start()

Start marks the node run as started.

type NodeRunFilter

// NodeRunFilter holds the optional criteria accepted by NodeRunRepository.List.
// All fields are pointers.
// NOTE(review): presumably a nil field means "do not filter on this
// attribute" — confirm with the repository implementation.
type NodeRunFilter struct {
	WorkflowRunID *shared.ID
	NodeID        *shared.ID
	Status        *NodeRunStatus
}

NodeRunFilter represents filter options for listing node runs.

type NodeRunRepository

// NodeRunRepository is the persistence contract for node runs. Besides plain
// CRUD it offers targeted writes (UpdateStatus, Complete) that take only an ID
// and the changed values rather than a whole *NodeRun.
type NodeRunRepository interface {
	// Create creates a new node run.
	Create(ctx context.Context, nodeRun *NodeRun) error

	// CreateBatch creates multiple node runs.
	// NOTE(review): whether the batch is applied atomically is not specified
	// here — confirm with the implementation.
	CreateBatch(ctx context.Context, nodeRuns []*NodeRun) error

	// GetByID retrieves a node run by ID.
	// NOTE(review): the not-found behaviour (nil result vs. error) is not
	// specified here — confirm with the implementation.
	GetByID(ctx context.Context, id shared.ID) (*NodeRun, error)

	// GetByWorkflowRunID retrieves all node runs for a workflow run.
	GetByWorkflowRunID(ctx context.Context, workflowRunID shared.ID) ([]*NodeRun, error)

	// GetByNodeKey retrieves a node run by workflow run ID and node key.
	GetByNodeKey(ctx context.Context, workflowRunID shared.ID, nodeKey string) (*NodeRun, error)

	// List lists node runs with filters.
	// Unlike RunRepository.List, this method takes no pagination argument.
	List(ctx context.Context, filter NodeRunFilter) ([]*NodeRun, error)

	// Update updates a node run.
	Update(ctx context.Context, nodeRun *NodeRun) error

	// Delete deletes a node run.
	Delete(ctx context.Context, id shared.ID) error

	// UpdateStatus updates node run status.
	// The error message and code are passed alongside the new status.
	UpdateStatus(ctx context.Context, id shared.ID, status NodeRunStatus, errorMessage, errorCode string) error

	// Complete marks a node run as completed.
	// The node's output is stored with the same call.
	Complete(ctx context.Context, id shared.ID, output map[string]any) error

	// GetPendingByDependencies gets node runs that are pending and have their dependencies completed.
	// completedNodeKeys is the caller-supplied list of node keys already completed.
	GetPendingByDependencies(ctx context.Context, workflowRunID shared.ID, completedNodeKeys []string) ([]*NodeRun, error)
}

NodeRunRepository defines the interface for node run persistence.

type NodeRunStatus

// NodeRunStatus is a string-backed status of a single node execution; the
// defined values are the NodeRunStatus* constants.
type NodeRunStatus string

NodeRunStatus represents the status of a node execution.

// Defined NodeRunStatus values.
// NOTE(review): which of these IsTerminal reports as terminal is not visible
// here — confirm against its implementation.
const (
	NodeRunStatusPending   NodeRunStatus = "pending"
	NodeRunStatusRunning   NodeRunStatus = "running"
	NodeRunStatusCompleted NodeRunStatus = "completed"
	NodeRunStatusFailed    NodeRunStatus = "failed"
	NodeRunStatusSkipped   NodeRunStatus = "skipped"
)

func (NodeRunStatus) IsTerminal

func (s NodeRunStatus) IsTerminal() bool

IsTerminal checks if the status is a terminal state.

func (NodeRunStatus) IsValid

func (s NodeRunStatus) IsValid() bool

IsValid checks if the node run status is valid.

type NodeType

// NodeType is a string-backed identifier for the kind of a workflow node; the
// defined values are the NodeType* constants.
type NodeType string

NodeType represents the type of workflow node.

// Defined NodeType values. The four kinds line up with the four field groups
// of NodeConfig (trigger, condition, action, notification).
const (
	NodeTypeTrigger      NodeType = "trigger"
	NodeTypeCondition    NodeType = "condition"
	NodeTypeAction       NodeType = "action"
	NodeTypeNotification NodeType = "notification"
)

func (NodeType) IsValid

func (t NodeType) IsValid() bool

IsValid checks if the node type is valid.

type NotificationType

// NotificationType is a string-backed identifier for a notification channel;
// the defined values are the NotificationType* constants.
type NotificationType string

NotificationType represents the type of notification.

// Defined NotificationType values. The quoted string is the value carried in
// NodeConfig.NotificationType (JSON key "notification_type").
const (
	NotificationTypeSlack     NotificationType = "slack"
	NotificationTypeEmail     NotificationType = "email"
	NotificationTypeTeams     NotificationType = "teams"
	NotificationTypeWebhook   NotificationType = "webhook"
	NotificationTypePagerDuty NotificationType = "pagerduty"
)

func (NotificationType) IsValid

func (t NotificationType) IsValid() bool

IsValid checks if the notification type is valid.

type Run

// Run is one execution of a workflow. It records what triggered it, its
// current status, the context data handed between nodes, node counters, and
// timing/audit information.
type Run struct {
	// Identity of the run, of the workflow being executed, and of the owning
	// tenant.
	ID         shared.ID
	WorkflowID shared.ID
	TenantID   shared.ID

	// Trigger information
	TriggerType TriggerType
	TriggerData map[string]any

	// Status
	Status       RunStatus
	ErrorMessage string

	// Context data passed through the workflow
	Context map[string]any

	// Statistics
	TotalNodes     int
	CompletedNodes int
	FailedNodes    int

	// Node runs (loaded separately)
	// NOTE(review): presumably populated only by RunRepository.GetWithNodeRuns
	// and empty otherwise — confirm.
	NodeRuns []*NodeRun

	// Timing
	// Both are pointers and may therefore be nil.
	// NOTE(review): presumably nil until Start / a terminal transition — confirm.
	StartedAt   *time.Time
	CompletedAt *time.Time

	// Audit
	// TriggeredBy is a pointer, so a run without a triggering user can be
	// represented as nil.
	TriggeredBy *shared.ID
	CreatedAt   time.Time
}

Run represents an execution of a workflow.

func NewRun

func NewRun(
	workflowID shared.ID,
	tenantID shared.ID,
	triggerType TriggerType,
	triggerData map[string]any,
) (*Run, error)

NewRun creates a new workflow run.

func (*Run) AddToContext

func (r *Run) AddToContext(key string, value any)

AddToContext adds a value to the workflow context.

func (*Run) Cancel

func (r *Run) Cancel()

Cancel marks the run as cancelled.

func (*Run) Complete

func (r *Run) Complete()

Complete marks the run as completed.

func (*Run) Duration

func (r *Run) Duration() time.Duration

Duration returns the run duration.

func (*Run) Fail

func (r *Run) Fail(errorMessage string)

Fail marks the run as failed.

func (*Run) SetContext

func (r *Run) SetContext(ctx map[string]any)

SetContext sets the workflow context data.

func (*Run) SetStats

func (r *Run) SetStats(total, completed, failed int)

SetStats sets the run statistics.

func (*Run) SetTriggeredBy

func (r *Run) SetTriggeredBy(userID shared.ID)

SetTriggeredBy sets the user who triggered the run.

func (*Run) Start

func (r *Run) Start()

Start marks the run as started.

type RunFilter

// RunFilter holds the optional criteria accepted by RunRepository.List.
// All fields are pointers.
// NOTE(review): presumably a nil field means "do not filter on this
// attribute", and StartedFrom/StartedTo bound Run.StartedAt — confirm with the
// repository implementation (including whether the bounds are inclusive).
type RunFilter struct {
	TenantID    *shared.ID
	WorkflowID  *shared.ID
	Status      *RunStatus
	TriggerType *TriggerType
	TriggeredBy *shared.ID
	StartedFrom *time.Time
	StartedTo   *time.Time
}

RunFilter represents filter options for listing runs.

type RunRepository

// RunRepository is the persistence contract for workflow runs. Besides CRUD
// and listing it provides active-run lookups/counters and an atomic
// "create if under the concurrency limit" operation.
type RunRepository interface {
	// Create creates a new run.
	// Performs no concurrency-limit check; see CreateRunIfUnderLimit for that.
	Create(ctx context.Context, run *Run) error

	// GetByID retrieves a run by ID.
	// Takes no tenant ID; use GetByTenantAndID for a tenant-scoped lookup.
	GetByID(ctx context.Context, id shared.ID) (*Run, error)

	// GetByTenantAndID retrieves a run by tenant and ID.
	GetByTenantAndID(ctx context.Context, tenantID, id shared.ID) (*Run, error)

	// List lists runs with filters and pagination.
	List(ctx context.Context, filter RunFilter, page pagination.Pagination) (pagination.Result[*Run], error)

	// ListByWorkflowID lists runs for a specific workflow with pagination.
	// Unlike List, pagination is passed as plain page/perPage ints and the
	// result is a slice plus an int64 count.
	// NOTE(review): the int64 is presumably the total number of matching runs,
	// and whether page is 0- or 1-based is not specified here — confirm.
	ListByWorkflowID(ctx context.Context, workflowID shared.ID, page, perPage int) ([]*Run, int64, error)

	// Update updates a run.
	Update(ctx context.Context, run *Run) error

	// Delete deletes a run.
	Delete(ctx context.Context, id shared.ID) error

	// GetWithNodeRuns retrieves a run with its node runs.
	GetWithNodeRuns(ctx context.Context, id shared.ID) (*Run, error)

	// GetActiveByWorkflowID retrieves active runs for a workflow.
	// NOTE(review): "active" presumably means pending/running, as stated for
	// the Count* methods below — confirm.
	GetActiveByWorkflowID(ctx context.Context, workflowID shared.ID) ([]*Run, error)

	// CountActiveByWorkflowID counts active runs (pending/running) for a workflow.
	CountActiveByWorkflowID(ctx context.Context, workflowID shared.ID) (int, error)

	// CountActiveByTenantID counts active runs (pending/running) for a tenant.
	CountActiveByTenantID(ctx context.Context, tenantID shared.ID) (int, error)

	// UpdateStats updates run statistics.
	// Only the completed and failed counters are accepted; there is no
	// parameter for the total node count (contrast Run.SetStats).
	UpdateStats(ctx context.Context, id shared.ID, completed, failed int) error

	// UpdateStatus updates run status.
	UpdateStatus(ctx context.Context, id shared.ID, status RunStatus, errorMessage string) error

	// CreateRunIfUnderLimit atomically checks concurrent run limits and creates run if under limit.
	// Uses a transaction with row-level locking to prevent race conditions.
	// This prevents TOCTOU (time-of-check-time-of-use) vulnerabilities where multiple concurrent
	// triggers could bypass the limits.
	// NOTE(review): the error returned when a limit is reached, and how a zero
	// or negative limit is treated, are not specified here — confirm.
	CreateRunIfUnderLimit(ctx context.Context, run *Run, maxPerWorkflow, maxPerTenant int) error
}

RunRepository defines the interface for workflow run persistence.

type RunStatus

// RunStatus is a string-backed status of a workflow run; the defined values
// are the RunStatus* constants.
type RunStatus string

RunStatus represents the status of a workflow run.

// Defined RunStatus values. RunRepository's Count* methods describe pending
// and running as the "active" statuses.
// NOTE(review): which of these IsTerminal reports as terminal is not visible
// here — confirm against its implementation.
const (
	RunStatusPending   RunStatus = "pending"
	RunStatusRunning   RunStatus = "running"
	RunStatusCompleted RunStatus = "completed"
	RunStatusFailed    RunStatus = "failed"
	RunStatusCancelled RunStatus = "cancelled"
)

func (RunStatus) IsTerminal

func (s RunStatus) IsTerminal() bool

IsTerminal checks if the status is a terminal state.

func (RunStatus) IsValid

func (s RunStatus) IsValid() bool

IsValid checks if the run status is valid.

type TriggerType

// TriggerType is a string-backed identifier for what starts a workflow; the
// defined values are the TriggerType* constants.
type TriggerType string

TriggerType represents the type of trigger.

// Defined TriggerType values. The quoted string is the value carried in
// NodeConfig.TriggerType (JSON key "trigger_type") and in Run.TriggerType.
const (
	TriggerTypeManual          TriggerType = "manual"
	TriggerTypeSchedule        TriggerType = "schedule"
	TriggerTypeFindingCreated  TriggerType = "finding_created"
	TriggerTypeFindingUpdated  TriggerType = "finding_updated"
	TriggerTypeFindingAge      TriggerType = "finding_age"
	TriggerTypeAssetDiscovered TriggerType = "asset_discovered"
	TriggerTypeScanCompleted   TriggerType = "scan_completed"
	TriggerTypeWebhook         TriggerType = "webhook"
	// AI Triage triggers
	TriggerTypeAITriageCompleted TriggerType = "ai_triage_completed"
	TriggerTypeAITriageFailed    TriggerType = "ai_triage_failed"
)

func (TriggerType) IsValid

func (t TriggerType) IsValid() bool

IsValid checks if the trigger type is valid.

type UIPosition

// UIPosition is a pair of float64 coordinates serialized under the JSON keys
// "x" and "y". Neither tag uses omitempty, so zero coordinates are still
// written out.
// NOTE(review): units and origin are defined by the workflow builder UI and
// are not visible here.
type UIPosition struct {
	X float64 `json:"x"`
	Y float64 `json:"y"`
}

UIPosition represents the visual position in the workflow builder.

type Workflow

// Workflow is an automation workflow definition owned by a tenant. It holds
// the descriptive fields, the activation flag, the graph (Nodes and Edges),
// aggregate run statistics, and audit information.
type Workflow struct {
	// Identity and descriptive fields; TenantID identifies the owning tenant.
	ID          shared.ID
	TenantID    shared.ID
	Name        string
	Description string

	// Status
	IsActive bool

	// Metadata
	Tags []string

	// Nodes and edges (loaded separately)
	// NOTE(review): presumably populated only by graph-loading calls such as
	// WorkflowRepository.GetWithGraph and empty otherwise — confirm.
	Nodes []*Node
	Edges []*Edge

	// Execution statistics
	// LastRunID and LastRunAt are pointers, so "never run" can be nil.
	// LastRunStatus is a plain string here, not the RunStatus type.
	TotalRuns      int
	SuccessfulRuns int
	FailedRuns     int
	LastRunID      *shared.ID
	LastRunAt      *time.Time
	LastRunStatus  string

	// Audit
	// CreatedBy is a pointer, so a workflow without a recorded creator can be
	// represented as nil.
	CreatedBy *shared.ID
	CreatedAt time.Time
	UpdatedAt time.Time
}

Workflow represents an automation workflow definition.

func NewWorkflow

func NewWorkflow(tenantID shared.ID, name, description string) (*Workflow, error)

NewWorkflow creates a new workflow.

func (*Workflow) Activate

func (w *Workflow) Activate()

Activate activates the workflow.

func (*Workflow) AddEdge

func (w *Workflow) AddEdge(edge *Edge)

AddEdge adds an edge to the workflow.

func (*Workflow) AddNode

func (w *Workflow) AddNode(node *Node)

AddNode adds a node to the workflow.

func (*Workflow) Clone

func (w *Workflow) Clone(newName string) *Workflow

Clone creates a copy of the workflow with a new ID.

func (*Workflow) Deactivate

func (w *Workflow) Deactivate()

Deactivate deactivates the workflow.

func (*Workflow) GetDownstreamNodes

func (w *Workflow) GetDownstreamNodes(nodeKey string) []*Node

GetDownstreamNodes returns nodes that depend on the given node.

func (*Workflow) GetNodeByKey

func (w *Workflow) GetNodeByKey(key string) *Node

GetNodeByKey returns a node by its key.

func (*Workflow) GetTriggerNodes

func (w *Workflow) GetTriggerNodes() []*Node

GetTriggerNodes returns all trigger nodes.

func (*Workflow) RecordRun

func (w *Workflow) RecordRun(runID shared.ID, status string)

RecordRun records the result of a workflow run.

func (*Workflow) SetCreatedBy

func (w *Workflow) SetCreatedBy(userID shared.ID)

SetCreatedBy sets the user who created the workflow.

func (*Workflow) SuccessRate

func (w *Workflow) SuccessRate() int

SuccessRate returns the success rate as a percentage.

func (*Workflow) ValidateGraph

func (w *Workflow) ValidateGraph() error

ValidateGraph validates the workflow graph structure.

type WorkflowFilter

// WorkflowFilter holds the optional criteria accepted by
// WorkflowRepository.List.
// NOTE(review): presumably nil pointers, an empty Tags slice, and an empty
// Search string each mean "no restriction"; whether Tags matches any or all
// tags, and which fields Search covers, is not visible here — confirm with the
// repository implementation.
type WorkflowFilter struct {
	TenantID *shared.ID
	IsActive *bool
	Tags     []string
	Search   string
}

WorkflowFilter represents filter options for listing workflows.

type WorkflowRepository

// WorkflowRepository is the persistence contract for workflow definitions.
// Some lookups return the workflow alone, while GetWithGraph and
// ListActiveWithTriggerType also load its nodes and edges.
type WorkflowRepository interface {
	// Create creates a new workflow.
	Create(ctx context.Context, workflow *Workflow) error

	// GetByID retrieves a workflow by ID.
	// Takes no tenant ID; use GetByTenantAndID for a tenant-scoped lookup.
	GetByID(ctx context.Context, id shared.ID) (*Workflow, error)

	// GetByTenantAndID retrieves a workflow by tenant and ID.
	GetByTenantAndID(ctx context.Context, tenantID, id shared.ID) (*Workflow, error)

	// GetByName retrieves a workflow by name.
	// The lookup is scoped to the given tenant.
	GetByName(ctx context.Context, tenantID shared.ID, name string) (*Workflow, error)

	// List lists workflows with filters and pagination.
	List(ctx context.Context, filter WorkflowFilter, page pagination.Pagination) (pagination.Result[*Workflow], error)

	// Update updates a workflow.
	Update(ctx context.Context, workflow *Workflow) error

	// Delete deletes a workflow.
	// NOTE(review): whether the workflow's nodes and edges are removed as well
	// is not specified here — confirm with the implementation.
	Delete(ctx context.Context, id shared.ID) error

	// GetWithGraph retrieves a workflow with its nodes and edges.
	GetWithGraph(ctx context.Context, id shared.ID) (*Workflow, error)

	// ListActiveWithTriggerType lists active workflows that have a trigger node
	// with the specified trigger type. Returns workflows with their full graph
	// (nodes and edges) in a single efficient query.
	// This method is optimized to avoid N+1 query issues when processing events.
	ListActiveWithTriggerType(ctx context.Context, tenantID shared.ID, triggerType TriggerType) ([]*Workflow, error)
}

WorkflowRepository defines the interface for workflow persistence.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL