Documentation ¶
Index ¶
- Constants
- Variables
- func DetectCycle(dag *Dag) bool
- type Dag
- func (dag *Dag) AddEdge(from, to string) error
- func (dag *Dag) AddEdgeIfNodesExist(from, to string) error
- func (dag *Dag) ConnectRunner() bool
- func (dag *Dag) ConnectRunnerE() error
- func (dag *Dag) CreateNode(id string) *Node
- func (dag *Dag) CreateNodeWithTimeOut(id string, bTimeOut bool, ti time.Duration) *Node
- func (dag *Dag) DroppedErrors() int64
- func (dag *Dag) Edges() []*Edge
- func (dag *Dag) EndNodeID() string
- func (dag *Dag) ErrCount() int
- func (dag *Dag) FinishDag() error
- func (dag *Dag) GetReady(ctx context.Context) bool
- func (dag *Dag) GetReadyE(ctx context.Context) error
- func (dag *Dag) Progress() float64
- func (dag *Dag) Reset()
- func (dag *Dag) ResetE() error
- func (dag *Dag) SetContainerCmd(r Runnable)
- func (dag *Dag) SetNodeRunner(id string, r Runnable) bool
- func (dag *Dag) SetNodeRunners(m map[string]Runnable) (applied int, missing, skipped []string)
- func (dag *Dag) SetRunnerResolver(rr RunnerResolver)
- func (dag *Dag) Start() bool
- func (dag *Dag) StartDag() (*Dag, error)
- func (dag *Dag) StartE() error
- func (dag *Dag) StartNodeID() string
- func (dag *Dag) ToMermaid() string
- func (dag *Dag) Wait(ctx context.Context) bool
- func (dag *Dag) WaitE(ctx context.Context) error
- type DagConfig
- type DagOption
- type DagWorkerPool
- type Edge
- type ErrorPolicy
- type ErrorType
- type Node
- func (n *Node) CheckParentsStatus() bool
- func (n *Node) Execute(ctx context.Context) error
- func (n *Node) GetStatus() NodeStatus
- func (n *Node) IsSucceed() bool
- func (n *Node) MarkCompleted()
- func (n *Node) SetRunner(r Runnable) bool
- func (n *Node) SetStatus(status NodeStatus)
- func (n *Node) SetSucceed(val bool)
- func (n *Node) TransitionStatus(from, to NodeStatus) bool
- type NodeError
- type NodeStatus
- type Runnable
- type RunnerResolver
- type SafeChannel
Constants ¶
const (
Create createEdgeErrorType = iota
Exist
Fault
)
Create, Exist, Fault are the result codes returned by createEdge.
const (
Start runningStatus = iota
Preflight
PreflightFailed
InFlight
InFlightFailed
PostFlight
PostFlightFailed
FlightEnd
Failed
Succeed
)
These values are the statuses a node reports while its runner executes.
const (
StartNode = "start_node"
EndNode = "end_node"
)
StartNode and EndNode are the reserved IDs for the synthetic entry and exit nodes that are automatically created by StartDag and FinishDag respectively.
Variables ¶
var ErrCycleDetected = errors.New("cycle detected in DAG")
ErrCycleDetected is returned by FinishDag when the graph contains a directed cycle. Callers can test for this condition with errors.Is(err, ErrCycleDetected).
var ErrNoRunner = errors.New("no runner set for node")
ErrNoRunner is returned by execute when no Runnable has been configured for the node.
var Log = logrus.New()
Log is the package-level logger used throughout dag-go.
Functions ¶
func DetectCycle ¶
DetectCycle checks if the DAG contains a directed cycle. It is safe to call concurrently: it acquires a read lock before inspecting the graph.
Internal callers that already hold dag.mu (e.g. FinishDag) must call detectCycle directly to avoid a re-entrant lock attempt.
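For readers unfamiliar with directed-cycle detection, here is a minimal sketch of one common technique, a three-colour depth-first search over an adjacency map. It is not taken from dag-go; the `hasCycle` name and the map representation are illustrative assumptions, and the package's own detectCycle may work differently.

```go
package main

import "fmt"

// Colour marks for the depth-first search.
const (
	white = iota // not visited yet
	grey         // on the current DFS path
	black        // fully explored
)

// hasCycle reports whether the directed graph adj contains a cycle.
// adj maps a node ID to the IDs of its children.
func hasCycle(adj map[string][]string) bool {
	colour := make(map[string]int, len(adj))
	var visit func(id string) bool
	visit = func(id string) bool {
		colour[id] = grey
		for _, next := range adj[id] {
			switch colour[next] {
			case grey: // back edge to a node on the current path
				return true
			case white:
				if visit(next) {
					return true
				}
			}
		}
		colour[id] = black
		return false
	}
	for id := range adj {
		if colour[id] == white && visit(id) {
			return true
		}
	}
	return false
}

func main() {
	acyclic := map[string][]string{"A": {"B"}, "B": {"C"}}
	cyclic := map[string][]string{"A": {"B"}, "B": {"C"}, "C": {"A"}}
	fmt.Println(hasCycle(acyclic), hasCycle(cyclic)) // false true
}
```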
Types ¶
type Dag ¶
type Dag struct {
// ID is the unique identifier assigned at creation time (UUID v4).
ID string
// NodesResult is the fan-in channel that collects printStatus events from
// every node during execution. Wait reads from this channel.
NodesResult *SafeChannel[*printStatus]
// Errors is the concurrency-safe error channel for runtime RunE failures.
// Use reportError to write and collectErrors (or Errors.GetChannel()) to read.
// The channel is recreated by Reset so it is valid for the next run.
Errors *SafeChannel[error]
// Timeout is the DAG-level execution deadline applied when bTimeout is true.
// Set via WithTimeout or by assigning directly before GetReady.
Timeout time.Duration
// ContainerCmd is the global default Runnable applied to every node that
// has no per-node override and no resolver match.
// Set via SetContainerCmd to ensure thread-safe mutation.
ContainerCmd Runnable
// Config holds the tunable parameters active for this DAG instance.
Config DagConfig
// contains filtered or unexported fields
}
Dag is a Directed Acyclic Graph execution engine.
A Dag is created with InitDag (or NewDag / NewDagWithConfig), populated with AddEdge calls, sealed with FinishDag, and then executed via the lifecycle:
ConnectRunner → GetReady → Start → Wait
A completed DAG can be re-executed by calling Reset followed by the same lifecycle. Dag must always be handled as a pointer; value-copy is forbidden because it embeds sync.RWMutex.
func CopyDag ¶
CopyDag creates a fully executable copy of original with a new ID.
The copy preserves the graph topology (nodes, edges, parent/child relationships) and all DagConfig values. Fresh execution channels (NodesResult, Errors, edge safeVertices, start-node trigger) are created so running the copy cannot affect the original.
Items NOT copied: per-node runners (Node.runnerVal), execSem, nodeResult, startTrigger, errLogs, and runnerResolver. startTrigger is intentionally omitted — it is nil until GetReadyE validates and captures it; the copy must go through GetReady independently. Per-node runners are excluded because they are stateful closures bound to the original DAG's channels. The caller must wire runners and follow the standard lifecycle on the copy:
ConnectRunner → GetReady → Start → Wait
Returns nil if original is nil or newID is empty.
func InitDag ¶
InitDag creates a new DAG with default configuration and immediately calls StartDag to create the synthetic start node. It is the recommended entry point for most users.
func InitDagWithOptions ¶
InitDagWithOptions creates and initialises a new DAG, applying the supplied functional options before adding the synthetic start node. It is the option-friendly equivalent of InitDag.
func NewDag ¶
func NewDag() *Dag
NewDag returns a new Dag with default configuration (see DefaultDagConfig). Call StartDag (or use InitDag) to add the synthetic start node before adding edges.
func NewDagWithConfig ¶
NewDagWithConfig returns a new Dag using the supplied DagConfig. Prefer InitDag for the common case; use this constructor when you need to customise buffer sizes, timeouts, or the worker pool size before adding nodes. Invalid config values (zero or negative buffers / pool size) are clamped to safe minimums by normalizeDagConfig.
func NewDagWithOptions ¶
NewDagWithOptions returns a new Dag with DefaultDagConfig, then applies each DagOption in order. Functional options (e.g. WithTimeout, WithWorkerPool) are applied after default values, so later options can override earlier ones. The final config is normalised after all options are applied.
func (*Dag) AddEdge ¶
AddEdge adds a directed edge from the node identified by from to the node identified by to. It returns an error if the DAG has already been finalized by FinishDag.
func (*Dag) AddEdgeIfNodesExist ¶
AddEdgeIfNodesExist adds an edge only if both nodes already exist.
func (*Dag) ConnectRunner ¶
ConnectRunner is the bool-returning variant of ConnectRunnerE. It returns false if the DAG has no nodes.
func (*Dag) ConnectRunnerE ¶
ConnectRunnerE attaches the three-phase runner closure (preFlight / inFlight / postFlight) to every node. Call it after FinishDag and before GetReady; call it again after Reset so the closures reference the freshly created channels. Returns an error if the DAG has no nodes or if GetReady has already been called (runner closures must not be replaced while goroutines are live).
func (*Dag) CreateNode ¶
CreateNode creates a new node and returns a pointer to it. It is safe to call concurrently. Returns nil if the DAG has already been finalized by FinishDag, or if id is a reserved synthetic node name (StartNode / EndNode).
func (*Dag) CreateNodeWithTimeOut ¶
CreateNodeWithTimeOut creates a node that applies a per-node timeout when bTimeOut is true. Returns nil if the DAG has already been finalized by FinishDag, or if id is a reserved synthetic node name (StartNode / EndNode).
func (*Dag) DroppedErrors ¶
DroppedErrors returns the number of errors that reportError could not deliver to the Errors channel (channel full or closed) since the DAG was created or last Reset. A non-zero value indicates that DagConfig.MaxChannelBuffer is too small or that the Errors channel consumer is not draining fast enough.
func (*Dag) Edges ¶
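Edges returns a shallow copy of the DAG's edge list. The returned slice is safe to read, and adding, removing, or reordering its elements does not affect the DAG. Because the copy is shallow, the *Edge values it holds are the same ones the DAG uses.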
func (*Dag) EndNodeID ¶
EndNodeID returns the ID of the synthetic exit node ("end_node"), or an empty string if FinishDag has not been called yet.
func (*Dag) ErrCount ¶
ErrCount returns the number of errors currently buffered in the Errors channel. The count is a point-in-time snapshot and may change immediately after the call. Use DroppedErrors to check for errors that overflowed the buffer capacity.
func (*Dag) FinishDag ¶
FinishDag finalizes the DAG by connecting end nodes and validating the structure.
func (*Dag) GetReadyE ¶
GetReadyE initialises the execution semaphore and launches one goroutine per node. Each goroutine runs preFlight (dependency wait) without holding an execution slot; the slot is acquired inside connectRunner just before inFlight (RunE) so that a small WorkerPoolSize cannot deadlock the graph.
Prerequisites (returns a descriptive error if violated):
- ctx must be non-nil
- FinishDag must have been called (dag.validated == true)
- ConnectRunner must have been called (every node.runner must be non-nil)
- GetReady / GetReadyE must not have been called already
ctx is forwarded to each node's runner; cancel it to abort the entire execution.
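The reason for acquiring the slot only after the dependency wait can be seen in a small stand-alone sketch. It is not dag-go code: `runChain` is a hypothetical helper that runs a linear chain of tasks with a channel-based semaphore. If each goroutine took its slot before waiting on its parent, a pool of size 1 could be held by a task whose parent can never run.

```go
package main

import (
	"fmt"
	"sync"
)

// runChain executes n tasks in dependency order (task i depends on i-1)
// with at most poolSize tasks running at once, and returns the run order.
func runChain(n, poolSize int) []int {
	sem := make(chan struct{}, poolSize) // execution slots
	done := make([]chan struct{}, n)     // done[i] is closed when task i finishes
	for i := range done {
		done[i] = make(chan struct{})
	}

	var (
		mu    sync.Mutex
		order []int
		wg    sync.WaitGroup
	)
	// Launch dependants first so they are already waiting on their parents.
	for i := n - 1; i >= 0; i-- {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if i > 0 {
				<-done[i-1] // dependency wait: no slot is held here
			}
			sem <- struct{}{} // acquire a slot only when ready to run
			mu.Lock()
			order = append(order, i)
			mu.Unlock()
			<-sem // release the slot
			close(done[i])
		}(i)
	}
	wg.Wait()
	return order
}

func main() {
	fmt.Println(runChain(4, 1)) // [0 1 2 3]
}
```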
func (*Dag) Progress ¶
Progress returns the DAG execution completion ratio in [0.0, 1.0].
NOTE: nodeCount and completedCount are read in two separate atomic operations; they do not form an atomic pair. completedCount may be incremented between the two reads, making the returned ratio momentarily slightly ahead of reality. This is acceptable for progress-bar or observability purposes, but must NOT be used for correctness decisions (e.g. deciding whether all nodes finished).
func (*Dag) Reset ¶
func (dag *Dag) Reset()
Reset reinitialises a completed DAG so it can be executed again without rebuilding the graph from scratch.
Reset MUST be called only after Wait returns. Calling it while the DAG is still running is a no-op (the call is logged and returns immediately) to prevent live goroutines from racing against freshly created channels. Use ResetE if you need an error return instead of a silent guard.
After Reset, follow the standard execution lifecycle:
dag.Reset()
dag.ConnectRunner()
dag.GetReady(ctx)
dag.Start()
dag.Wait(ctx)
The graph topology (nodes, edges, Config, ContainerCmd, runners) is preserved; only execution state is reset.
func (*Dag) ResetE ¶
ResetE is the error-returning variant of Reset. It returns an error when called while the DAG is still running (i.e. Wait has not yet returned).
func (*Dag) SetContainerCmd ¶
SetContainerCmd sets the global default Runnable for all nodes in this DAG. It is safe to call concurrently. Per-node overrides (SetNodeRunner) and the RunnerResolver take priority over this value; see runner priority in the README.
Mutation policy: SetContainerCmd must be called before GetReady and must not be called again until Reset. The DAG is considered frozen from the moment GetReadyE succeeds until reset() clears nodeResult — this covers both the running window (goroutines live) and the post-Wait / pre-Reset window. Mutating the global runner inside the frozen window would cause different nodes to execute with different runners depending on scheduling order, breaking reproducibility.
func (*Dag) SetNodeRunner ¶
SetNodeRunner sets the runner for the node with the given id. Returns false if the node does not exist, is not in Pending status, or if the DAG is frozen (GetReady has been called and Reset has not yet been called).
Mutation policy: same as SetContainerCmd — must be called before GetReady and not again until Reset. The frozen window spans from GetReadyE success through Reset.
func (*Dag) SetNodeRunners ¶
SetNodeRunners bulk-sets runners from a map of node-id to Runnable. Returns the count of applied runners, and slices of missing/skipped node ids.
Mutation policy: same as SetContainerCmd — must be called before GetReady and not again until Reset. When the DAG is frozen all ids are returned in the skipped slice with applied == 0.
func (*Dag) SetRunnerResolver ¶
func (dag *Dag) SetRunnerResolver(rr RunnerResolver)
SetRunnerResolver installs a dynamic runner selector for this DAG. The resolver is called at execution time for each node, after the per-node atomic override is checked but before the global ContainerCmd fallback. Pass nil to clear a previously installed resolver. Thread-safe.
Mutation policy: same as SetContainerCmd — must be called before GetReady and not again until Reset. The frozen window spans from GetReadyE success through Reset; changing the resolver inside it would break reproducibility.
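The lookup order described for runners (per-node override, then resolver, then global default) can be sketched as below. The `runner` and `resolver` types and the `pick` function are hypothetical simplifications; the real Runnable and RunnerResolver types have different shapes.

```go
package main

import "fmt"

// runner is a hypothetical stand-in for Runnable, reduced to a name.
type runner string

// resolver is a hypothetical stand-in for RunnerResolver.
type resolver func(nodeID string) (runner, bool)

// pick applies the documented priority: per-node override first, then the
// resolver (if one is installed and it matches), then the global default.
func pick(nodeID string, perNode map[string]runner, rr resolver, global runner) runner {
	if r, ok := perNode[nodeID]; ok {
		return r
	}
	if rr != nil {
		if r, ok := rr(nodeID); ok {
			return r
		}
	}
	return global
}

func main() {
	perNode := map[string]runner{"A": "per-node"}
	rr := func(id string) (runner, bool) {
		if id == "B" {
			return "resolved", true
		}
		return "", false
	}
	for _, id := range []string{"A", "B", "C"} {
		fmt.Println(id, pick(id, perNode, rr, "global"))
	}
}
```

Running the sketch prints `A per-node`, `B resolved` and `C global`, one per line.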
func (*Dag) StartDag ¶
StartDag creates the synthetic start node and its trigger channel. It is called automatically by InitDag; call it directly only when building a DAG with NewDag or NewDagWithConfig.
Returns the receiver so calls can be chained, or an error if the start node could not be created (e.g. the DAG is nil or the node was already created).
func (*Dag) StartE ¶
StartE fires the DAG by sending a trigger signal to the start-node trigger channel captured by GetReadyE. All node goroutines are already waiting; this single send unblocks the start node and cascades through the graph.
Call StartE exactly once after GetReady. Returns an error if GetReady was not called, if Start was already called for this run, or if the trigger send fails unexpectedly.
StartE never reads startNode.parentVertex directly — it uses the dag.startTrigger channel that GetReadyE captured and validated before any goroutines were launched. This guarantees no concurrent access to the parentVertex slice while goroutines are live.
func (*Dag) StartNodeID ¶
StartNodeID returns the ID of the synthetic entry node ("start_node"), or an empty string if StartDag has not been called yet.
func (*Dag) ToMermaid ¶
ToMermaid generates a Mermaid flowchart string that represents the DAG topology.
The output uses the "graph TD" (top-down) direction. Synthetic nodes (start_node / end_node) are rendered with a stadium shape to distinguish infrastructure nodes from user-defined ones. If a per-node Runnable has been registered via SetNodeRunner, its concrete Go type is appended to the node label after a line-break so the diagram shows which executor is bound to each step — useful for debugging or documentation.
Node IDs are sanitised for Mermaid syntax by replacing any character that is not an ASCII letter, digit, or underscore with an underscore (see mermaidSafeID). This prevents parser errors for IDs that contain hyphens, dots, or spaces.
Example output:
graph TD
start_node(["start_node"])
A["A\n*main.MyRunner"]
B["B"]
end_node(["end_node"])
start_node --> A
A --> B
B --> end_node
ToMermaid acquires a read-lock and is safe to call concurrently with Progress() and other read-only observers. It must be called after FinishDag so that dag.edges is complete; calling it before FinishDag will produce a diagram that is missing the edges to the synthetic end node.
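The ID sanitisation rule described above is simple enough to sketch in a few lines. `safeID` is a hypothetical name used for illustration; the package's mermaidSafeID is unexported and may be implemented differently.

```go
package main

import (
	"fmt"
	"strings"
)

// safeID replaces every character that is not an ASCII letter, digit, or
// underscore with an underscore.
func safeID(id string) string {
	return strings.Map(func(r rune) rune {
		switch {
		case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9', r == '_':
			return r
		}
		return '_'
	}, id)
}

func main() {
	fmt.Println(safeID("build-step.v2 final")) // build_step_v2_final
}
```

With a rule like this, two distinct IDs such as `a-b` and `a.b` map to the same sanitised form, so IDs that differ only in punctuation are best avoided when the diagram matters.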
func (*Dag) Wait ¶
Wait blocks until the DAG finishes execution, ctx expires, or a fatal node failure is detected on NodesResult.
It returns true only when the end node emits a FlightEnd status — meaning every node in the graph reached a terminal state (Succeeded or Skipped). It returns false on any of:
- context cancellation or timeout
- NodesResult channel closed unexpectedly
- end node reporting a PreflightFailed / InFlightFailed / PostFlightFailed
Wait closes all channels (closeChannels) and shuts down the worker pool when it returns, regardless of whether execution succeeded. Do NOT use the DAG after Wait returns without first calling Reset.
type DagConfig ¶
type DagConfig struct {
// MinChannelBuffer is the buffer size for inter-node edge channels. Larger
// values reduce the chance of a parent blocking while writing to a slow child.
// Default: 5.
MinChannelBuffer int
// MaxChannelBuffer is the buffer capacity for the NodesResult and Errors
// aggregation channels. Set this higher than the total number of nodes to
// prevent back-pressure stalls. Default: 100.
MaxChannelBuffer int
// StatusBuffer is reserved for future per-node status channel buffering.
// Default: 10.
StatusBuffer int
// WorkerPoolSize caps the number of goroutines that execute nodes concurrently.
// If the number of nodes is smaller than WorkerPoolSize, the pool is sized to
// the node count instead (min of the two). Default: 50.
WorkerPoolSize int
// DefaultTimeout is the implicit per-node execution timeout applied during
// inFlight (RunE). It limits how long each node's user-supplied work may run.
//
// Zero (the default) means no implicit per-node execution timeout is applied;
// only the caller's context deadline governs execution length.
// Per-node overrides are set via Node.Timeout (when Timeout > 0).
//
// Dependency wait (preFlight) is never bounded by this value — it always
// uses the caller's context only, so upstream work never causes false-negative
// timeouts in downstream nodes.
DefaultTimeout time.Duration
// ErrorDrainTimeout is the maximum time collectErrors will wait to drain the
// Errors channel. Defaults to 5 s when left at zero (see DefaultDagConfig).
ErrorDrainTimeout time.Duration
// ExpectedNodeCount is a capacity hint for the internal nodes map and the
// Edges slice. When the final node count is known upfront, setting this
// field avoids incremental map rehashing and slice growth during AddEdge
// calls. Zero means let the runtime decide the initial capacity.
ExpectedNodeCount int
// ErrorPolicy controls how downstream nodes react to upstream failures.
// ErrorPolicyFailFast (default, zero value) skips children of a failed node.
// ErrorPolicyContinueOnError allows children to run regardless of parent outcome;
// errors are still recorded in the Errors channel and DroppedErrors counter.
ErrorPolicy ErrorPolicy
// EnablePprofLabels attaches runtime/pprof goroutine labels to each preFlight
// worker goroutine ("phase", "nodeId", "channelIndex").
// Only effective when the binary is built with -tags pprof.
// Default: false — zero allocation overhead in production builds.
EnablePprofLabels bool
}
DagConfig holds tunable parameters for a Dag instance. Use DefaultDagConfig for production-ready defaults, or override individual fields before passing to NewDagWithConfig.
func DefaultDagConfig ¶
func DefaultDagConfig() DagConfig
DefaultDagConfig returns a DagConfig populated with production-ready defaults:
- MinChannelBuffer: 5
- MaxChannelBuffer: 100
- StatusBuffer: 10
- WorkerPoolSize: 50
- DefaultTimeout: 0 (no implicit per-node execution timeout)
- ErrorDrainTimeout: 5 s
DefaultTimeout is intentionally 0: individual node execution time is not bounded by default. Use WithDefaultTimeout or set Node.Timeout explicitly when per-node execution limits are required. DAG-wide time limits should be enforced via the caller context passed to GetReady and Wait.
type DagOption ¶
type DagOption func(*Dag)
DagOption is a functional-option type for NewDagWithOptions. Use the provided With* constructors to build option values.
func WithChannelBuffers ¶
WithChannelBuffers sets the channel buffer sizes used for edge signalling and result aggregation. Larger buffers reduce back-pressure in high-fan-out DAGs.
func WithDefaultTimeout ¶
WithDefaultTimeout sets the implicit per-node execution timeout applied during inFlight (RunE). This is distinct from WithTimeout, which caps the overall DAG run in Wait().
Semantics:
- d == 0: no implicit per-node execution timeout (the default). Each node's RunE runs until the caller context expires or the node returns.
- d > 0: RunE is bounded by d. If RunE does not return within d, its context is cancelled and the node is marked failed.
Dependency wait (preFlight) is never bounded by this value; it always honours the caller context only. This prevents long upstream execution chains from causing false-negative timeouts in downstream nodes.
Per-node overrides: set Node.Timeout > 0 on individual nodes; that takes priority over this DAG-wide default.
func WithErrorPolicy ¶
func WithErrorPolicy(p ErrorPolicy) DagOption
WithErrorPolicy sets the error propagation policy for this DAG. ErrorPolicyFailFast (default) skips downstream nodes when a parent fails. ErrorPolicyContinueOnError allows all nodes to run regardless of parent failures; errors are still reported via the Errors channel.
func WithTimeout ¶
WithTimeout sets the DAG-level execution deadline used by Wait. This is distinct from WithDefaultTimeout, which bounds individual node execution (inFlight). Pass 0 or omit to rely on the caller context only.
func WithWorkerPool ¶
WithWorkerPool sets the maximum number of concurrent node-execution goroutines. If the DAG has fewer nodes than size, the pool is sized to the node count instead.
type DagWorkerPool ¶
type DagWorkerPool struct {
// contains filtered or unexported fields
}
DagWorkerPool manages a bounded pool of goroutines for concurrent node execution. Tasks are submitted via Submit; Close drains the queue and waits for all workers.
func NewDagWorkerPool ¶
func NewDagWorkerPool(limit int) *DagWorkerPool
NewDagWorkerPool creates a new worker pool with the given number of goroutines. The internal task queue is buffered to twice the worker count so that callers are not serialised behind goroutine startup latency.
func (*DagWorkerPool) Close ¶
func (p *DagWorkerPool) Close()
Close shuts down the worker pool. The taskQueue is closed exactly once via sync.Once, so calling Close twice does not panic.
func (*DagWorkerPool) Submit ¶
func (p *DagWorkerPool) Submit(task nodeTask)
Submit enqueues a nodeTask for execution by the worker pool.
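A minimal sketch of a bounded worker pool with the properties described in this section: a task queue buffered to twice the worker count and a shutdown guarded by sync.Once. The `pool` type and its methods are hypothetical and use plain `func()` tasks; DagWorkerPool's nodeTask type is unexported and not reproduced here.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a simplified stand-in for DagWorkerPool; tasks are plain funcs.
type pool struct {
	tasks     chan func()
	wg        sync.WaitGroup
	closeOnce sync.Once
}

func newPool(workers int) *pool {
	p := &pool{tasks: make(chan func(), workers*2)} // queue buffered to 2x workers
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer p.wg.Done()
			for task := range p.tasks { // exits once the queue is closed and drained
				task()
			}
		}()
	}
	return p
}

// submit enqueues a task; it must not be called after shutdown.
func (p *pool) submit(task func()) { p.tasks <- task }

// shutdown closes the queue exactly once and waits for the workers to finish.
func (p *pool) shutdown() {
	p.closeOnce.Do(func() { close(p.tasks) })
	p.wg.Wait()
}

// runTasks submits n counting tasks to a pool of the given size and returns
// how many ran once the pool has shut down.
func runTasks(workers, n int) int64 {
	var ran atomic.Int64
	p := newPool(workers)
	for i := 0; i < n; i++ {
		p.submit(func() { ran.Add(1) })
	}
	p.shutdown()
	p.shutdown() // a second call is a no-op rather than a double-close panic
	return ran.Load()
}

func main() {
	fmt.Println(runTasks(3, 10)) // 10
}
```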
type Edge ¶
type Edge struct {
// contains filtered or unexported fields
}
Edge represents a directed connection between two nodes. The embedded safeVertex channel carries runningStatus signals from the parent node to its child during execution. Edges are created by AddEdge and reset by Reset; callers should not manipulate the fields directly.
type ErrorPolicy ¶
type ErrorPolicy int
ErrorPolicy controls how downstream nodes react to upstream failures.
const (
// ErrorPolicyFailFast causes downstream nodes to be skipped when any parent
// node fails. This is the default and preserves the invariant that no node
// runs after a dependency failure.
ErrorPolicyFailFast ErrorPolicy = iota
// ErrorPolicyContinueOnError allows downstream nodes to execute even when a
// parent has failed. Nodes proceed through all three flight phases regardless
// of parent outcome; per-node errors are still collected via Errors channel.
ErrorPolicyContinueOnError
)
type ErrorType ¶
type ErrorType int
ErrorType identifies the DAG operation that produced a systemError.
type Node ¶
type Node struct {
ID string
ImageName string
Commands string
// Timeout is the per-node inFlight execution budget.
// When Timeout > 0, it overrides Dag.Config.DefaultTimeout for this node.
// Zero means "use DefaultTimeout or, if that is also zero, the caller context".
// Set this field directly before ConnectRunner is called.
Timeout time.Duration
// contains filtered or unexported fields
}
Node is the fundamental building block of a DAG. A Node must always be handled as a pointer; copying a Node is forbidden because atomic.Value must not be copied after first use.
func (*Node) CheckParentsStatus ¶
CheckParentsStatus returns false (and transitions this node to Skipped) if any parent has already failed. The Pending→Skipped transition is guarded by TransitionStatus so that a node in an unexpected state is never silently overwritten.
func (*Node) GetStatus ¶
func (n *Node) GetStatus() NodeStatus
GetStatus returns the node's current status under the read lock.
func (*Node) MarkCompleted ¶
func (n *Node) MarkCompleted()
MarkCompleted increments the parent DAG's completed-node counter.
func (*Node) SetRunner ¶
SetRunner atomically sets the runner for a Pending node. Returns false if the node is not in Pending status.
func (*Node) SetStatus ¶
func (n *Node) SetStatus(status NodeStatus)
SetStatus sets the node's status under the write lock. Prefer TransitionStatus when a pre-condition on the current status is required.
func (*Node) SetSucceed ¶
SetSucceed sets the succeed flag under the write lock.
func (*Node) TransitionStatus ¶
func (n *Node) TransitionStatus(from, to NodeStatus) bool
TransitionStatus atomically advances n's status from `from` to `to`. It returns true only when both conditions hold:
- n.status == from at the moment the lock is acquired, AND
- the from→to transition is permitted by the state machine.
This prevents illegal backwards moves such as Failed→Succeeded. Use SetStatus only when an unconditional override is explicitly required.
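A guarded transition of this kind can be sketched as a compare-and-set under a lock combined with a transition table. Everything below is a local illustration: the `allowed` table is an assumption (only Pending→Skipped and the ban on Failed→Succeeded are stated in this documentation), and the real state machine may permit other moves.

```go
package main

import (
	"fmt"
	"sync"
)

type status int

const (
	pending status = iota
	running
	succeeded
	failed
	skipped
)

// allowed is an assumed transition table; the package's real table may differ.
var allowed = map[status][]status{
	pending: {running, skipped},
	running: {succeeded, failed},
}

type node struct {
	mu     sync.Mutex
	status status
}

// transition moves the node from `from` to `to` only if the node is
// currently in `from` and the table permits the move.
func (n *node) transition(from, to status) bool {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.status != from {
		return false
	}
	for _, next := range allowed[from] {
		if next == to {
			n.status = to
			return true
		}
	}
	return false
}

func main() {
	n := &node{}
	fmt.Println(n.transition(pending, running))  // true
	fmt.Println(n.transition(running, failed))   // true
	fmt.Println(n.transition(failed, succeeded)) // false: no way out of failed
	fmt.Println(n.transition(pending, skipped))  // false: current status is not pending
}
```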
type NodeStatus ¶
type NodeStatus int
NodeStatus represents the lifecycle state of a Node.
const (
NodeStatusPending NodeStatus = iota
NodeStatusRunning
NodeStatusSucceeded
NodeStatusFailed
NodeStatusSkipped // set when a parent has failed
)
NodeStatusPending through NodeStatusSkipped represent the lifecycle states of a Node.
type Runnable ¶
Runnable defines the interface for executable units attached to a DAG node. RunE is invoked during the inFlight phase. The supplied ctx carries the cancellation / timeout signal from the enclosing DAG execution; implementations should honour ctx.Done() so that long-running work can be interrupted cleanly. The second parameter is the *Node that is currently executing, passed as an opaque interface{} to avoid an import cycle for callers outside this package.
type RunnerResolver ¶
RunnerResolver is an optional hook that picks a Runnable dynamically based on node metadata (e.g. image name, labels). It is consulted after the per-node atomic runner but before Dag.ContainerCmd.
type SafeChannel ¶
type SafeChannel[T any] struct {
// contains filtered or unexported fields
}
SafeChannel is a generic, concurrency-safe channel wrapper that prevents double-close panics and provides non-blocking send semantics.
func NewSafeChannelGen ¶
func NewSafeChannelGen[T any](buffer int) *SafeChannel[T]
NewSafeChannelGen creates a new SafeChannel with the given buffer size.
func (*SafeChannel[T]) Close ¶
func (sc *SafeChannel[T]) Close() (err error)
Close closes the underlying channel exactly once. Returns an error if the channel is already closed or a panic occurs during close.
Close first signals sc.done (via doneOnce) to unblock any SendBlocking callers that are blocked waiting for buffer space, then acquires the write lock to set sc.closed and close the underlying channel. This ordering prevents a deadlock where Close() cannot acquire the write lock because SendBlocking holds the read lock indefinitely on a full channel.
func (*SafeChannel[T]) GetChannel ¶
func (sc *SafeChannel[T]) GetChannel() chan T
GetChannel returns the underlying channel for range/select operations.
func (*SafeChannel[T]) Send ¶
func (sc *SafeChannel[T]) Send(value T) bool
Send attempts to deliver value to the channel. Returns false if the channel is already closed, if Close() has started (sc.done is closed), or if the buffer is full.
The sc.done check closes the narrow race window between Close() signalling done (before acquiring the write lock) and setting sc.closed = true.
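The non-blocking send and single-close behaviour can be sketched with a small generic wrapper. This is a simplified illustration, not SafeChannel itself: `safeChan`, `send` and `shut` are hypothetical names, and the sketch omits the done channel that the real type uses to coordinate with SendBlocking.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// safeChan is a simplified stand-in for SafeChannel.
type safeChan[T any] struct {
	mu     sync.RWMutex
	ch     chan T
	closed bool
}

func newSafeChan[T any](buffer int) *safeChan[T] {
	return &safeChan[T]{ch: make(chan T, buffer)}
}

// send never blocks: it returns false if the channel is closed or the
// buffer is full, and true once the value has been queued.
func (s *safeChan[T]) send(v T) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if s.closed {
		return false
	}
	select {
	case s.ch <- v:
		return true
	default:
		return false
	}
}

// shut closes the channel once; later calls return an error instead of
// panicking on a double close.
func (s *safeChan[T]) shut() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return errors.New("channel already closed")
	}
	s.closed = true
	close(s.ch)
	return nil
}

func main() {
	sc := newSafeChan[int](1)
	fmt.Println(sc.send(1)) // true: buffer has room
	fmt.Println(sc.send(2)) // false: buffer is full, value dropped
	fmt.Println(sc.shut())  // <nil>
	fmt.Println(sc.shut())  // channel already closed
	fmt.Println(sc.send(3)) // false: channel is closed
}
```

Holding the read lock during the send and the write lock during the close is what keeps a send from racing with the close in this sketch.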
func (*SafeChannel[T]) SendBlocking ¶
func (sc *SafeChannel[T]) SendBlocking(ctx context.Context, value T) bool
SendBlocking blocks until value is delivered to the channel, the context is cancelled, or the channel is closed. Returns true when the value was sent; false when the channel was already closed, ctx.Done fired, or a concurrent Close() began.
Unlike Send, SendBlocking never silently drops a value when the buffer is full — it waits for a consumer to make room. Use this for signals where loss would leave a downstream goroutine waiting forever (e.g. edge vertex channels between nodes).
The read lock is held for the duration of the blocking select so that a concurrent Close cannot race with the send. The sc.done case unblocks the select when Close() is called concurrently and the buffer is full — this prevents a deadlock where Close() cannot acquire the write lock because SendBlocking holds the read lock indefinitely.
Directories ¶
| Path | Synopsis |
|---|---|
| examples/build_system (command) | Package main demonstrates a multi-stage software build pipeline using dag-go. |
| examples/etl_pipeline (command) | Package main demonstrates a realistic ETL (Extract-Transform-Load) pipeline built with dag-go. |
| examples/reset_retry (command) | Package main demonstrates dag-go's Reset() method for implementing a retry strategy without rebuilding the graph. |