dag_go

package module
v1.2.0
Published: May 18, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

README

dag-go


dag-go is a pure-Go concurrent DAG (Directed Acyclic Graph) execution engine. Wire up tasks as nodes, define dependencies as directed edges, and execute the entire graph concurrently — with context cancellation, per-node timeouts, cycle detection, and atomic state-transition guarantees.

Korean documentation: README.ko.md


Key Features

  • Pure Go: no Kubernetes or framework dependencies; stdlib plus a handful of well-scoped modules
  • Context-aware execution: every Runnable.RunE receives a context.Context; cancellation propagates through the whole graph
  • Deadlock-safe concurrency cap: WorkerPoolSize bounds only inFlight (RunE); dependency wait (preFlight) never holds a slot, so any cap value is safe
  • Lifecycle guardrails: FinishDag() seals the topology (AddEdge returns an error, CreateNode returns nil); a successful GetReady() freezes the runner configuration (SetContainerCmd, SetNodeRunner, SetRunnerResolver) until Reset()
  • Atomic FinishDag: cycle detection runs before any structural mutation, so a failed FinishDag leaves the DAG unchanged
  • Cycle detection: DFS-based; returns ErrCycleDetected (a sentinel compatible with errors.Is)
  • Atomic state transitions: TransitionStatus(from, to) compare-and-swap guards prevent illegal status overwrites
  • Per-node and DAG-level timeouts: Node.Timeout overrides the DAG-wide DagConfig.DefaultTimeout for a node's RunE; Dag.Timeout (WithTimeout) caps the whole run in Wait. A per-node budget starts only after the node acquires its execution slot
  • Error policy: ErrorPolicyFailFast (default) or ErrorPolicyContinueOnError; runtime errors land in the Dag.Errors channel
  • SafeChannel[T]: generic concurrency-safe channel wrapper that prevents double-close panics
  • Goroutine-leak tested: every test verifies zero goroutine leaks with goleak
  • Reset & retry: Reset() restores the DAG to its pre-GetReady state so the same topology can be reused with fresh runners

Installation

go get github.com/HeaInSeo/dag-go

Requires Go 1.25+.


Quick Start

package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    dag "github.com/HeaInSeo/dag-go"
)

type MyRunner struct{ label string }

// RunE simulates 50 ms of work and honours cancellation.
func (r *MyRunner) RunE(ctx context.Context, _ interface{}) error {
    select {
    case <-time.After(50 * time.Millisecond):
        fmt.Printf("[%s] done\n", r.label)
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    // 1. Initialise: creates the synthetic start node.
    d, err := dag.InitDag()
    if err != nil {
        panic(err)
    }

    // 2. Set a default runner for all nodes.
    d.SetContainerCmd(&MyRunner{label: "default"})

    // 3. Wire a diamond graph:
    //    start → A ─┬→ B1 ─┬→ C → end
    //               └→ B2 ─┘
    edges := [][2]string{
        {dag.StartNode, "A"},
        {"A", "B1"},
        {"A", "B2"},
        {"B1", "C"},
        {"B2", "C"},
    }
    for _, e := range edges {
        if err := d.AddEdge(e[0], e[1]); err != nil {
            panic(err)
        }
    }

    // 4. Seal the graph. Cycle detection runs here.
    //    After this call, AddEdge returns an error and CreateNode returns nil.
    if err := d.FinishDag(); err != nil {
        if errors.Is(err, dag.ErrCycleDetected) {
            panic("cycle found in graph")
        }
        panic(err)
    }

    ctx := context.Background()

    // 5–7. Connect runners, prepare execution, fire.
    //      The E variants report why a step failed instead of returning false.
    if err := d.ConnectRunnerE(); err != nil {
        panic(err)
    }
    if err := d.GetReadyE(ctx); err != nil {
        panic(err)
    }
    if err := d.StartE(); err != nil {
        panic(err)
    }

    // 8. Block until all nodes complete.
    if err := d.WaitE(ctx); err != nil {
        fmt.Println("DAG execution failed:", err)
        return
    }

    fmt.Printf("All done, progress: %.0f%%\n", d.Progress()*100)
}

DAG Lifecycle

InitDag() / StartDag()       creates the synthetic start node
        │
AddEdge(from, to)             wires parent → child dependencies
        │
FinishDag()                   validates, detects cycles, then seals the graph
        │                     ← AddEdge / CreateNode rejected after this point
ConnectRunner()               attaches runner closures to every node
        │
GetReady(ctx)                 initialises the semaphore; launches one goroutine per node
        │                     ← SetContainerCmd / SetNodeRunner / SetRunnerResolver frozen until Reset()
Start()                       sends the trigger signal to the start node
        │
Wait(ctx)                     fans-in all node status streams; returns true on success
        │
Reset()                       clears execution state; topology is preserved for re-use

Node State Machine

Every node follows a strict lifecycle enforced by TransitionStatus(from, to NodeStatus). Illegal transitions are atomically rejected.

stateDiagram-v2
    [*] --> Pending : node created
    Pending --> Running  : all parents ready
    Pending --> Skipped  : a parent failed (FailFast policy)
    Running --> Succeeded : RunE returns nil
    Running --> Failed    : RunE returns error / timeout
    Succeeded --> [*]
    Failed    --> [*]
    Skipped   --> [*]
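The compare-and-swap guard behind TransitionStatus can be sketched as follows. This is a minimal illustration, assuming hypothetical NodeStatus values and a hypothetical node type that encode only the edges of the diagram above; dag-go's real constants and storage may differ. The point is that checking the current state and writing the new one happen in a single atomic step, so two goroutines cannot both move a Pending node.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// NodeStatus values are hypothetical; they mirror the state diagram only.
type NodeStatus int32

const (
	Pending NodeStatus = iota
	Running
	Succeeded
	Failed
	Skipped
)

// legal lists the allowed edges of the state diagram.
var legal = map[NodeStatus][]NodeStatus{
	Pending: {Running, Skipped},
	Running: {Succeeded, Failed},
}

// node is a hypothetical stand-in for dag.Node.
type node struct {
	status atomic.Int32 // zero value == Pending
}

// TransitionStatus moves n from `from` to `to` only if the edge is legal and
// n is currently in `from`; CompareAndSwap makes check-and-write atomic.
func (n *node) TransitionStatus(from, to NodeStatus) bool {
	allowed := false
	for _, s := range legal[from] {
		if s == to {
			allowed = true
			break
		}
	}
	if !allowed {
		return false
	}
	return n.status.CompareAndSwap(int32(from), int32(to))
}

func main() {
	var n node
	fmt.Println(n.TransitionStatus(Pending, Running))   // true
	fmt.Println(n.TransitionStatus(Pending, Skipped))   // false: no longer Pending
	fmt.Println(n.TransitionStatus(Running, Succeeded)) // true
	fmt.Println(n.TransitionStatus(Succeeded, Running)) // false: terminal state
}
```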

Configuration (DagConfig)

cfg := dag.DagConfig{
    MinChannelBuffer:  5,                       // inter-node edge channel buffer
    MaxChannelBuffer:  100,                     // NodesResult / Errors channel buffer
    WorkerPoolSize:    50,                      // max concurrent RunE executions
                                                // (safe to set below node count; see below)
    DefaultTimeout:    0,                       // per-node RunE timeout; 0 = no limit
    ErrorDrainTimeout: 5 * time.Second,         // max time collectErrors waits to drain
    ErrorPolicy:       dag.ErrorPolicyFailFast, // or dag.ErrorPolicyContinueOnError
}
d := dag.NewDagWithConfig(cfg)
// NewDagWithConfig does not add the start node: call d.StartDag() before AddEdge.

// Functional-option variant:
d = dag.NewDagWithOptions(
    dag.WithTimeout(10 * time.Second), // DAG-level deadline used by Wait
    dag.WithWorkerPool(20),
)

WorkerPoolSize and deadlock safety

WorkerPoolSize caps the number of nodes that may execute RunE concurrently. Dependency waiting (preFlight) runs in a free goroutine without holding a slot, so the semaphore cannot deadlock the graph regardless of cap size.

WorkerPoolSize = 1, chain DAG (start → A → B → C → end):
  A, B and C wait in preFlight (no slot held)
  start runs and completes, signalling A; A's preFlight unblocks
  A acquires the slot → runs RunE → releases the slot
  B's preFlight unblocks; B acquires the slot → … and so on

A timeout budget (Node.Timeout or DefaultTimeout) starts ticking after a node acquires its execution slot, so waiting for a slot never consumes the budget.


Lifecycle Guardrails

FinishDag() seals the topology; a successful GetReady() additionally freezes the runner configuration until Reset():

d.FinishDag()                  // seals the graph
d.AddEdge("X", "Y")            // returns an error: topology sealed by FinishDag
d.CreateNode("Z")              // returns nil: topology sealed by FinishDag
d.ConnectRunner()
d.GetReady(ctx)                // freezes the runner configuration
d.SetContainerCmd(r)           // no-op + error log: frozen after GetReady
d.SetNodeRunner("n", r)        // no-op, returns false: frozen after GetReady

The runner-freeze window covers from GetReady success through Reset. A failed FinishDag() (e.g. cycle detected) leaves the DAG unchanged, so you may correct the graph and retry.


Per-Node Runner Override

// Static override.
d.SetNodeRunner("heavy-node", &HeavyRunner{})

// Dynamic resolver: called at execution time per node.
d.SetRunnerResolver(func(n *dag.Node) dag.Runnable {
    if n.ID == "gpu-task" {
        return &GpuRunner{}
    }
    return nil // fall back to ContainerCmd
})

Runner priority (highest → lowest):

  1. Per-node override (SetNodeRunner)
  2. DAG-level resolver (SetRunnerResolver)
  3. Global default (SetContainerCmd)
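The priority order can be expressed as a small lookup chain. This sketch uses a simplified, hypothetical Runnable interface and passes the node ID to the resolver instead of a *dag.Node; it only illustrates the documented order, not dag-go's internal code.

```go
package main

import "fmt"

// Runnable is a simplified stand-in for dag-go's Runnable interface.
type Runnable interface{ Name() string }

// named is a trivial Runnable used to label which runner was chosen.
type named string

func (n named) Name() string { return string(n) }

// resolveRunner applies the priority list: per-node override, then resolver,
// then global default. The id-based resolver signature is an assumption.
func resolveRunner(id string, overrides map[string]Runnable,
	resolver func(id string) Runnable, fallback Runnable) Runnable {
	if r, ok := overrides[id]; ok && r != nil {
		return r // 1. per-node override
	}
	if resolver != nil {
		if r := resolver(id); r != nil {
			return r // 2. DAG-level resolver
		}
	}
	return fallback // 3. global default (nil means no runner configured)
}

func main() {
	overrides := map[string]Runnable{"heavy-node": named("HeavyRunner")}
	resolver := func(id string) Runnable {
		if id == "gpu-task" {
			return named("GpuRunner")
		}
		return nil // defer to the global default
	}
	for _, id := range []string{"heavy-node", "gpu-task", "other"} {
		fmt.Println(id, "→", resolveRunner(id, overrides, resolver, named("ContainerCmd")).Name())
	}
}
```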

Error Handling

FinishDag returns typed sentinel errors usable with errors.Is:

if err := d.FinishDag(); err != nil {
    if errors.Is(err, dag.ErrCycleDetected) {
        // graph contains a directed cycle
    }
}

Runtime errors from RunE are written to Dag.Errors (*SafeChannel[error]) and logged via structured logrus fields (dag_id, error). Use DroppedErrors() to detect back-pressure on the error channel.
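One way to get a double-close-safe error channel with a drop counter is sketched below. SafeChannel here is a hypothetical re-implementation for illustration (method names TrySend and Dropped are assumptions, not dag-go's API): sends never block, sends after Close are counted instead of panicking, and the counter plays the role DroppedErrors plays in dag-go.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// SafeChannel is an illustrative generic wrapper, not dag-go's type.
type SafeChannel[T any] struct {
	mu      sync.RWMutex
	ch      chan T
	closed  bool
	dropped atomic.Int64
}

func NewSafeChannel[T any](size int) *SafeChannel[T] {
	return &SafeChannel[T]{ch: make(chan T, size)}
}

// TrySend delivers v without blocking; it returns false and counts a drop
// when the buffer is full or the channel is already closed.
func (s *SafeChannel[T]) TrySend(v T) bool {
	s.mu.RLock() // held during the send so Close cannot race with it
	defer s.mu.RUnlock()
	if s.closed {
		s.dropped.Add(1)
		return false
	}
	select {
	case s.ch <- v:
		return true
	default:
		s.dropped.Add(1)
		return false
	}
}

// Close closes the underlying channel once; later calls are no-ops.
func (s *SafeChannel[T]) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.closed {
		s.closed = true
		close(s.ch)
	}
}

func (s *SafeChannel[T]) GetChannel() <-chan T { return s.ch }
func (s *SafeChannel[T]) Dropped() int64       { return s.dropped.Load() }

func main() {
	errs := NewSafeChannel[string](2)
	errs.TrySend("node A failed")
	errs.Close()
	errs.Close() // safe: no double-close panic
	for e := range errs.GetChannel() {
		fmt.Println(e)
	}
	fmt.Println("dropped:", errs.Dropped())
}
```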

Node-level errors are surfaced as *NodeError (supports both errors.Is and errors.As):

var nodeErr *dag.NodeError
if errors.As(err, &nodeErr) {
    fmt.Println(nodeErr.NodeID, nodeErr.Phase)
}

Reset & Retry

d.Wait(ctx)           // completes; DAG is frozen (running=false, nodeResult!=nil)
d.Reset()             // clears execution state, restores topology for reuse
d.ConnectRunner()     // re-attach runners (optional: set new runners before this)
d.GetReady(ctx)
d.Start()
d.Wait(ctx)

Mermaid Visualisation

diagram := d.ToMermaid()  // call after FinishDag()
fmt.Println(diagram)

Outputs a graph TD Mermaid flowchart with stadium shapes for synthetic nodes and per-node runner type labels when registered.


Development

# Tests with race detector
make test

# Lint (golangci-lint v2.11.3, local binary)
make lint

# Coverage report (HTML at reports/index.html)
make coverage

# Security scan (gosec + govulncheck, manual)
make lint-security
make vuln

Dependency policy: k8s.io/* and sigs.k8s.io/* are prohibited (depguard).



License

See LICENSE for details.

Documentation


Constants

const (
	Create createEdgeErrorType = iota
	Exist
	Fault
)

Create, Exist, Fault are the result codes returned by createEdge.

const (
	Start runningStatus = iota
	Preflight
	PreflightFailed
	InFlight
	InFlightFailed
	PostFlight
	PostFlightFailed
	FlightEnd
	Failed
	Succeed
)

The runningStatus values report which phase a node's runner has reached during execution (preFlight, inFlight, postFlight) and whether that phase failed.

const (
	StartNode = "start_node"
	EndNode   = "end_node"
)

StartNode and EndNode are the reserved IDs for the synthetic entry and exit nodes that are automatically created by StartDag and FinishDag respectively.

Variables

var ErrCycleDetected = errors.New("cycle detected in DAG")

ErrCycleDetected is returned by FinishDag when the graph contains a directed cycle. Callers can test for this condition with errors.Is(err, ErrCycleDetected).

var ErrNoRunner = errors.New("no runner set for node")

ErrNoRunner is returned by execute when no Runnable has been configured for the node.

var Log = logrus.New()

Log is the package-level logger used throughout dag-go.

Functions

func DetectCycle

func DetectCycle(dag *Dag) bool

DetectCycle checks if the DAG contains a directed cycle. It is safe to call concurrently: it acquires a read lock before inspecting the graph.

Internal callers that already hold dag.mu (e.g. FinishDag) must call detectCycle directly to avoid a re-entrant lock attempt.
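DFS-based cycle detection is typically the three-colour algorithm: a node is grey while it is on the current DFS path, and reaching a grey node again means a back edge, i.e. a cycle. The sketch below applies that technique to a plain adjacency map; it is not dag-go's detectCycle and ignores locking.

```go
package main

import "fmt"

// hasCycle reports whether the directed graph (adjacency list keyed by node
// ID) contains a cycle, using three-colour DFS.
func hasCycle(adj map[string][]string) bool {
	const (
		white = iota // unvisited
		grey         // on the current DFS path
		black        // fully explored
	)
	color := make(map[string]int, len(adj)) // missing keys read as white
	var visit func(n string) bool
	visit = func(n string) bool {
		color[n] = grey
		for _, m := range adj[n] {
			switch color[m] {
			case grey:
				return true // back edge closes a cycle
			case white:
				if visit(m) {
					return true
				}
			}
		}
		color[n] = black
		return false
	}
	for n := range adj {
		if color[n] == white && visit(n) {
			return true
		}
	}
	return false
}

func main() {
	diamond := map[string][]string{"start": {"A"}, "A": {"B1", "B2"}, "B1": {"C"}, "B2": {"C"}}
	fmt.Println(hasCycle(diamond)) // false
	cyclic := map[string][]string{"A": {"B"}, "B": {"C"}, "C": {"A"}}
	fmt.Println(hasCycle(cyclic)) // true
}
```

The diamond's shared child C is reached twice but is black by the second visit, so it is correctly not reported as a cycle.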

Types

type Dag

type Dag struct {
	// ID is the unique identifier assigned at creation time (UUID v4).
	ID string

	// NodesResult is the fan-in channel that collects printStatus events from
	// every node during execution.  Wait reads from this channel.
	NodesResult *SafeChannel[*printStatus]

	// Errors is the concurrency-safe error channel for runtime RunE failures.
	// Use reportError to write and collectErrors (or Errors.GetChannel()) to read.
	// The channel is recreated by Reset so it is valid for the next run.
	Errors *SafeChannel[error]

	// Timeout is the DAG-level execution deadline applied when bTimeout is true.
	// Set via WithTimeout or by assigning directly before GetReady.
	Timeout time.Duration

	// ContainerCmd is the global default Runnable applied to every node that
	// has no per-node override and no resolver match.
	// Set via SetContainerCmd to ensure thread-safe mutation.
	ContainerCmd Runnable

	// Config holds the tunable parameters active for this DAG instance.
	Config DagConfig
	// contains filtered or unexported fields
}

Dag is a Directed Acyclic Graph execution engine.

A Dag is created with InitDag (or NewDag / NewDagWithConfig), populated with AddEdge calls, sealed with FinishDag, and then executed via the lifecycle:

ConnectRunner → GetReady → Start → Wait

A completed DAG can be re-executed by calling Reset followed by the same lifecycle. Dag must always be handled as a pointer; value-copy is forbidden because it embeds sync.RWMutex.

func CopyDag

func CopyDag(original *Dag, newID string) *Dag

CopyDag creates a fully executable copy of original with a new ID.

The copy preserves the graph topology (nodes, edges, parent/child relationships) and all DagConfig values. Fresh execution channels (NodesResult, Errors, edge safeVertices, start-node trigger) are created so running the copy cannot affect the original.

Items NOT copied: per-node runners (Node.runnerVal), execSem, nodeResult, startTrigger, errLogs, and runnerResolver. startTrigger is intentionally omitted — it is nil until GetReadyE validates and captures it; the copy must go through GetReady independently. Per-node runners are excluded because they are stateful closures bound to the original DAG's channels. The caller must wire runners and follow the standard lifecycle on the copy:

ConnectRunner → GetReady → Start → Wait

Returns nil if original is nil or newID is empty.

func InitDag

func InitDag() (*Dag, error)

InitDag creates a new DAG with default configuration and immediately calls StartDag to create the synthetic start node. It is the recommended entry point for most users.

func InitDagWithOptions

func InitDagWithOptions(options ...DagOption) (*Dag, error)

InitDagWithOptions creates and initialises a new DAG, applying the supplied functional options before adding the synthetic start node. It is the option-friendly equivalent of InitDag.

func NewDag

func NewDag() *Dag

NewDag returns a new Dag with default configuration (see DefaultDagConfig). Call StartDag (or use InitDag) to add the synthetic start node before adding edges.

func NewDagWithConfig

func NewDagWithConfig(config DagConfig) *Dag

NewDagWithConfig returns a new Dag using the supplied DagConfig. Prefer InitDag for the common case; use this constructor when you need to customise buffer sizes, timeouts, or the worker pool size before adding nodes. Invalid config values (zero or negative buffers / pool size) are clamped to safe minimums by normalizeDagConfig.

func NewDagWithOptions

func NewDagWithOptions(options ...DagOption) *Dag

NewDagWithOptions returns a new Dag with DefaultDagConfig, then applies each DagOption in order. Because the functional options (e.g. WithTimeout, WithWorkerPool) run after the defaults are set, any option overrides the corresponding default; because they run in order, a later option overrides an earlier one that sets the same field. The final config is normalised after all options are applied.

func (*Dag) AddEdge

func (dag *Dag) AddEdge(from, to string) error

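AddEdge adds a directed edge from → to (parent → child), creating either node if it does not already exist; use AddEdgeIfNodesExist to require that both nodes already exist. Returns an error if the DAG has already been finalized by FinishDag.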

func (*Dag) AddEdgeIfNodesExist

func (dag *Dag) AddEdgeIfNodesExist(from, to string) error

AddEdgeIfNodesExist adds an edge only if both nodes already exist.

func (*Dag) ConnectRunner

func (dag *Dag) ConnectRunner() bool

ConnectRunner is the bool-returning variant of ConnectRunnerE. It returns false wherever ConnectRunnerE would return an error, for example when the DAG has no nodes or GetReady has already been called.

func (*Dag) ConnectRunnerE

func (dag *Dag) ConnectRunnerE() error

ConnectRunnerE attaches the three-phase runner closure (preFlight / inFlight / postFlight) to every node. Call it after FinishDag and before GetReady; call it again after Reset so the closures reference the freshly created channels. Returns an error if the DAG has no nodes or if GetReady has already been called (runner closures must not be replaced while goroutines are live).

func (*Dag) CreateNode

func (dag *Dag) CreateNode(id string) *Node

CreateNode creates a new node and returns a pointer to it; it is safe for concurrent use. Returns nil if the DAG has already been finalized by FinishDag, or if id is a reserved synthetic node name (StartNode / EndNode).

func (*Dag) CreateNodeWithTimeOut

func (dag *Dag) CreateNodeWithTimeOut(id string, bTimeOut bool, ti time.Duration) *Node

CreateNodeWithTimeOut creates a node that applies a per-node timeout when bTimeOut is true. Returns nil if the DAG has already been finalized by FinishDag, or if id is a reserved synthetic node name (StartNode / EndNode).

func (*Dag) DroppedErrors

func (dag *Dag) DroppedErrors() int64

DroppedErrors returns the number of errors that reportError could not deliver to the Errors channel (channel full or closed) since the DAG was created or last Reset. A non-zero value indicates that DagConfig.MaxChannelBuffer is too small or that the Errors channel consumer is not draining fast enough.

func (*Dag) Edges

func (dag *Dag) Edges() []*Edge

Edges returns a shallow copy of the DAG's edge list. Appending to or reordering the returned slice does not affect the DAG, but the *Edge values it contains are shared with the DAG.

func (*Dag) EndNodeID

func (dag *Dag) EndNodeID() string

EndNodeID returns the ID of the synthetic exit node ("end_node"), or an empty string if FinishDag has not been called yet.

func (*Dag) ErrCount

func (dag *Dag) ErrCount() int

ErrCount returns the number of errors currently buffered in the Errors channel. The count is a point-in-time snapshot and may change immediately after the call. Use DroppedErrors to check for errors that overflowed the buffer capacity.

func (*Dag) FinishDag

func (dag *Dag) FinishDag() error

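FinishDag validates and finalizes the DAG. Cycle detection runs first, before any structural mutation, and returns ErrCycleDetected if the graph contains a directed cycle; a failed FinishDag leaves the DAG unchanged. On success it connects the graph's terminal nodes to the synthetic end node (EndNode) and seals the topology, after which AddEdge returns an error and CreateNode returns nil.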

func (*Dag) GetReady

func (dag *Dag) GetReady(ctx context.Context) bool

GetReady is the bool-returning variant of GetReadyE.

func (*Dag) GetReadyE

func (dag *Dag) GetReadyE(ctx context.Context) error

GetReadyE initialises the execution semaphore and launches one goroutine per node. Each goroutine runs preFlight (dependency wait) without holding an execution slot; the slot is acquired inside connectRunner just before inFlight (RunE) so that a small WorkerPoolSize cannot deadlock the graph.

Prerequisites (returns a descriptive error if violated):

  • ctx must be non-nil
  • FinishDag must have been called (dag.validated == true)
  • ConnectRunner must have been called (every node.runner must be non-nil)
  • GetReady / GetReadyE must not have been called already

ctx is forwarded to each node's runner; cancel it to abort the entire execution.

func (*Dag) Progress

func (dag *Dag) Progress() float64

Progress returns the DAG execution completion ratio in [0.0, 1.0].

NOTE: nodeCount and completedCount are read in two separate atomic operations; they do not form an atomic pair. completedCount may be incremented between the two reads, making the returned ratio momentarily slightly ahead of reality. This is acceptable for progress-bar or observability purposes, but must NOT be used for correctness decisions (e.g. deciding whether all nodes finished).
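The two-read pattern the note describes looks roughly like this. The tracker type is a hypothetical illustration, not dag-go's implementation; the clamp to 1.0 is an extra safeguard assumed here so a momentary overshoot never escapes the documented [0.0, 1.0] range.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// tracker is an illustrative progress counter, not dag-go's implementation.
type tracker struct {
	nodeCount      atomic.Int64
	completedCount atomic.Int64
}

func (t *tracker) MarkCompleted() { t.completedCount.Add(1) }

// Progress returns completed/total clamped to [0, 1]. The two Loads are not
// an atomic pair, so the result is only suitable for display.
func (t *tracker) Progress() float64 {
	total := t.nodeCount.Load()
	if total <= 0 {
		return 0
	}
	p := float64(t.completedCount.Load()) / float64(total)
	if p > 1 {
		p = 1
	}
	return p
}

func main() {
	var t tracker
	t.nodeCount.Store(4)
	t.MarkCompleted()
	fmt.Printf("%.0f%%\n", t.Progress()*100) // 25%
}
```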

func (*Dag) Reset

func (dag *Dag) Reset()

Reset reinitialises a completed DAG so it can be executed again without rebuilding the graph from scratch.

Reset MUST be called only after Wait returns. Calling it while the DAG is still running is a no-op (the call is logged and returns immediately) to prevent live goroutines from racing against freshly created channels. Use ResetE if you need an error return instead of this logged no-op.

After Reset, follow the standard execution lifecycle:

dag.Reset()
dag.ConnectRunner()
dag.GetReady(ctx)
dag.Start()
dag.Wait(ctx)

The graph topology (nodes, edges, Config, ContainerCmd, runners) is preserved; only execution state is reset.

func (*Dag) ResetE

func (dag *Dag) ResetE() error

ResetE is the error-returning variant of Reset. It returns an error when called while the DAG is still running (i.e. Wait has not yet returned).

func (*Dag) SetContainerCmd

func (dag *Dag) SetContainerCmd(r Runnable)

SetContainerCmd sets the global default Runnable for all nodes in this DAG. It is safe to call concurrently. Per-node overrides (SetNodeRunner) and the RunnerResolver take priority over this value; see runner priority in the README.

Mutation policy: SetContainerCmd must be called before GetReady and must not be called again until Reset. The DAG is considered frozen from the moment GetReadyE succeeds until reset() clears nodeResult — this covers both the running window (goroutines live) and the post-Wait / pre-Reset window. Mutating the global runner inside the frozen window would cause different nodes to execute with different runners depending on scheduling order, breaking reproducibility.

func (*Dag) SetNodeRunner

func (dag *Dag) SetNodeRunner(id string, r Runnable) bool

SetNodeRunner sets the runner for the node with the given id. Returns false if the node does not exist, is not in Pending status, or if the DAG is frozen (GetReady has been called and Reset has not yet been called).

Mutation policy: same as SetContainerCmd — must be called before GetReady and not again until Reset. The frozen window spans from GetReadyE success through Reset.

func (*Dag) SetNodeRunners

func (dag *Dag) SetNodeRunners(m map[string]Runnable) (applied int, missing, skipped []string)

SetNodeRunners bulk-sets runners from a map of node-id to Runnable. Returns the count of applied runners, and slices of missing/skipped node ids.

Mutation policy: same as SetContainerCmd — must be called before GetReady and not again until Reset. When the DAG is frozen all ids are returned in the skipped slice with applied == 0.

func (*Dag) SetRunnerResolver

func (dag *Dag) SetRunnerResolver(rr RunnerResolver)

SetRunnerResolver installs a dynamic runner selector for this DAG. The resolver is called at execution time for each node, after the per-node atomic override is checked but before the global ContainerCmd fallback. Pass nil to clear a previously installed resolver. Thread-safe.

Mutation policy: same as SetContainerCmd — must be called before GetReady and not again until Reset. The frozen window spans from GetReadyE success through Reset; changing the resolver inside it would break reproducibility.

func (*Dag) Start

func (dag *Dag) Start() bool

Start is the bool-returning variant of StartE.

func (*Dag) StartDag

func (dag *Dag) StartDag() (*Dag, error)

StartDag creates the synthetic start node and its trigger channel. It is called automatically by InitDag; call it directly only when building a DAG with NewDag or NewDagWithConfig.

Returns the receiver so calls can be chained, or an error if the start node could not be created (e.g. the DAG is nil or the node was already created).

func (*Dag) StartE

func (dag *Dag) StartE() error

StartE fires the DAG by sending a trigger signal to the start-node trigger channel captured by GetReadyE. All node goroutines are already waiting; this single send unblocks the start node and cascades through the graph.

Call StartE exactly once after GetReady. Returns an error if GetReady was not called, if Start was already called for this run, or if the trigger send fails unexpectedly.

StartE never reads startNode.parentVertex directly — it uses the dag.startTrigger channel that GetReadyE captured and validated before any goroutines were launched. This guarantees no concurrent access to the parentVertex slice while goroutines are live.

func (*Dag) StartNodeID

func (dag *Dag) StartNodeID() string

StartNodeID returns the ID of the synthetic entry node ("start_node"), or an empty string if StartDag has not been called yet.

func (*Dag) ToMermaid

func (dag *Dag) ToMermaid() string

ToMermaid generates a Mermaid flowchart string that represents the DAG topology.

The output uses the "graph TD" (top-down) direction. Synthetic nodes (start_node / end_node) are rendered with a stadium shape to distinguish infrastructure nodes from user-defined ones. If a per-node Runnable has been registered via SetNodeRunner, its concrete Go type is appended to the node label after a line-break so the diagram shows which executor is bound to each step — useful for debugging or documentation.

Node IDs are sanitised for Mermaid syntax by replacing any character that is not an ASCII letter, digit, or underscore with an underscore (see mermaidSafeID). This prevents parser errors for IDs that contain hyphens, dots, or spaces.
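The sanitisation rule can be sketched in a few lines. safeID is a hypothetical equivalent of mermaidSafeID written from the description above; it works per rune, so a multi-byte character becomes a single underscore, which may differ from dag-go's exact handling.

```go
package main

import (
	"fmt"
	"strings"
)

// safeID replaces every rune that is not an ASCII letter, digit, or
// underscore with '_'. Illustrative only.
func safeID(id string) string {
	var b strings.Builder
	b.Grow(len(id))
	for _, r := range id {
		switch {
		case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9', r == '_':
			b.WriteRune(r)
		default:
			b.WriteByte('_')
		}
	}
	return b.String()
}

func main() {
	fmt.Println(safeID("gpu-task"))  // gpu_task
	fmt.Println(safeID("step 1.v2")) // step_1_v2
}
```

Only the Mermaid node identifier needs this treatment; the quoted label can keep the original ID, as in the example output.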

Example output:

graph TD
    start_node(["start_node"])
    A["A\n*main.MyRunner"]
    B["B"]
    end_node(["end_node"])
    start_node --> A
    A --> B
    B --> end_node

ToMermaid acquires a read-lock and is safe to call concurrently with Progress() and other read-only observers. It must be called after FinishDag so that dag.edges is complete; calling it before FinishDag will produce a diagram that is missing the edges to the synthetic end node.

func (*Dag) Wait

func (dag *Dag) Wait(ctx context.Context) bool

Wait blocks until the DAG finishes execution, ctx expires, or a fatal node failure is detected on NodesResult.

It returns true only when the end node emits a FlightEnd status — meaning every node in the graph reached a terminal state (Succeeded or Skipped). It returns false on any of:

  • context cancellation or timeout
  • NodesResult channel closed unexpectedly
  • end node reporting a PreflightFailed / InFlightFailed / PostFlightFailed

Wait closes all channels (closeChannels) and shuts down the worker pool when it returns, regardless of whether execution succeeded. Do NOT use the DAG after Wait returns without first calling Reset.

func (*Dag) WaitE

func (dag *Dag) WaitE(ctx context.Context) error

WaitE is the error-returning variant of Wait. It returns nil when the DAG completes successfully (FlightEnd from end_node), or a descriptive error on context cancellation, timeout, or node failure.

type DagConfig

type DagConfig struct {
	// MinChannelBuffer is the buffer size for inter-node edge channels.  Larger
	// values reduce the chance of a parent blocking while writing to a slow child.
	// Default: 5.
	MinChannelBuffer int

	// MaxChannelBuffer is the buffer capacity for the NodesResult and Errors
	// aggregation channels.  Set this higher than the total number of nodes to
	// prevent back-pressure stalls.  Default: 100.
	MaxChannelBuffer int

	// StatusBuffer is reserved for future per-node status channel buffering.
	// Default: 10.
	StatusBuffer int

	// WorkerPoolSize caps the number of goroutines that execute nodes concurrently.
	// If the number of nodes is smaller than WorkerPoolSize, the pool is sized to
	// the node count instead (min of the two).  Default: 50.
	WorkerPoolSize int

	// DefaultTimeout is the implicit per-node execution timeout applied during
	// inFlight (RunE). It limits how long each node's user-supplied work may run.
	//
	// Zero (the default) means no implicit per-node execution timeout is applied;
	// only the caller's context deadline governs execution length.
	// Per-node overrides are set via Node.Timeout (when Timeout > 0).
	//
	// Dependency wait (preFlight) is never bounded by this value — it always
	// uses the caller's context only, so upstream work never causes false-negative
	// timeouts in downstream nodes.
	DefaultTimeout time.Duration

	// ErrorDrainTimeout is the maximum time collectErrors will wait to drain the
	// Errors channel.  Defaults to 5 s when left at zero (see DefaultDagConfig).
	ErrorDrainTimeout time.Duration

	// ExpectedNodeCount is a capacity hint for the internal nodes map and the
	// Edges slice.  When the final node count is known upfront, setting this
	// field avoids incremental map rehashing and slice growth during AddEdge
	// calls.  Zero means let the runtime decide the initial capacity.
	ExpectedNodeCount int

	// ErrorPolicy controls how downstream nodes react to upstream failures.
	// ErrorPolicyFailFast (default, zero value) skips children of a failed node.
	// ErrorPolicyContinueOnError allows children to run regardless of parent outcome;
	// errors are still recorded in the Errors channel and DroppedErrors counter.
	ErrorPolicy ErrorPolicy

	// EnablePprofLabels attaches runtime/pprof goroutine labels to each preFlight
	// worker goroutine ("phase", "nodeId", "channelIndex").
	// Only effective when the binary is built with -tags pprof.
	// Default: false — zero allocation overhead in production builds.
	EnablePprofLabels bool
}

DagConfig holds tunable parameters for a Dag instance. Use DefaultDagConfig for production-ready defaults, or override individual fields before passing to NewDagWithConfig.

func DefaultDagConfig

func DefaultDagConfig() DagConfig

DefaultDagConfig returns a DagConfig populated with production-ready defaults:

  • MinChannelBuffer: 5
  • MaxChannelBuffer: 100
  • StatusBuffer: 10
  • WorkerPoolSize: 50
  • DefaultTimeout: 0 (no implicit per-node execution timeout)
  • ErrorDrainTimeout: 5 s

DefaultTimeout is intentionally 0: individual node execution time is not bounded by default. Use WithDefaultTimeout or set Node.Timeout explicitly when per-node execution limits are required. DAG-wide time limits should be enforced via the caller context passed to GetReady and Wait.

type DagOption

type DagOption func(*Dag)

DagOption is a functional-option type for NewDagWithOptions. Use the provided With* constructors to build option values.

func WithChannelBuffers

func WithChannelBuffers(minBuffer, maxBuffer, statusBuffer int) DagOption

WithChannelBuffers sets the channel buffer sizes used for edge signalling and result aggregation. Larger buffers reduce back-pressure in high-fan-out DAGs.

func WithDefaultTimeout

func WithDefaultTimeout(d time.Duration) DagOption

WithDefaultTimeout sets the implicit per-node execution timeout applied during inFlight (RunE). This is distinct from WithTimeout, which caps the overall DAG run in Wait().

Semantics:

  • d == 0: no implicit per-node execution timeout (the default). Each node's RunE runs until the caller context expires or the node returns.
  • d > 0: RunE is bounded by d. If RunE does not return within d, its context is cancelled and the node is marked failed.

Dependency wait (preFlight) is never bounded by this value; it always honours the caller context only. This prevents long upstream execution chains from causing false-negative timeouts in downstream nodes.

Per-node overrides: set Node.Timeout > 0 on individual nodes; that takes priority over this DAG-wide default.

func WithErrorPolicy

func WithErrorPolicy(p ErrorPolicy) DagOption

WithErrorPolicy sets the error propagation policy for this DAG. ErrorPolicyFailFast (default) skips downstream nodes when a parent fails. ErrorPolicyContinueOnError allows all nodes to run regardless of parent failures; errors are still reported via the Errors channel.

func WithTimeout

func WithTimeout(timeout time.Duration) DagOption

WithTimeout sets the DAG-level execution deadline used by Wait. This is distinct from WithDefaultTimeout, which bounds individual node execution (inFlight). Pass 0 or omit to rely on the caller context only.

func WithWorkerPool

func WithWorkerPool(size int) DagOption

WithWorkerPool sets the maximum number of concurrent node-execution goroutines. If the DAG has fewer nodes than size, the pool is sized to the node count instead.

type DagWorkerPool

type DagWorkerPool struct {
	// contains filtered or unexported fields
}

DagWorkerPool manages a bounded pool of goroutines for concurrent node execution. Tasks are submitted via Submit; Close drains the queue and waits for all workers.

func NewDagWorkerPool

func NewDagWorkerPool(limit int) *DagWorkerPool

NewDagWorkerPool creates a new worker pool with the given number of goroutines. The internal task queue is buffered to twice the worker count so that callers are not serialised behind goroutine startup latency.

func (*DagWorkerPool) Close

func (p *DagWorkerPool) Close()

Close shuts down the worker pool. Because taskQueue is closed exactly once via sync.Once, calling Close more than once does not panic.

func (*DagWorkerPool) Submit

func (p *DagWorkerPool) Submit(task nodeTask)

Submit enqueues a nodeTask for execution by the worker pool.

type Edge

type Edge struct {
	// contains filtered or unexported fields
}

Edge represents a directed connection between two nodes. The embedded safeVertex channel carries runningStatus signals from the parent node to its child during execution. Edges are created by AddEdge and reset by Reset; callers should not manipulate the fields directly.

type ErrorPolicy

type ErrorPolicy int

ErrorPolicy controls how downstream nodes react to upstream failures.

const (
	// ErrorPolicyFailFast causes downstream nodes to be skipped when any parent
	// node fails. This is the default and preserves the invariant that no node
	// runs after a dependency failure.
	ErrorPolicyFailFast ErrorPolicy = iota

	// ErrorPolicyContinueOnError allows downstream nodes to execute even when a
	// parent has failed. Nodes proceed through all three flight phases regardless
	// of parent outcome; per-node errors are still collected via Errors channel.
	ErrorPolicyContinueOnError
)
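The difference between the two policies reduces to one decision per child node. The sketch below reuses the documented constant names but re-declares them locally so it compiles on its own; shouldRun is a hypothetical helper, not part of dag-go.

```go
package main

import "fmt"

type ErrorPolicy int

const (
	ErrorPolicyFailFast ErrorPolicy = iota
	ErrorPolicyContinueOnError
)

// shouldRun reports whether a child node would execute, given the policy and
// whether any of its parents failed.
func shouldRun(p ErrorPolicy, anyParentFailed bool) bool {
	if !anyParentFailed {
		return true
	}
	return p == ErrorPolicyContinueOnError
}

func main() {
	fmt.Println(shouldRun(ErrorPolicyFailFast, true))        // false: child is skipped
	fmt.Println(shouldRun(ErrorPolicyContinueOnError, true)) // true: child still runs
	fmt.Println(shouldRun(ErrorPolicyFailFast, false))       // true: no failure upstream
}
```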

type ErrorType

type ErrorType int

ErrorType identifies the DAG operation that produced a systemError.

const (
	AddEdge ErrorType = iota
	StartDag
	AddEdgeIfNodesExist

	FinishDag
)

AddEdge, StartDag, AddEdgeIfNodesExist, addEndNode, and FinishDag are the ErrorType values that identify which DAG operation recorded an error. addEndNode is unexported, so it does not appear in the constant block above.

type Node

type Node struct {
	ID        string
	ImageName string

	Commands string

	// Timeout is the per-node inFlight execution budget.
	// When Timeout > 0, it overrides Dag.Config.DefaultTimeout for this node.
	// Zero means "use DefaultTimeout or, if that is also zero, the caller context".
	// Set this field directly before ConnectRunner is called.
	Timeout time.Duration
	// contains filtered or unexported fields
}

Node is the fundamental building block of a DAG. A Node must always be handled as a pointer; copying a Node is forbidden because atomic.Value must not be copied after first use.

func (*Node) CheckParentsStatus

func (n *Node) CheckParentsStatus() bool

CheckParentsStatus returns false (and transitions this node to Skipped) if any parent has already failed. The Pending→Skipped transition is guarded by TransitionStatus so that a node in an unexpected state is never silently overwritten.

func (*Node) Execute

func (n *Node) Execute(ctx context.Context) error

Execute runs the node's resolved Runnable, forwarding ctx for cancellation.

func (*Node) GetStatus

func (n *Node) GetStatus() NodeStatus

GetStatus returns the node's current status under the read lock.

func (*Node) IsSucceed

func (n *Node) IsSucceed() bool

IsSucceed returns the succeed flag under the read lock.

func (*Node) MarkCompleted

func (n *Node) MarkCompleted()

MarkCompleted increments the parent DAG's completed-node counter.

func (*Node) SetRunner

func (n *Node) SetRunner(r Runnable) bool

SetRunner atomically sets the runner for a Pending node. Returns false if the node is not in Pending status.

func (*Node) SetStatus

func (n *Node) SetStatus(status NodeStatus)

SetStatus sets the node's status under the write lock. Prefer TransitionStatus when a pre-condition on the current status is required.

func (*Node) SetSucceed

func (n *Node) SetSucceed(val bool)

SetSucceed sets the succeed flag under the write lock.

func (*Node) TransitionStatus

func (n *Node) TransitionStatus(from, to NodeStatus) bool

TransitionStatus atomically advances n's status from `from` to `to`. It returns true only when both conditions hold:

  1. n.status == from at the moment the lock is acquired, AND
  2. the from→to transition is permitted by the state machine.

This prevents illegal backwards moves such as Failed→Succeeded. Use SetStatus only when an unconditional override is explicitly required.
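The guarded transition is a compare-and-set performed under the node's lock. The sketch below also shows how CheckParentsStatus can use it for the Pending→Skipped move. The transition table is a hypothetical assumption (the docs only confirm that Pending→Skipped is allowed and Failed→Succeeded is not), and node is a minimal stand-in for dag.Node.

```go
package main

import (
	"fmt"
	"sync"
)

// NodeStatus values mirror the order of dag-go's exported constants.
type NodeStatus int

const (
	Pending NodeStatus = iota
	Running
	Succeeded
	Failed
	Skipped
)

// allowed is a hypothetical transition table; the real state machine may differ.
var allowed = map[NodeStatus][]NodeStatus{
	Pending: {Running, Skipped},
	Running: {Succeeded, Failed},
}

// node is a minimal stand-in for dag.Node holding only its status.
type node struct {
	mu     sync.RWMutex
	status NodeStatus
}

// TransitionStatus moves from -> to only if the current status equals from
// and the edge is in the table; both checks happen under the write lock.
func (n *node) TransitionStatus(from, to NodeStatus) bool {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.status != from {
		return false
	}
	for _, next := range allowed[from] {
		if next == to {
			n.status = to
			return true
		}
	}
	return false
}

func (n *node) GetStatus() NodeStatus {
	n.mu.RLock()
	defer n.mu.RUnlock()
	return n.status
}

// checkParentsStatus follows the documented CheckParentsStatus behaviour:
// if any parent failed, attempt Pending -> Skipped and report false.
func (n *node) checkParentsStatus(parents []*node) bool {
	for _, p := range parents {
		if p.GetStatus() == Failed {
			n.TransitionStatus(Pending, Skipped)
			return false
		}
	}
	return true
}

func main() {
	parent, child := &node{}, &node{}
	fmt.Println(parent.TransitionStatus(Pending, Running))  // true
	fmt.Println(parent.TransitionStatus(Running, Failed))   // true
	fmt.Println(parent.TransitionStatus(Failed, Succeeded)) // false: illegal backwards move
	fmt.Println(child.checkParentsStatus([]*node{parent}))  // false
	fmt.Println(child.GetStatus() == Skipped)               // true
}
```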

type NodeError

type NodeError struct {
	NodeID string
	Phase  string
	Err    error
}

NodeError carries structured information about a node-level execution failure.

func (*NodeError) Error

func (e *NodeError) Error() string

func (*NodeError) Unwrap

func (e *NodeError) Unwrap() error

type NodeStatus

type NodeStatus int

NodeStatus represents the lifecycle state of a Node.

const (
	NodeStatusPending NodeStatus = iota
	NodeStatusRunning
	NodeStatusSucceeded
	NodeStatusFailed
	NodeStatusSkipped // set when a parent has failed
)

NodeStatusPending through NodeStatusSkipped represent the lifecycle states of a Node.

type Runnable

type Runnable interface {
	RunE(ctx context.Context, a interface{}) error
}

Runnable defines the interface for executable units attached to a DAG node. RunE is invoked during the inFlight phase. The supplied ctx carries the cancellation / timeout signal from the enclosing DAG execution; implementations should honour ctx.Done() so that long-running work can be interrupted cleanly. The second parameter is the *Node that is currently executing, passed as an opaque interface{} to avoid an import cycle for callers outside this package.

type RunnerResolver

type RunnerResolver func(*Node) Runnable

RunnerResolver is an optional hook that picks a Runnable dynamically based on node metadata (e.g. image name, labels). It is consulted after the per-node atomic runner but before Dag.ContainerCmd.
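The documented lookup order can be sketched as a small function. It assumes that a resolver returning nil falls through to the DAG-wide fallback; that detail is not stated above. Node, named, and resolve are hypothetical stand-ins, and the fallback plays the role of Dag.ContainerCmd.

```go
package main

import (
	"context"
	"fmt"
)

type Runnable interface {
	RunE(ctx context.Context, a interface{}) error
}

type Node struct {
	ID        string
	ImageName string
	runner    Runnable // stand-in for the per-node atomic runner
}

type RunnerResolver func(*Node) Runnable

// named is a hypothetical Runnable used only to show which source won.
type named string

func (named) RunE(context.Context, interface{}) error { return nil }

// resolve applies the documented precedence: per-node runner, then the
// resolver hook, then the DAG-wide fallback.
func resolve(n *Node, resolver RunnerResolver, fallback Runnable) Runnable {
	if n.runner != nil {
		return n.runner
	}
	if resolver != nil {
		if r := resolver(n); r != nil { // assumption: nil means "no opinion"
			return r
		}
	}
	return fallback
}

func main() {
	byImage := func(n *Node) Runnable {
		if n.ImageName == "python:3.12" {
			return named("python-runner")
		}
		return nil
	}
	fallback := named("container-cmd")

	fmt.Println(resolve(&Node{ID: "a", runner: named("per-node")}, byImage, fallback)) // per-node
	fmt.Println(resolve(&Node{ID: "b", ImageName: "python:3.12"}, byImage, fallback))  // python-runner
	fmt.Println(resolve(&Node{ID: "c", ImageName: "alpine"}, byImage, fallback))       // container-cmd
}
```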

type SafeChannel

type SafeChannel[T any] struct {
	// contains filtered or unexported fields
}

SafeChannel is a generic, concurrency-safe channel wrapper that prevents double-close panics. It offers both a non-blocking Send and a context-aware SendBlocking.

func NewSafeChannelGen

func NewSafeChannelGen[T any](buffer int) *SafeChannel[T]

NewSafeChannelGen creates a new SafeChannel with the given buffer size.

func (*SafeChannel[T]) Close

func (sc *SafeChannel[T]) Close() (err error)

Close closes the underlying channel exactly once. Returns an error if the channel is already closed or a panic occurs during close.

Close first signals sc.done (via doneOnce) to unblock any SendBlocking callers that are blocked waiting for buffer space, then acquires the write lock to set sc.closed and close the underlying channel. This ordering prevents a deadlock where Close() cannot acquire the write lock because SendBlocking holds the read lock indefinitely on a full channel.

func (*SafeChannel[T]) GetChannel

func (sc *SafeChannel[T]) GetChannel() chan T

GetChannel returns the underlying channel for range/select operations.

func (*SafeChannel[T]) Send

func (sc *SafeChannel[T]) Send(value T) bool

Send attempts to deliver value to the channel. Returns false if the channel is already closed, if Close() has started (sc.done is closed), or if the buffer is full.

The sc.done check closes the narrow race window between Close() signalling done (before acquiring the write lock) and setting sc.closed = true.

func (*SafeChannel[T]) SendBlocking

func (sc *SafeChannel[T]) SendBlocking(ctx context.Context, value T) bool

SendBlocking blocks until value is delivered to the channel, the context is cancelled, or the channel is closed. Returns true when the value was sent; false when the channel was already closed, ctx.Done fired, or a concurrent Close() began.

Unlike Send, SendBlocking never silently drops a value when the buffer is full — it waits for a consumer to make room. Use this for signals where loss would leave a downstream goroutine waiting forever (e.g. edge vertex channels between nodes).

The read lock is held for the duration of the blocking select so that a concurrent Close cannot race with the send. The sc.done case unblocks the select when Close() is called concurrently and the buffer is full — this prevents a deadlock where Close() cannot acquire the write lock because SendBlocking holds the read lock indefinitely.

Directories

Path Synopsis
examples
build_system command
Package main demonstrates a multi-stage software build pipeline using dag-go.
etl_pipeline command
Package main demonstrates a realistic ETL (Extract-Transform-Load) pipeline built with dag-go.
reset_retry command
Package main demonstrates dag-go's Reset() method for implementing a retry strategy without rebuilding the graph.
