packtrail

package module
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 23, 2026 License: Apache-2.0 Imports: 19 Imported by: 0

README

Packtrail

Build Status GoDoc Go Report Card GitHub release

A durable, ecosystem-agnostic workflow engine in Go, backed only by NATS (Core + JetStream + KV + Message Scheduler). Packtrail orchestrates declarative flow graphs — task, fanout, fanin, choice and signal nodes — defined either in YAML or directly as Go structs, with crash-durable state, retries, conditional routing, external signals and timers/cron.

Packtrail's defining feature is that node execution is pluggable. The engine never speaks a wire protocol directly: every task/branch node runs through an Invoker. A project plugs in its own transport — an agent caller, an HTTP client, a NATS request/reply worker — and inherits all of packtrail's durability machinery for free.

flowchart LR
    FLOWS["YAML flows\nor Go structs"] --> Engine["Engine\n(runtime)"]
    Engine <-->|"invoker.Invoke\n(pluggable seam)"| Invokers["Invoker(s)\nagent / http\nnats-task"]
    Invokers --> Services["your services\n(agents, APIs…)"]
    Engine -->|"CAS state / work / timers"| NATS["NATS JetStream + KV\n(the only backend)"]

Installation

go get github.com/henomis/packtrail

Requires Go 1.26+ and a running NATS server with JetStream enabled (nats-server -js). Tests embed a real NATS server — no external server needed to run them.

Quick start

nc, _ := nats.Connect(nats.DefaultURL)

srv, _ := packtrail.New(nc,
    packtrail.WithFlowsDir("flows"),           // directory of *.yaml flow files
    packtrail.WithNamespace("acme"),           // isolate from other deployments
    packtrail.WithInvoker("agent", myInvoker), // your transport
    packtrail.WithResultCache(),               // idempotent retries
)

// Register an in-process nats-task worker (optional)
srv.Handle("tasks.notify.*", notifyHandler)

id, _ := srv.Start(ctx, "agent-pipeline", payload)
srv.Signal(ctx, id, "approval", data)
ex, _ := srv.Get(ctx, id)

srv.Run(ctx) // blocks: engine + indexer + reconcile

Built-in transport

Packtrail ships the built-in nats-task invoker — a pkg/protocol request/reply on tasks.<x>.* — as the default transport. So:

  • Any task worker that serves the protocol (protocol.Serve on tasks.*) works unchanged — just use the default subject: on a node.
  • New flows can select any registered invoker per node via invoker: + target:.
  • The core has no dependency on any agent framework (enforced by internal/acceptance), so it stays reusable by any project.

Flow definition

Flows can be defined in YAML or as Go structs — both paths run through the same validation and produce identical runtime behaviour.

YAML
version: "1.0"
name: agent-pipeline
nodes:
  - {id: triage, type: task, invoker: agent, target: triage-agent,
     timeout: 2m, retry: {max_attempts: 3, backoff: exponential}}
  - id: route
    type: choice
    rules:
      - {when: 'payload.category == "billing"', to: billing-agent}
      - {default: true, to: general-agent}
  - {id: billing-agent, type: task, invoker: agent, target: billing-agent}
  - {id: general-agent, type: task, invoker: agent, target: general-agent}
  - {id: notify, type: task, subject: "tasks.notify.{execution_id}"}  # built-in nats-task
edges:
  - {from: triage, to: route}
  - {from: billing-agent, to: notify}
  - {from: general-agent, to: notify}
Go structs

The same flow as a FlowDef, useful when flows are constructed programmatically:

packtrail.WithFlowDef(packtrail.FlowDef{
    Name: "agent-pipeline",
    Nodes: []packtrail.NodeDef{
        {ID: "triage", Type: "task", Invoker: "agent", Target: "triage-agent",
         Timeout: 2 * time.Minute, Retry: &packtrail.RetryPolicy{MaxAttempts: 3, Backoff: "exponential"}},
        {ID: "route", Type: "choice", Rules: []packtrail.RuleDef{
            {When: `payload.category == "billing"`, To: "billing-agent"},
            {Default: true, To: "general-agent"},
        }},
        {ID: "billing-agent", Type: "task", Invoker: "agent", Target: "billing-agent"},
        {ID: "general-agent", Type: "task", Invoker: "agent", Target: "general-agent"},
        {ID: "notify", Type: "task", Subject: "tasks.notify.{execution_id}"},
    },
    Edges: []packtrail.EdgeDef{
        {From: "triage", To: "route"},
        {From: "billing-agent", To: "notify"},
        {From: "general-agent", To: "notify"},
    },
})

WithFlowDef may be combined freely with WithFlow and WithFlowsDir; duplicate flow names across any source are rejected at startup.

  • invoker: / Invoker selects a registered Invoker kind (default nats-task).
  • target: / Target is interpreted by that Invoker (an agent name, a URL, …); subject: / Subject is the nats-task alias. {execution_id} is substituted at dispatch.
  • retry.backoff / Retry.Backoff accepts exponential, linear, or fixed (default).

Node types

task

Invokes an Invoker with the current payload. The most common node type.

- id: step
  type: task
  invoker: agent          # registered invoker kind (default: nats-task)
  target: my-agent        # interpreted by the invoker
  timeout: 2m
  retry:
    max_attempts: 3
    backoff: exponential
choice

Routes the execution to one of several branches based on boolean expressions evaluated against the shared payload:

- id: route
  type: choice
  rules:
    - {when: 'payload.risk_score > 80', to: manual-review}
    - {when: 'payload.category == "billing" && payload.amount > 1000', to: billing-agent}
    - {default: true, to: general-agent}
  • Expression language. when uses expr-lang: comparisons (==, !=, <, >), boolean logic (&&, ||, !), membership (in), string and arithmetic operators. Compiled once on load — a syntax error is a validation error, not a runtime surprise.
  • payload variable. The only variable in scope is payload (the shared execution payload). Reach into it with dotted paths: payload.user.tier, payload.items[0].sku.
  • First match wins. Rules are evaluated top to bottom. Order from most to least specific.
  • default is required. Validation rejects a choice node without a {default: true, to: …} branch, so a choice can never dead-end.
  • Missing fields fall through. If a when expression errors (e.g. missing field), that rule counts as no match and evaluation continues to the next rule.
fanout / fanin

Dispatch multiple branches in parallel and join them back:

- id: fan
  type: fanout
  branches: [worker-a, worker-b, worker-c]

- id: join
  type: fanin
  wait_for: [worker-a, worker-b, worker-c]
  join_policy: all          # all | any | quorum:N
  • fanout launches every branch listed in branches as a parallel sub-execution.
  • fanin waits for the branches listed in wait_for according to join_policy:
    • all (default) — advance when every branch completes.
    • any — advance when the first branch completes.
    • quorum:N — advance when at least N branches complete.
signal

Parks the execution until an external signal arrives (or the timeout fires):

- id: wait-approval
  type: signal
  signal_name: approval
  timeout: 24h
  on_timeout: escalation    # node to jump to on timeout

Send the signal from your application:

srv.Signal(ctx, execID, "approval", json.RawMessage(`{"approved": true}`))

The signal payload is merged into the execution payload and execution resumes at the next node. If timeout elapses first, the execution advances to on_timeout instead.

Async activities (long-running work)

An Invoker normally returns a terminal status (StatusOK/Error/Retry) and the engine settles the node synchronously. For long-running work (an agent call, a remote job) an Invoker can instead return StatusPending: the engine parks the execution as waiting and frees its work slot immediately, without blocking.

The worker that eventually finishes the activity calls Server.CompleteActivity(ctx, execID, node, attempt, result) to settle it — OK to advance, Error to fail, Retry to re-dispatch per the node policy. It is idempotent and stale-safe (keyed by node + attempt, and robust to a completion that arrives before the task has finished parking), so an at-least-once worker can call it freely. This works for plain task nodes and fan-out branches alike.

// dispatch (non-blocking): enqueue a durable job, return pending
func (d *dispatcher) Invoke(ctx context.Context, req packtrail.Request) (packtrail.Result, error) {
    enqueueJob(req.ExecutionID, req.NodeID, req.Attempt, req.Payload) // your durable queue
    return packtrail.Result{Status: packtrail.StatusPending}, nil
}

// later, from the worker that ran the job:
srv.CompleteActivity(ctx, execID, node, attempt,
    packtrail.Result{Status: packtrail.StatusOK, Payload: out})

Resuming failed executions

A failed execution can be revived with Resume. It re-runs the node it failed on with a fresh retry budget, preserving the durable payload. Any running engine for the namespace picks up the resumed work.

err := srv.Resume(ctx, execID)

Cron scheduling

Start a flow on a recurring schedule with ScheduleFlow. The cron expression is 6-field (sec min hour dom mon dow):

// trigger "daily-report" at 08:00 every day
srv.ScheduleFlow(ctx, "daily-report-schedule", "daily-report", "0 0 8 * * *", nil)

Calling ScheduleFlow again with the same name replaces the existing schedule.

To also run a periodic visibility reconciliation, configure it at startup:

packtrail.WithReconcile("0 */5 * * * *") // reconcile every 5 minutes

Writing an Invoker

An Invoker is the bridge between packtrail and your ecosystem:

type Invoker interface {
    Invoke(ctx context.Context, req Request) (Result, error)
}

Request carries the resolved Target, the shared Payload (opaque JSON), the Attempt number and a Deadline. Return Result{Status: StatusOK, Payload: out} to advance with a new shared payload, StatusError to fail the node, or StatusRetry (or a non-nil error) to retry per the node's policy.

Idempotency under at-least-once delivery

Packtrail is durable because it may redeliver: if an engine crashes after invoking a node but before persisting the advance, the work item is redelivered. Wrap invocations in the result cache (WithResultCache()) so a redelivery of the same (execution, node, attempt) returns the stored result instead of re-running the side effect, while a genuine retry (a new attempt) still re-invokes. Enable it whenever invocations have side effects that must not run twice (LLM calls, writes, e-mails). See invoker/cache.go.

Server options

Option Default Description
WithNamespace(prefix) "packtrail" Prefix for every NATS resource; isolates deployments on a shared cluster
WithFlowsDir(dir) Load all *.yaml/*.yml files in dir
WithFlow(yamlDoc) Register a single flow from an inline YAML document; may be called multiple times
WithFlowDef(f) Register a single flow from a FlowDef Go struct; may be combined with WithFlow/WithFlowsDir
WithInvoker(kind, inv) Register an Invoker under kind; overrides the built-in "nats-task" if reused
WithResultCache() disabled Cache invocation results by (execution, node, attempt) for idempotent retries
WithReconcile(cronExpr) Schedule periodic visibility reconciliation (6-field cron)
WithOwnerID(id) random Stable per-instance lease owner id
WithLeaseTTL(d) 30s Ownership lease TTL; a crashed instance's work becomes available after this
WithMaxConcurrency(n) 64 Max work items processed concurrently per instance
WithDefaultTimeout(d) 30s Invocation timeout for nodes that omit one

Observability (packtrail-ui)

cmd/packtrail-ui is a read-only web dashboard for any packtrail deployment. It connects to the same NATS cluster, reads execution state and the flow registry (every flow's graph is published to a KV bucket at startup), and tails the live event stream — so it needs no access to your flow source or engine process.

go run ./cmd/packtrail-ui --namespace packtrail --addr :8088   # NATS_URL honoured

It serves an embedded (no-npm) dashboard: a filterable execution list, a detail view (status, current node, payload, branches, signals, error), and an SVG flow graph with the live execution overlaid, updated in real time over SSE. The backing API is also usable directly:

endpoint returns
GET /api/flows flow names
GET /api/flows/{name} flow graph (FlowGraph)
GET /api/executions[?status=&flow=] execution summaries
GET /api/executions/{id} full execution snapshot
GET /api/events live transitions (Server-Sent Events)

The same data is available programmatically via Server:

// flows
names, _ := srv.ListFlows(ctx)
graph, _ := srv.FlowGraph(ctx, "agent-pipeline")

// executions
ids, _ := srv.ByStatus(ctx, packtrail.ExecRunning)
ids, _ := srv.ByFlow(ctx, "agent-pipeline")
ex, _ := srv.Get(ctx, execID)

// live event stream
events, _ := srv.WatchEvents(ctx)
for ev := range events {
    fmt.Println(ev.ExecID, ev.Status, ev.Node)
}

WatchEvents delivers events published after the call. Load current state via Get/ByStatus first, then apply events live to avoid races.

Development

go build ./...
go test -race ./...   # all packages run against a real embedded nats-server
go vet ./...
gofmt -l .

License

Apache 2.0 — see LICENSE.

Documentation

Overview

Package packtrail is the public, embeddable entry point to the packtrail durable workflow engine. packtrail orchestrates declarative YAML flow graphs — task, fanout, fanin, choice and signal nodes — with crash-durable state backed only by NATS (Core + JetStream + KV + Message Scheduler).

packtrail is ecosystem-agnostic: nodes are executed through a pluggable Invoker, so any project can drive its own services (an agent caller, an HTTP client, a NATS request/reply worker) while inheriting durability, retries, fan-in policies, conditional routing, signals and timers. A built-in "nats-task" Invoker (pkg/protocol request/reply) is always registered.

nc, _ := nats.Connect(nats.DefaultURL)
srv, _ := packtrail.New(nc,
    packtrail.WithFlowsDir("flows"),
    packtrail.WithInvoker("agent", myInvoker),  // your ecosystem's transport
    packtrail.WithResultCache(),                // idempotent retries
)
id, _ := srv.Start(ctx, "research-pipeline", nil)
srv.Run(ctx) // blocks: engine + indexer

The Server does not own the *nats.Conn it is given; the caller connects and closes it.

Index

Constants

View Source
const (
	StatusOK      = invoker.StatusOK
	StatusError   = invoker.StatusError
	StatusRetry   = invoker.StatusRetry
	StatusPending = invoker.StatusPending
)

Invocation outcome statuses.

View Source
const (
	TaskOK    = protocol.StatusOK
	TaskError = protocol.StatusError
	TaskRetry = protocol.StatusRetry
)

Built-in nats-task worker response statuses (the string values a Handler sets on a TaskResponse). For Invoker implementations, use the Status* constants.

View Source
const (
	ExecRunning   = store.StatusRunning
	ExecWaiting   = store.StatusWaiting
	ExecCompleted = store.StatusCompleted
	ExecFailed    = store.StatusFailed
)

Execution statuses.

View Source
const NATSTaskKind = natstask.Kind

NATSTaskKind is the invoker kind of the always-registered built-in transport.

Variables

View Source
var ErrNotFound = store.ErrNotFound

ErrNotFound is returned by Get when an execution does not exist.

Functions

This section is empty.

Types

type Branch

type Branch struct {
	Node   string          `json:"node"`
	Status string          `json:"status"`
	Result json.RawMessage `json:"result,omitempty"`
	Error  string          `json:"error,omitempty"`
}

Branch is the state of a single fan-out branch.

type EdgeDef added in v0.0.2

type EdgeDef struct {
	From string
	To   string
}

EdgeDef connects two nodes in a FlowDef.

type Event

type Event struct {
	ExecID   string    `json:"exec_id"`
	Flow     string    `json:"flow"`
	Status   string    `json:"status"`
	Node     string    `json:"node"`
	Error    string    `json:"error,omitempty"`
	Revision uint64    `json:"revision"`
	Time     time.Time `json:"time"`
}

Event is a flow execution transition, suitable for a live activity feed.

type Execution

type Execution struct {
	ID          string                     `json:"id"`
	Flow        string                     `json:"flow"`
	Status      string                     `json:"status"`
	CurrentNode string                     `json:"current_node"`
	Payload     json.RawMessage            `json:"payload"`
	Attempt     int                        `json:"attempt"`
	Branches    map[string]Branch          `json:"branches,omitempty"`
	Signals     map[string]json.RawMessage `json:"signals,omitempty"`
	WaitSignal  string                     `json:"wait_signal,omitempty"`
	Error       string                     `json:"error,omitempty"`
	UpdatedAt   time.Time                  `json:"updated_at"`
}

Execution is a read-only snapshot of a running flow instance.

type FlowDef added in v0.0.2

type FlowDef struct {
	Name  string
	Nodes []NodeDef
	Edges []EdgeDef
}

FlowDef is a programmatic flow definition. It mirrors the YAML schema and can be passed to WithFlowDef instead of writing YAML.

type FlowGraph

type FlowGraph struct {
	Name  string      `json:"name"`
	Nodes []GraphNode `json:"nodes"`
	Edges []GraphEdge `json:"edges"`
}

FlowGraph is the static structure of a flow, for visualisation. It is published to a KV registry at startup so observability tools can render a flow without its source YAML.

type GraphEdge

type GraphEdge struct {
	From string `json:"from"`
	To   string `json:"to"`
}

GraphEdge is a static edge between two nodes.

type GraphNode

type GraphNode struct {
	ID         string      `json:"id"`
	Type       string      `json:"type"` // task | fanout | fanin | choice | signal
	Invoker    string      `json:"invoker,omitempty"`
	Target     string      `json:"target,omitempty"`
	Branches   []string    `json:"branches,omitempty"`
	WaitFor    []string    `json:"wait_for,omitempty"`
	JoinPolicy string      `json:"join_policy,omitempty"`
	Rules      []GraphRule `json:"rules,omitempty"`
	SignalName string      `json:"signal_name,omitempty"`
	OnTimeout  string      `json:"on_timeout,omitempty"`
}

GraphNode is one node of a FlowGraph. Fields are type-specific; empty ones are omitted.

type GraphRule

type GraphRule struct {
	When    string `json:"when,omitempty"`
	Default bool   `json:"default,omitempty"`
	To      string `json:"to"`
}

GraphRule is one routing rule of a choice node.

type Handler

type Handler = protocol.Handler

Handler implements a nats-task worker's business logic.

type Invoker

type Invoker = invoker.Invoker

Invoker executes a single node invocation. Implement it to plug in a transport for your ecosystem.

type InvokerFunc

type InvokerFunc = invoker.Func

InvokerFunc adapts a plain function to Invoker.

type NodeDef added in v0.0.2

type NodeDef struct {
	ID   string
	Type string // "task" | "fanout" | "fanin" | "choice" | "signal"

	// task
	Invoker string
	Target  string
	Subject string
	Timeout time.Duration
	Retry   *RetryPolicy

	// fanout
	Branches []string

	// fanin
	WaitFor    []string
	JoinPolicy string // "all" | "any" | "quorum:N"

	// choice
	Rules []RuleDef

	// signal
	SignalName string
	OnTimeout  string
}

NodeDef is a single node in a FlowDef.

type Option

type Option func(*config)

Option configures a Server. Pass options to New.

func WithDefaultTimeout

func WithDefaultTimeout(d time.Duration) Option

WithDefaultTimeout sets the invocation timeout used when a node omits one (default 30s).

func WithFlow

func WithFlow(yamlDoc []byte) Option

WithFlow registers a single flow from its YAML document. It may be passed multiple times.

func WithFlowDef added in v0.0.2

func WithFlowDef(f FlowDef) Option

WithFlowDef registers a single flow from a Go struct. It may be passed multiple times and combined with WithFlow / WithFlowsDir.

func WithFlowsDir

func WithFlowsDir(dir string) Option

WithFlowsDir loads every *.yaml / *.yml flow definition in dir.

func WithInvoker

func WithInvoker(kind string, inv invoker.Invoker) Option

WithInvoker registers an Invoker under kind, the value a flow node selects via its `invoker:` field. The built-in "nats-task" kind is always registered and may be overridden by passing WithInvoker("nats-task", ...).

func WithLeaseTTL

func WithLeaseTTL(d time.Duration) Option

WithLeaseTTL sets the per-execution ownership lease TTL (default 30s). A crashed instance's executions become available to others after roughly this.

func WithMaxConcurrency

func WithMaxConcurrency(n int) Option

WithMaxConcurrency caps how many work items this instance processes at once (default 64).

func WithNamespace

func WithNamespace(prefix string) Option

WithNamespace sets the resource prefix for every NATS bucket, stream, subject and durable consumer (default "packtrail"). Give each independent deployment a distinct namespace to let them share a NATS cluster without colliding.

func WithOwnerID

func WithOwnerID(id string) Option

WithOwnerID sets this instance's ownership-lease owner id. Defaults to a random id; only set it if you need a stable, distinct id per instance.

func WithReconcile

func WithReconcile(cronExpr string) Option

WithReconcile installs a recurring visibility reconciliation on the given 6-field cron expression ("sec min hour dom mon dow"), e.g. "0 */5 * * * *". Without it, the indexer still runs but no periodic reconcile is scheduled.

func WithResultCache

func WithResultCache() Option

WithResultCache enables idempotent invocation: every node result is cached by (execution, node, attempt) in a KV bucket, so a work item redelivered after a crash returns the cached result instead of re-invoking the node. Enable it whenever invocations have side effects that must not run twice.

type Request

type Request = invoker.Request

Request is the invocation passed to an Invoker.

type Result

type Result = invoker.Result

Result is what an Invoker returns.

type RetryPolicy added in v0.0.2

type RetryPolicy struct {
	MaxAttempts int
	Backoff     string // "exponential" | "linear" | "fixed"
}

RetryPolicy controls task retries for a NodeDef.

type RuleDef added in v0.0.2

type RuleDef struct {
	When    string
	Default bool
	To      string
}

RuleDef is one branch of a choice node.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is an embeddable packtrail engine instance: it runs the work consumer, visibility indexer and (optionally) reconciliation, and can host built-in nats-task workers in the same process.

func New

func New(nc *nats.Conn, opts ...Option) (*Server, error)

New builds a Server against an existing NATS connection. It bootstraps all buckets and streams under the configured namespace, registers the built-in nats-task invoker plus any supplied via WithInvoker, and loads the configured flows. It does not start processing until Run is called.

func (*Server) ByFlow

func (s *Server) ByFlow(ctx context.Context, flow string) ([]string, error)

ByFlow returns the ids of executions belonging to flow.

func (*Server) ByFlowEvents added in v0.0.2

func (s *Server) ByFlowEvents(ctx context.Context, flow string) ([]Event, error)

ByFlowEvents returns a summary event for every execution belonging to flow, read directly from the visibility index without a per-execution round-trip.

func (*Server) ByStatus

func (s *Server) ByStatus(ctx context.Context, status string) ([]string, error)

ByStatus returns the ids of executions currently indexed under status. The index is eventually consistent (best-effort visibility).

func (*Server) ByStatusEvents added in v0.0.2

func (s *Server) ByStatusEvents(ctx context.Context, status string) ([]Event, error)

ByStatusEvents returns a summary event for every execution currently indexed under status, read directly from the visibility index without a per-execution round-trip. The index is eventually consistent; use Get for authoritative state.

func (*Server) Close

func (s *Server) Close()

Close drains any registered task workers. It does not close the NATS connection, which the caller owns.

func (*Server) CompleteActivity

func (s *Server) CompleteActivity(ctx context.Context, execID, node string, attempt int, res Result) error

CompleteActivity settles an asynchronous activity a node's Invoker previously reported as StatusPending. node and attempt identify the dispatched work (from Request.NodeID / Request.Attempt); res is its outcome. It is idempotent and stale-safe — a duplicate or out-of-date completion is a no-op — so an at-least-once worker can call it freely.

func (*Server) FlowGraph

func (s *Server) FlowGraph(ctx context.Context, name string) (*FlowGraph, error)

FlowGraph returns a flow's graph from the registry, or ErrNotFound.

func (*Server) Flows

func (s *Server) Flows() []string

Flows returns the names of the flows this server knows.

func (*Server) Get

func (s *Server) Get(ctx context.Context, execID string) (*Execution, error)

Get returns a snapshot of an execution, or ErrNotFound. The execution KV is the source of truth; read it (not the indexes) for correctness decisions.

func (*Server) Handle

func (s *Server) Handle(subject string, h Handler) error

Handle registers a built-in nats-task worker for subject (NATS wildcards allowed, e.g. "tasks.triage.*") in this process. The namespace prefix is prepended automatically, so the worker subscribes to "<namespace>.tasks.triage.*". Workers are drained when Run returns or Close is called.

func (*Server) List

func (s *Server) List(ctx context.Context) ([]string, error)

List returns every execution id in the store (the authoritative set).

func (*Server) ListFlows

func (s *Server) ListFlows(ctx context.Context) ([]string, error)

ListFlows returns the names of every flow in the registry. Unlike Flows() (the flows this Server instance loaded), this reads the shared KV registry, so an observer process that loaded no flows still sees them.

func (*Server) Reconcile

func (s *Server) Reconcile(ctx context.Context) error

Reconcile rebuilds the visibility indexes from the source of truth.

func (*Server) Resume

func (s *Server) Resume(ctx context.Context, execID string) error

Resume revives a failed execution, re-running the node it failed on with a fresh retry budget (the durable payload is preserved). Only failed executions can be resumed. It is durable: any running engine for the namespace picks up the resumed work.

func (*Server) Run

func (s *Server) Run(ctx context.Context) error

Run starts the engine, the visibility indexer and (if configured) the reconciliation schedule, and blocks until ctx is cancelled. Registered task workers are drained on return.

func (*Server) ScheduleFlow

func (s *Server) ScheduleFlow(ctx context.Context, name, flow, cronExpr string, payload json.RawMessage) error

ScheduleFlow installs a recurring schedule named name that starts flow on the given 6-field cron expression. Reusing name replaces the schedule.

func (*Server) Signal

func (s *Server) Signal(ctx context.Context, execID, name string, payload json.RawMessage) error

Signal sends an external signal to an execution.

func (*Server) Start

func (s *Server) Start(ctx context.Context, flow string, payload json.RawMessage) (string, error)

Start creates a new execution of flow with the given initial payload and returns its id.

func (*Server) WatchEvents

func (s *Server) WatchEvents(ctx context.Context) (<-chan Event, error)

WatchEvents streams execution transitions as they happen. It delivers events published after the call (an ephemeral consumer with DeliverNew); load current state via Get/ByStatus first, then apply events live. The channel is closed when ctx is cancelled.

type Status

type Status = invoker.Status

Status is an invocation outcome.

type TaskRequest

type TaskRequest = protocol.TaskRequest

TaskRequest is the envelope delivered to a nats-task handler.

type TaskResponse

type TaskResponse = protocol.TaskResponse

TaskResponse is the envelope a nats-task handler returns.

Directories

Path Synopsis
cmd
packtrail-ui command
Command packtrail-ui is a read-only observability dashboard for a packtrail deployment.
Command packtrail-ui is a read-only observability dashboard for a packtrail deployment.
examples
embedded command
Command embedded shows packtrail running as a single microservice: the engine and the task workers live in one process, importing only the public github.com/henomis/packtrail package.
Command embedded shows packtrail running as a single microservice: the engine and the task workers live in one process, importing only the public github.com/henomis/packtrail package.
worker command
Command worker is a tiny example task service for the research-pipeline flow.
Command worker is a tiny example task service for the research-pipeline flow.
internal
dsl
Package dsl parses and validates Packtrail Flow Definitions (YAML) and exposes graph-walk helpers used by the runtime.
Package dsl parses and validates Packtrail Flow Definitions (YAML) and exposes graph-walk helpers used by the runtime.
names
Package names centralises every NATS resource name Packtrail uses — KV buckets, streams, subject prefixes and durable consumer names — derived from a single namespace prefix.
Package names centralises every NATS resource name Packtrail uses — KV buckets, streams, subject prefixes and durable consumer names — derived from a single namespace prefix.
natstest
Package natstest starts an embedded, in-process nats-server with JetStream enabled for use in tests.
Package natstest starts an embedded, in-process nats-server with JetStream enabled for use in tests.
rules
Package rules compiles and evaluates the boolean expressions used by choice nodes.
Package rules compiles and evaluates the boolean expressions used by choice nodes.
runtime
Package runtime is the packtrail execution engine: it walks flow graphs, invokes nodes through a pluggable Invoker, and drives fanout/fanin, choice and signal nodes.
Package runtime is the packtrail execution engine: it walks flow graphs, invokes nodes through a pluggable Invoker, and drives fanout/fanin, choice and signal nodes.
scheduler
Package scheduler wraps the NATS JetStream Message Scheduler.
Package scheduler wraps the NATS JetStream Message Scheduler.
signal
Package signal carries external signals to waiting executions.
Package signal carries external signals to waiting executions.
store
Package store wraps the NATS JetStream KV buckets and streams that hold all Packtrail state: executions, ownership leases, visibility indexes and the domain event stream.
Package store wraps the NATS JetStream KV buckets and streams that hold all Packtrail state: executions, ownership leases, visibility indexes and the domain event stream.
visibility
Package visibility maintains the eventually-consistent status/flow indexes (spec §9).
Package visibility maintains the eventually-consistent status/flow indexes (spec §9).
Package invoker defines packtrail's agnostic node-invocation contract.
Package invoker defines packtrail's agnostic node-invocation contract.
natstask
Package natstask is packtrail's built-in Invoker: a NATS request/reply caller that speaks the pkg/protocol envelope.
Package natstask is packtrail's built-in Invoker: a NATS request/reply caller that speaks the pkg/protocol envelope.
pkg
protocol
Package protocol defines the public NATS request/reply contract between the Packtrail engine and the remote services that implement workflow tasks.
Package protocol defines the public NATS request/reply contract between the Packtrail engine and the remote services that implement workflow tasks.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL