ebind

module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2026 License: MIT

README

ebind

CI CodeQL Go Reference Go Report Card Latest release

A Go task queue + DAG workflow engine built on NATS JetStream.

Function-first ergonomics (Register(reg, MyFunc), Enqueue(c, MyFunc, args...), Await[T](ctx, fut)) on top of a persistent queue with retries, dead-lettering, and optional DAG orchestration — all driven by a single NATS dependency that can run as an embedded in-process server (including 3-node HA cluster) or against an external JetStream deployment.

Why ebind

  • NATS-native. No Redis, no Postgres. If you already run NATS you already have ebind's dependencies.
  • Single-binary HA. The embed package boots a 3-node JetStream cluster inside your process. One binary per machine, cluster.routes wired automatically.
  • Function-first API. Pass your function reference, not a string name and a JSON schema. Reflection introspects the signature; runtime arg-type validation happens before publish.
  • Typed responses. Await[Profile](ctx, fut) returns a typed value, not interface{}.
  • Durable DAG workflows. Declare dependencies between steps; state lives in a NATS KV bucket so workflows survive producer restarts. Mandatory/optional steps, per-step retry policies, dynamic step addition from within handlers.

Install

go get github.com/f1bonacc1/ebind

Requires Go 1.22+ and NATS JetStream 2.8+.

Quickstart — standalone task queue

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"

    "github.com/f1bonacc1/ebind/client"
    "github.com/f1bonacc1/ebind/embed"
    "github.com/f1bonacc1/ebind/stream"
    "github.com/f1bonacc1/ebind/task"
    "github.com/f1bonacc1/ebind/worker"
)

// Any Go function with (context.Context, ...args) (T, error) or (context.Context, ...args) error.
func SendEmail(ctx context.Context, to, subject, body string) (string, error) {
    // ... actually send ...
    return "msg-id-42", nil
}

func main() {
    ctx := context.Background()

    // 1. Start an embedded NATS JetStream (dev). In prod, point at an external cluster
    //    or use embed.StartCluster(embed.ClusterConfig{Size: 3, ...}) for in-process HA.
    node, _ := embed.StartNode(embed.NodeConfig{Port: -1, StoreDir: "/tmp/ebind-demo"})
    defer node.Shutdown()
    nc, _ := nats.Connect(node.ClientURL())

    // 2. Create the ebind streams (TASKS, RESP, DLQ).
    js, _ := jetstream.New(nc)
    _ = stream.EnsureStreams(ctx, js, stream.Config{Replicas: 1})

    // 3. Register handlers + start worker.
    reg := task.NewRegistry()
    task.MustRegister(reg, SendEmail)

    w, _ := worker.New(nc, reg, worker.Options{Concurrency: 16})
    go w.Run(ctx)

    // 4. Enqueue from anywhere in the cluster — producer and worker can be different binaries.
    c, _ := client.New(ctx, nc, client.Options{})
    defer c.Close()

    fut, _ := client.Enqueue(c, SendEmail, "alice@example.com", "hello", "world")
    msgID, err := client.Await[string](ctx, fut)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("sent:", msgID)
}

Run the bundled end-to-end demo:

make demo

Quickstart — DAG workflow

import (
    "github.com/f1bonacc1/ebind/task"
    "github.com/f1bonacc1/ebind/workflow"
)

// Handlers are plain functions.
func FetchUser(ctx context.Context, id string) (User, error)        { /* ... */ }
func Enrich(ctx context.Context, id string) (Enriched, error)       { /* ... */ }
func Combine(ctx context.Context, u User, e Enriched) (Profile, error)

// In main(): after EnsureStreams + worker started, wire the workflow layer.
wf, _ := workflow.NewFromNATS(ctx, nc, 1 /* replicas */)
task.MustRegister(reg, FetchUser)
task.MustRegister(reg, Enrich)
task.MustRegister(reg, Combine)

// Attach the hook + middleware so the worker talks to the workflow layer.
w, _ := worker.New(nc, reg, worker.Options{
    StepHook:   wf.Hook(),
    Middleware: []worker.Middleware{wf.ContextMiddleware()},
})
go w.Run(ctx)
go wf.RunScheduler(ctx)

// Build + submit a DAG.
dag := workflow.New()
a := dag.Step("fetch",    FetchUser, userID)
b := dag.StepOpts("enrich", Enrich, []workflow.StepOption{workflow.Optional()}, userID)
c := dag.Step("combine",  Combine, a.Ref(), b.RefOrDefault(Enriched{}))

_ = dag.Submit(ctx, wf)
profile, err := workflow.Await[Profile](ctx, wf, dag.ID(), c)

Key behaviors:

  • a.Ref()combine runs only if fetch succeeds; cascade-skips otherwise.
  • b.RefOrDefault(v)combine runs with v substituted if enrich fails or is skipped.
  • workflow.Optional()enrich's failure does not fail the DAG.
  • workflow.WithRetry(policy) — per-DAG default retry; workflow.WithStepRetry(policy) overrides per-step.
  • From inside a handler, workflow.FromContext(ctx).Step(...) adds more steps dynamically.
Resuming Await from another instance

DAG state + step results live in NATS KV. Workers keep running independently of whoever called Await. If the waiting process dies, the DAG continues; results land in KV and stay there. A different process (same NATS cluster) can resume the wait with only the DAG and step IDs:

// Instance A — submitter. Persist these two strings somewhere (DB, Redis, file).
dagID  := dag.ID()
stepID := c.ID()            // the *Step you'd pass to Await
_ = dag.Submit(ctx, wf)
// ... instance A may exit now ...

// Instance B — resumer. No *Step handle needed.
wfB, _  := workflow.NewFromNATS(ctx, nc, 1)
result, err := workflow.AwaitByID[Profile](ctx, wfB, dagID, stepID)

AwaitByID uses NATS KV IncludeHistory() under the hood, so late subscribers still receive results that were written before they started watching. See examples/11-workflow-resume for a runnable two-invocation demo.

Architecture

┌──────────────┐        publish TASKS.<name>         ┌──────────────┐
│   Producer   │ ───────────────────────────────────▶│ NATS JetStrm │
│              │                                     │  TASKS       │
│ client.Enq   │                                     └──────┬───────┘
└──────┬───────┘                                            │ pull
       │                                                    ▼
       │ subscribe RESP.<client_id>.>     ┌──────────────────────┐
       │◀──────────────── RESP ───────────│      Worker(s)       │
       │                                  │  - reflect.Call      │
       ▼                                  │  - middleware chain  │
  Future.Get() / Await[T]                 │  - StepHook          │
                                          └──────┬───────────────┘
                                                 │ DAG events
                                                 ▼
                                    ┌────────────────────────────┐
                                    │  Scheduler (every worker,  │
                                    │   leader-gated)            │
                                    │  - state in NATS KV        │
                                    │  - resync on acquire       │
                                    └────────────────────────────┘
  • task.Registry — name → reflect.Value map; Register(fn) introspects signature.
  • client.Client — one response-consumer per client; routes responses to typed Futures.
  • worker.Worker — pull consumer + middleware chain (Recover, Log, user) + per-task retry policy.
  • embed.StartCluster(3) — in-process 3-node JetStream cluster with loopback routes.
  • workflow — DAG builder + persistent state (KV bucket ebind-dags) + event-driven scheduler with leader-elector-gated sweep for stranded recovery.

See CLAUDE.md for the full architectural walk-through.

Production concerns

Concern Handled by
At-least-once delivery JetStream AckExplicitPolicy + MaxDeliver
Exactly-one enqueue on retries JetStream Nats-Msg-Id dedupe with 5-min window
State consistency NATS KV Update(key, val, expectedRev) CAS
Handler panics worker.Recover middleware → TaskError{Kind: "panic"}
Retry control task.RetryPolicy on envelope (per-task) or worker.Options.MaxDeliver default
Non-retryable errors RetryPolicy.NonRetryableErrorKinds OR TaskError{Retryable: false}
Dead-lettering dlq.Publish auto-called on final failure → EBIND_DLQ stream
Graceful shutdown worker.Run(ctx) drains in-flight on ctx-cancel (configurable grace)
HA 3-node embedded cluster with Replicas: 3 streams & KV
Stranded DAG recovery Scheduler sweep on LeaderElector false→true edge

Package layout

task/         envelope + registry + RetryPolicy
client/       Enqueue + Future + Await[T]
worker/       consume loop + middleware + StepHook
stream/       JetStream stream setup
dlq/          dead-letter publishing
embed/        in-process NATS server (single + cluster)
workflow/     DAG builder + scheduler + KV-backed state
internal/testutil/  harness for integration tests
cmd/demo/     single-process end-to-end demo

Development

make help          # list all targets
make build         # compile everything
make test          # run all tests with -race
make test-count    # 3× runs, catch flakes
make lint          # golangci-lint
make cover         # HTML coverage report
make demo          # run cmd/demo end-to-end

Status

  • v1 (task queue): done — retries, DLQ, middleware, embedded HA cluster.
  • v2 (DAG workflows): done — optional steps, retry policies, dynamic DAGs.
  • v2.1 (stranded recovery): done — leader-acquisition sweep.
  • v2+ (future): phantom-Running detection, cross-DAG signals, saga/compensation.

License

MIT

Directories

Path Synopsis
Package client is the producer side: Enqueue tasks and await responses.
Package client is the producer side: Enqueue tasks and await responses.
cmd
demo command
cmd/demo: single-process end-to-end round-trip.
cmd/demo: single-process end-to-end round-trip.
ebctl command
Command ebctl is the operator CLI for ebind: inspect and manipulate DAGs, streams, consumers, and the DLQ of a running NATS JetStream deployment.
Command ebctl is the operator CLI for ebind: inspect and manipulate DAGs, streams, consumers, and the DLQ of a running NATS JetStream deployment.
ebctl-playground command
Command ebctl-playground runs a long-lived ebind deployment that exercises every code path ebctl cares about: parallel DAG branches, retries, DLQ entries, cascade-skips, cancellations.
Command ebctl-playground runs a long-lived ebind deployment that exercises every code path ebctl cares about: parallel DAG branches, retries, DLQ entries, cascade-skips, cancellations.
ebctl/internal/cli
Package cli holds the shared CLI context, global flags, and wiring helpers used by every ebctl subcommand.
Package cli holds the shared CLI context, global flags, and wiring helpers used by every ebctl subcommand.
ebctl/internal/commands/dag
Package dag implements `ebctl dag ...` subcommands.
Package dag implements `ebctl dag ...` subcommands.
ebctl/internal/commands/dlq
Package dlq implements `ebctl dlq ...` subcommands.
Package dlq implements `ebctl dlq ...` subcommands.
ebctl/internal/commands/stream
Package stream implements `ebctl stream ...` and `ebctl consumer ...`.
Package stream implements `ebctl stream ...` and `ebctl consumer ...`.
ebctl/internal/commands/version
Package version prints the ebctl build metadata.
Package version prints the ebctl build metadata.
ebctl/internal/format
Package format is the tiny output abstraction used by ebctl commands.
Package format is the tiny output abstraction used by ebctl commands.
Package dlq publishes poison tasks (those that exhausted MaxDeliver or failed non-retryably) to the DLQ stream for operator review.
Package dlq publishes poison tasks (those that exhausted MaxDeliver or failed non-retryably) to the DLQ stream for operator review.
Package embed starts and supervises NATS JetStream servers in-process.
Package embed starts and supervises NATS JetStream servers in-process.
examples
01-basic command
Basic ebind usage: register one handler, enqueue a task, await the typed result.
Basic ebind usage: register one handler, enqueue a task, await the typed result.
02-retry-policy command
Retry policy: a task with a RetryPolicy attached to it.
Retry policy: a task with a RetryPolicy attached to it.
03-fire-and-forget command
Fire-and-forget: EnqueueAsync returns as soon as the task is published.
Fire-and-forget: EnqueueAsync returns as soon as the task is published.
04-cluster-ha command
HA cluster: start 3 NATS JetStream nodes in-process with replicated streams.
HA cluster: start 3 NATS JetStream nodes in-process with replicated streams.
05-middleware command
Custom middleware: a timing wrapper that logs every handler's duration.
Custom middleware: a timing wrapper that logs every handler's duration.
06-workflow-linear command
Linear DAG workflow: A → B → C. Each step's output feeds the next.
Linear DAG workflow: A → B → C. Each step's output feeds the next.
07-workflow-fanout command
Fan-out / fan-in: kick off two independent steps in parallel, then combine.
Fan-out / fan-in: kick off two independent steps in parallel, then combine.
08-workflow-optional command
Optional steps + RefOrDefault: a step is marked Optional so its failure doesn't fail the DAG.
Optional steps + RefOrDefault: a step is marked Optional so its failure doesn't fail the DAG.
09-workflow-dynamic command
Dynamic DAG: a handler inspects its input and adds more steps to the currently-running DAG via workflow.FromContext(ctx).Step(...).
Dynamic DAG: a handler inspects its input and adds more steps to the currently-running DAG via workflow.FromContext(ctx).Step(...).
10-workflow-temporal-deps command
Time-only dependencies: After() and AfterAny().
Time-only dependencies: After() and AfterAny().
11-workflow-resume command
Cross-instance resume: a second instance picks up waiting on a DAG that a first instance submitted and then "crashed" (the example simulates this by exiting).
Cross-instance resume: a second instance picks up waiting on a DAG that a first instance submitted and then "crashed" (the example simulates this by exiting).
internal
testutil
Package testutil provides test helpers: embedded NATS + streams + worker.
Package testutil provides test helpers: embedded NATS + streams + worker.
Package stream creates and manages ebind's JetStream streams.
Package stream creates and manages ebind's JetStream streams.
Package worker consumes tasks from the TASKS stream, dispatches via the registry, and publishes responses to each task's ReplyTo subject.
Package worker consumes tasks from the TASKS stream, dispatches via the registry, and publishes responses to each task's ReplyTo subject.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL