Documentation ¶
Overview ¶
Package packtrail is the public, embeddable entry point to the packtrail durable workflow engine. packtrail orchestrates declarative YAML flow graphs — task, fanout, fanin, choice and signal nodes — with crash-durable state backed only by NATS (Core + JetStream + KV + Message Scheduler).
packtrail is ecosystem-agnostic: nodes are executed through a pluggable Invoker, so any project can drive its own services (an agent caller, an HTTP client, a NATS request/reply worker) while inheriting durability, retries, fan-in policies, conditional routing, signals and timers. A built-in "nats-task" Invoker (pkg/protocol request/reply) is always registered.
nc, _ := nats.Connect(nats.DefaultURL)
srv, _ := packtrail.New(nc,
	packtrail.WithFlowsDir("flows"),
	packtrail.WithInvoker("agent", myInvoker), // your ecosystem's transport
	packtrail.WithResultCache(),               // idempotent retries
)
id, _ := srv.Start(ctx, "research-pipeline", nil)
srv.Run(ctx) // blocks: engine + indexer
The Server does not own the *nats.Conn it is given; the caller connects and closes it.
Index ¶
- Constants
- Variables
- type Branch
- type EdgeDef
- type Event
- type Execution
- type FlowDef
- type FlowGraph
- type GraphEdge
- type GraphNode
- type GraphRule
- type Handler
- type Invoker
- type InvokerFunc
- type NodeDef
- type Option
- func WithDefaultTimeout(d time.Duration) Option
- func WithFlow(yamlDoc []byte) Option
- func WithFlowDef(f FlowDef) Option
- func WithFlowsDir(dir string) Option
- func WithInvoker(kind string, inv invoker.Invoker) Option
- func WithLeaseTTL(d time.Duration) Option
- func WithMaxConcurrency(n int) Option
- func WithNamespace(prefix string) Option
- func WithOwnerID(id string) Option
- func WithReconcile(cronExpr string) Option
- func WithResultCache() Option
- type Request
- type Result
- type RetryPolicy
- type RuleDef
- type Server
- func (s *Server) ByFlow(ctx context.Context, flow string) ([]string, error)
- func (s *Server) ByFlowEvents(ctx context.Context, flow string) ([]Event, error)
- func (s *Server) ByStatus(ctx context.Context, status string) ([]string, error)
- func (s *Server) ByStatusEvents(ctx context.Context, status string) ([]Event, error)
- func (s *Server) Close()
- func (s *Server) CompleteActivity(ctx context.Context, execID, node string, attempt int, res Result) error
- func (s *Server) FlowGraph(ctx context.Context, name string) (*FlowGraph, error)
- func (s *Server) Flows() []string
- func (s *Server) Get(ctx context.Context, execID string) (*Execution, error)
- func (s *Server) Handle(subject string, h Handler) error
- func (s *Server) List(ctx context.Context) ([]string, error)
- func (s *Server) ListFlows(ctx context.Context) ([]string, error)
- func (s *Server) Reconcile(ctx context.Context) error
- func (s *Server) Resume(ctx context.Context, execID string) error
- func (s *Server) Run(ctx context.Context) error
- func (s *Server) ScheduleFlow(ctx context.Context, name, flow, cronExpr string, payload json.RawMessage) error
- func (s *Server) Signal(ctx context.Context, execID, name string, payload json.RawMessage) error
- func (s *Server) Start(ctx context.Context, flow string, payload json.RawMessage) (string, error)
- func (s *Server) WatchEvents(ctx context.Context) (<-chan Event, error)
- type Status
- type TaskRequest
- type TaskResponse
Constants ¶
const (
	StatusOK      = invoker.StatusOK
	StatusError   = invoker.StatusError
	StatusRetry   = invoker.StatusRetry
	StatusPending = invoker.StatusPending
)
Invocation outcome statuses.
const (
	TaskOK    = protocol.StatusOK
	TaskError = protocol.StatusError
	TaskRetry = protocol.StatusRetry
)
Built-in nats-task worker response statuses (the string values a Handler sets on a TaskResponse). For Invoker implementations, use the Status* constants.
const (
	ExecRunning   = store.StatusRunning
	ExecWaiting   = store.StatusWaiting
	ExecCompleted = store.StatusCompleted
	ExecFailed    = store.StatusFailed
)
Execution statuses.
const NATSTaskKind = natstask.Kind
NATSTaskKind is the invoker kind of the always-registered built-in transport.
Variables ¶
var ErrNotFound = store.ErrNotFound
ErrNotFound is returned by Get when an execution does not exist.
Functions ¶
This section is empty.
Types ¶
type Branch ¶
type Branch struct {
	Node   string          `json:"node"`
	Status string          `json:"status"`
	Result json.RawMessage `json:"result,omitempty"`
	Error  string          `json:"error,omitempty"`
}
Branch is the state of a single fan-out branch.
type Event ¶
type Event struct {
	ExecID   string    `json:"exec_id"`
	Flow     string    `json:"flow"`
	Status   string    `json:"status"`
	Node     string    `json:"node"`
	Error    string    `json:"error,omitempty"`
	Revision uint64    `json:"revision"`
	Time     time.Time `json:"time"`
}
Event is a flow execution transition, suitable for a live activity feed.
type Execution ¶
type Execution struct {
	ID          string                     `json:"id"`
	Flow        string                     `json:"flow"`
	Status      string                     `json:"status"`
	CurrentNode string                     `json:"current_node"`
	Payload     json.RawMessage            `json:"payload"`
	Attempt     int                        `json:"attempt"`
	Branches    map[string]Branch          `json:"branches,omitempty"`
	Signals     map[string]json.RawMessage `json:"signals,omitempty"`
	WaitSignal  string                     `json:"wait_signal,omitempty"`
	Error       string                     `json:"error,omitempty"`
	UpdatedAt   time.Time                  `json:"updated_at"`
}
Execution is a read-only snapshot of a running flow instance.
type FlowDef ¶ added in v0.0.2
FlowDef is a programmatic flow definition. It mirrors the YAML schema and can be passed to WithFlowDef instead of writing YAML.
type FlowGraph ¶
type FlowGraph struct {
	Name  string      `json:"name"`
	Nodes []GraphNode `json:"nodes"`
	Edges []GraphEdge `json:"edges"`
}
FlowGraph is the static structure of a flow, for visualisation. It is published to a KV registry at startup so observability tools can render a flow without its source YAML.
type GraphNode ¶
type GraphNode struct {
	ID         string      `json:"id"`
	Type       string      `json:"type"` // task | fanout | fanin | choice | signal
	Invoker    string      `json:"invoker,omitempty"`
	Target     string      `json:"target,omitempty"`
	Branches   []string    `json:"branches,omitempty"`
	WaitFor    []string    `json:"wait_for,omitempty"`
	JoinPolicy string      `json:"join_policy,omitempty"`
	Rules      []GraphRule `json:"rules,omitempty"`
	SignalName string      `json:"signal_name,omitempty"`
	OnTimeout  string      `json:"on_timeout,omitempty"`
}
GraphNode is one node of a FlowGraph. Fields are type-specific; empty ones are omitted.
type GraphRule ¶
type GraphRule struct {
	When    string `json:"when,omitempty"`
	Default bool   `json:"default,omitempty"`
	To      string `json:"to"`
}
GraphRule is one routing rule of a choice node.
type Invoker ¶
Invoker executes a single node invocation. Implement it to plug in a transport for your ecosystem.
type NodeDef ¶ added in v0.0.2
type NodeDef struct {
	ID   string
	Type string // "task" | "fanout" | "fanin" | "choice" | "signal"

	// task
	Invoker string
	Target  string
	Subject string
	Timeout time.Duration
	Retry   *RetryPolicy

	// fanout
	Branches []string

	// fanin
	WaitFor    []string
	JoinPolicy string // "all" | "any" | "quorum:N"

	// choice
	Rules []RuleDef

	// signal
	SignalName string
	OnTimeout  string
}
NodeDef is a single node in a FlowDef.
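The JoinPolicy values listed on NodeDef ("all", "any", "quorum:N") can be read as thresholds on the number of finished branches. The following is a minimal sketch of that reading, not packtrail's actual evaluation: joinSatisfied is a hypothetical helper, and the choice to count every finished branch (rather than only successful ones) is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// joinSatisfied reports whether a fan-in with the given policy may proceed
// once done of total awaited branches have finished.
// Hypothetical helper: the engine's real rules may differ.
func joinSatisfied(policy string, done, total int) (bool, error) {
	switch {
	case policy == "all":
		return done >= total, nil
	case policy == "any":
		return done >= 1, nil
	case strings.HasPrefix(policy, "quorum:"):
		n, err := strconv.Atoi(strings.TrimPrefix(policy, "quorum:"))
		if err != nil || n < 1 {
			return false, fmt.Errorf("invalid quorum in join policy %q", policy)
		}
		return done >= n, nil
	default:
		return false, fmt.Errorf("unknown join policy %q", policy)
	}
}

func main() {
	// Two of three branches have finished; check each policy.
	for _, p := range []string{"all", "any", "quorum:2"} {
		ok, err := joinSatisfied(p, 2, 3)
		fmt.Println(p, ok, err)
	}
}
```

With two of three branches finished, "all" is not yet satisfied while "any" and "quorum:2" are.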
type Option ¶
type Option func(*config)
Option configures a Server. Pass options to New.
func WithDefaultTimeout ¶
WithDefaultTimeout sets the invocation timeout used when a node omits one (default 30s).
func WithFlow ¶
WithFlow registers a single flow from its YAML document. It may be passed multiple times.
func WithFlowDef ¶ added in v0.0.2
WithFlowDef registers a single flow from a Go struct. It may be passed multiple times and combined with WithFlow / WithFlowsDir.
func WithFlowsDir ¶
WithFlowsDir loads every *.yaml / *.yml flow definition in dir.
func WithInvoker ¶
WithInvoker registers an Invoker under kind, the value a flow node selects via its `invoker:` field. The built-in "nats-task" kind is always registered and may be overridden by passing WithInvoker("nats-task", ...).
func WithLeaseTTL ¶
WithLeaseTTL sets the per-execution ownership lease TTL (default 30s). After an instance crashes, its executions become available to other instances once roughly this interval has elapsed.
func WithMaxConcurrency ¶
WithMaxConcurrency caps how many work items this instance processes at once (default 64).
func WithNamespace ¶
WithNamespace sets the resource prefix for every NATS bucket, stream, subject and durable consumer (default "packtrail"). Give each independent deployment a distinct namespace to let them share a NATS cluster without colliding.
func WithOwnerID ¶
WithOwnerID sets this instance's ownership-lease owner id. Defaults to a random id; only set it if you need a stable, distinct id per instance.
func WithReconcile ¶
WithReconcile installs a recurring visibility reconciliation on the given 6-field cron expression ("sec min hour dom mon dow"), e.g. "0 */5 * * * *". Without it, the indexer still runs but no periodic reconcile is scheduled.
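To make the 6-field layout concrete, the sketch below splits an expression and labels each field in the documented order. describeCron is a hypothetical helper that checks only the field count; it is not the parser packtrail uses and does not validate field contents.

```go
package main

import (
	"fmt"
	"strings"
)

// cronFieldNames lists the six fields in the order the documentation gives.
var cronFieldNames = []string{"sec", "min", "hour", "dom", "mon", "dow"}

// describeCron splits expr on whitespace and maps each field name to its
// value. It only checks that exactly six fields are present.
func describeCron(expr string) (map[string]string, error) {
	fields := strings.Fields(expr)
	if len(fields) != len(cronFieldNames) {
		return nil, fmt.Errorf("want %d fields, got %d", len(cronFieldNames), len(fields))
	}
	out := make(map[string]string, len(fields))
	for i, name := range cronFieldNames {
		out[name] = fields[i]
	}
	return out, nil
}

func main() {
	fields, err := describeCron("0 */5 * * * *")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, name := range cronFieldNames {
		fmt.Printf("%-4s = %s\n", name, fields[name])
	}

	// A classic 5-field expression has no seconds field and is rejected here.
	if _, err := describeCron("*/5 * * * *"); err != nil {
		fmt.Println("5-field expression rejected:", err)
	}
}
```

Under the usual cron reading, "0 */5 * * * *" fires at second 0 of every fifth minute.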
func WithResultCache ¶
func WithResultCache() Option
WithResultCache enables idempotent invocation: every node result is cached by (execution, node, attempt) in a KV bucket, so a work item redelivered after a crash returns the cached result instead of re-invoking the node. Enable it whenever invocations have side effects that must not run twice.
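The caching idea can be illustrated with an in-memory stand-in. This is a sketch only: packtrail stores results in a KV bucket, whereas the map below is not durable, and the names cacheKey, resultCache and invokeOnce are hypothetical. Whether failed invocations are cached is not stated above; this sketch assumes they are not.

```go
package main

import "fmt"

// cacheKey identifies one invocation by (execution, node, attempt).
type cacheKey struct {
	ExecID  string
	Node    string
	Attempt int
}

// resultCache is an in-memory stand-in for a durable result store.
type resultCache struct {
	results map[cacheKey]string
}

func newResultCache() *resultCache {
	return &resultCache{results: make(map[cacheKey]string)}
}

// invokeOnce returns the cached result for key when one exists; otherwise it
// calls invoke, stores a successful result, and returns it.
func (c *resultCache) invokeOnce(key cacheKey, invoke func() (string, error)) (string, error) {
	if res, ok := c.results[key]; ok {
		return res, nil
	}
	res, err := invoke()
	if err != nil {
		return "", err // assumption: errors are not cached
	}
	c.results[key] = res
	return res, nil
}

func main() {
	cache := newResultCache()
	calls := 0
	sendEmail := func() (string, error) {
		calls++ // the side effect that must not run twice
		return "sent", nil
	}

	key := cacheKey{ExecID: "exec-1", Node: "notify", Attempt: 1}
	first, _ := cache.invokeOnce(key, sendEmail)
	second, _ := cache.invokeOnce(key, sendEmail) // simulated redelivery
	fmt.Println(first, second, "invocations:", calls)
}
```

Because attempt is part of the key, a genuine retry (a new attempt number) invokes the node again, while a redelivery of the same attempt does not.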
type RetryPolicy ¶ added in v0.0.2
RetryPolicy controls task retries for a NodeDef.
type Server ¶
type Server struct {
	// contains filtered or unexported fields
}
Server is an embeddable packtrail engine instance: it runs the work consumer, visibility indexer and (optionally) reconciliation, and can host built-in nats-task workers in the same process.
func New ¶
New builds a Server against an existing NATS connection. It bootstraps all buckets and streams under the configured namespace, registers the built-in nats-task invoker plus any supplied via WithInvoker, and loads the configured flows. It does not start processing until Run is called.
func (*Server) ByFlowEvents ¶ added in v0.0.2
ByFlowEvents returns a summary event for every execution belonging to flow, read directly from the visibility index without a per-execution round-trip.
func (*Server) ByStatus ¶
ByStatus returns the ids of executions currently indexed under status. The index is eventually consistent (best-effort visibility).
func (*Server) ByStatusEvents ¶ added in v0.0.2
ByStatusEvents returns a summary event for every execution currently indexed under status, read directly from the visibility index without a per-execution round-trip. The index is eventually consistent; use Get for authoritative state.
func (*Server) Close ¶
func (s *Server) Close()
Close drains any registered task workers. It does not close the NATS connection, which the caller owns.
func (*Server) CompleteActivity ¶
func (s *Server) CompleteActivity(ctx context.Context, execID, node string, attempt int, res Result) error
CompleteActivity settles an asynchronous activity a node's Invoker previously reported as StatusPending. node and attempt identify the dispatched work (from Request.NodeID / Request.Attempt); res is its outcome. It is idempotent and stale-safe — a duplicate or out-of-date completion is a no-op — so an at-least-once worker can call it freely.
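One way to picture the idempotent, stale-safe behaviour is a guard that applies a completion only when it matches the work currently outstanding. The sketch below is a toy model with hypothetical types (pending, execution) and plain in-memory state; it does not reflect how packtrail persists or checks executions.

```go
package main

import "fmt"

// pending identifies the dispatched work an execution is waiting on.
type pending struct {
	Node    string
	Attempt int
}

// execution is a toy stand-in for durable execution state.
type execution struct {
	waiting *pending // nil when nothing is outstanding
	result  string
}

// complete applies res only if (node, attempt) matches the outstanding work.
// It reports whether the completion was applied; a mismatch is a no-op.
func (e *execution) complete(node string, attempt int, res string) bool {
	if e.waiting == nil || e.waiting.Node != node || e.waiting.Attempt != attempt {
		return false
	}
	e.result = res
	e.waiting = nil
	return true
}

func main() {
	e := &execution{waiting: &pending{Node: "transcode", Attempt: 2}}

	fmt.Println(e.complete("transcode", 1, "old"))  // stale attempt: ignored
	fmt.Println(e.complete("transcode", 2, "done")) // matches: applied
	fmt.Println(e.complete("transcode", 2, "done")) // duplicate: ignored
}
```

This is why an at-least-once worker can deliver the same completion repeatedly without changing the outcome.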
func (*Server) Get ¶
Get returns a snapshot of an execution, or ErrNotFound. The execution KV is the source of truth; read it (not the indexes) for correctness decisions.
func (*Server) Handle ¶
Handle registers a built-in nats-task worker for subject (NATS wildcards allowed, e.g. "tasks.triage.*") in this process. The namespace prefix is prepended automatically, so the worker subscribes to "<namespace>.tasks.triage.*". Workers are drained when Run returns or Close is called.
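The prefixing rule amounts to joining the namespace and the subject with a dot. A minimal sketch follows, with workerSubject as a hypothetical helper name; "billing" is an illustrative namespace, not one packtrail defines.

```go
package main

import "fmt"

// workerSubject returns the subject a worker would subscribe to when the
// namespace prefix is prepended, as described for Handle.
func workerSubject(namespace, subject string) string {
	return namespace + "." + subject
}

func main() {
	// Default namespace.
	fmt.Println(workerSubject("packtrail", "tasks.triage.*"))
	// A second deployment with its own namespace does not collide.
	fmt.Println(workerSubject("billing", "tasks.triage.*"))
}
```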
func (*Server) ListFlows ¶
ListFlows returns the names of every flow in the registry. Unlike Flows() (the flows this Server instance loaded), this reads the shared KV registry, so an observer process that loaded no flows still sees them.
func (*Server) Resume ¶
Resume revives a failed execution, re-running the node it failed on with a fresh retry budget (the durable payload is preserved). Only failed executions can be resumed. It is durable: any running engine for the namespace picks up the resumed work.
func (*Server) Run ¶
Run starts the engine, the visibility indexer and (if configured) the reconciliation schedule, and blocks until ctx is cancelled. Registered task workers are drained on return.
func (*Server) ScheduleFlow ¶
func (s *Server) ScheduleFlow(ctx context.Context, name, flow, cronExpr string, payload json.RawMessage) error
ScheduleFlow installs a recurring schedule named name that starts flow on the given 6-field cron expression. Reusing name replaces the schedule.
func (*Server) Start ¶
Start creates a new execution of flow with the given initial payload and returns its id.
func (*Server) WatchEvents ¶
WatchEvents streams execution transitions as they happen. It delivers events published after the call (an ephemeral consumer with DeliverNew); load current state via Get/ByStatus first, then apply events live. The channel is closed when ctx is cancelled.
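The "load first, then apply live" pattern can be sketched with a local view that is seeded from a snapshot and then updated from a channel. The types below are hypothetical and use a plain channel in place of WatchEvents. Using Revision to drop events no newer than what the view already holds is an assumption of this sketch; the documentation does not state how Revision is ordered.

```go
package main

import "fmt"

// event mirrors only the Event fields this sketch needs.
type event struct {
	ExecID   string
	Status   string
	Revision uint64
}

// view is a local, in-memory picture of execution statuses.
type view struct {
	status   map[string]string
	revision map[string]uint64
}

func newView() *view {
	return &view{status: map[string]string{}, revision: map[string]uint64{}}
}

// apply records ev unless the view already holds an equal or newer revision
// for that execution. It reports whether the view changed.
func (v *view) apply(ev event) bool {
	if rev, ok := v.revision[ev.ExecID]; ok && ev.Revision <= rev {
		return false
	}
	v.status[ev.ExecID] = ev.Status
	v.revision[ev.ExecID] = ev.Revision
	return true
}

func main() {
	v := newView()

	// Step 1: seed the view from a snapshot (stand-in for Get/ByStatus).
	v.apply(event{ExecID: "exec-1", Status: "running", Revision: 3})

	// Step 2: apply live events (stand-in for the WatchEvents channel).
	events := make(chan event, 2)
	events <- event{ExecID: "exec-1", Status: "running", Revision: 3}   // already seen
	events <- event{ExecID: "exec-1", Status: "completed", Revision: 4} // new
	close(events)

	for ev := range events {
		v.apply(ev)
	}
	fmt.Println("exec-1", v.status["exec-1"])
}
```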
type TaskRequest ¶
type TaskRequest = protocol.TaskRequest
TaskRequest is the envelope delivered to a nats-task handler.
type TaskResponse ¶
type TaskResponse = protocol.TaskResponse
TaskResponse is the envelope a nats-task handler returns.
Directories ¶
| Path | Synopsis |
|---|---|
| cmd | |
| packtrail-ui (command) | Command packtrail-ui is a read-only observability dashboard for a packtrail deployment. |
| examples | |
| embedded (command) | Command embedded shows packtrail running as a single microservice: the engine and the task workers live in one process, importing only the public github.com/henomis/packtrail package. |
| worker (command) | Command worker is a tiny example task service for the research-pipeline flow. |
| internal | |
| dsl | Package dsl parses and validates Packtrail Flow Definitions (YAML) and exposes graph-walk helpers used by the runtime. |
| names | Package names centralises every NATS resource name Packtrail uses — KV buckets, streams, subject prefixes and durable consumer names — derived from a single namespace prefix. |
| natstest | Package natstest starts an embedded, in-process nats-server with JetStream enabled for use in tests. |
| rules | Package rules compiles and evaluates the boolean expressions used by choice nodes. |
| runtime | Package runtime is the packtrail execution engine: it walks flow graphs, invokes nodes through a pluggable Invoker, and drives fanout/fanin, choice and signal nodes. |
| scheduler | Package scheduler wraps the NATS JetStream Message Scheduler. |
| signal | Package signal carries external signals to waiting executions. |
| store | Package store wraps the NATS JetStream KV buckets and streams that hold all Packtrail state: executions, ownership leases, visibility indexes and the domain event stream. |
| visibility | Package visibility maintains the eventually-consistent status/flow indexes (spec §9). |
| invoker | Package invoker defines packtrail's agnostic node-invocation contract. |
| natstask | Package natstask is packtrail's built-in Invoker: a NATS request/reply caller that speaks the pkg/protocol envelope. |
| pkg | |
| protocol | Package protocol defines the public NATS request/reply contract between the Packtrail engine and the remote services that implement workflow tasks. |