workflow

package
v0.2.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 29, 2025 License: MIT Imports: 3 Imported by: 0

README

Workflow

A minimal Go library for creating and executing workflows with conditional task transitions.

Features

  • Minimal API: Just 1 interface and 3 methods to learn
  • Conditional Branching: Route workflow execution based on task results
  • Context Support: Built-in cancellation and timeout support
  • State Sharing: Pass data between tasks using a shared state map
  • Loop Support: Create workflows with cycles and retry logic
  • Zero Dependencies: Uses only Go standard library

Installation

go get github.com/aktagon/llmkit/workflow

Quick Start

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/aktagon/llmkit/workflow"
)

// Define a task
type GreetTask struct{}

func (t *GreetTask) Execute(ctx context.Context, state map[string]interface{}) (string, error) {
    name := state["name"].(string)
    state["message"] = fmt.Sprintf("Hello, %s!", name)
    return "default", nil
}

func main() {
    // Create workflow
    greet := &GreetTask{}
    wf := workflow.New(greet)

    // Execute workflow
    state := map[string]interface{}{"name": "World"}
    err := wf.Run(context.Background(), state)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println(state["message"]) // Output: Hello, World!
}

API Reference

Task Interface
type Task interface {
    Execute(ctx context.Context, state map[string]interface{}) (string, error)
}

The Task interface represents a single unit of work. The Execute method:

  • Receives a context.Context for cancellation and timeouts
  • Receives a state map for reading and writing shared data
  • Returns an action string that determines the next task
  • Returns an error if the task fails
Workflow Type
type Workflow struct {
    // private fields
}

The Workflow type orchestrates task execution with conditional transitions.

Constructor
func New(startTask Task) *Workflow

Creates a new workflow starting with the given task.

Methods
func (w *Workflow) On(from Task, action string, to Task) *Workflow

Adds a transition from one task to another based on the action returned by the source task. This method is chainable.

func (w *Workflow) Run(ctx context.Context, state map[string]interface{}) error

Executes the workflow starting from the start task. Continues until no more transitions are found or an error occurs.

Examples

Sequential Workflow
type Task1 struct{}
func (t *Task1) Execute(ctx context.Context, state map[string]interface{}) (string, error) {
    state["step"] = 1
    return "next", nil
}

type Task2 struct{}
func (t *Task2) Execute(ctx context.Context, state map[string]interface{}) (string, error) {
    state["step"] = 2
    return "default", nil
}

func main() {
    task1 := &Task1{}
    task2 := &Task2{}

    wf := workflow.New(task1).On(task1, "next", task2)

    state := make(map[string]interface{})
    err := wf.Run(context.Background(), state)
    // state["step"] == 2
}
Conditional Branching
type DecisionTask struct{}
func (t *DecisionTask) Execute(ctx context.Context, state map[string]interface{}) (string, error) {
    score := state["score"].(int)
    if score >= 80 {
        return "approved", nil
    }
    return "rejected", nil
}

type ApprovalTask struct{}
func (t *ApprovalTask) Execute(ctx context.Context, state map[string]interface{}) (string, error) {
    state["status"] = "approved"
    return "default", nil
}

type RejectionTask struct{}
func (t *RejectionTask) Execute(ctx context.Context, state map[string]interface{}) (string, error) {
    state["status"] = "rejected"
    return "default", nil
}

func main() {
    decision := &DecisionTask{}
    approval := &ApprovalTask{}
    rejection := &RejectionTask{}

    wf := workflow.New(decision).
        On(decision, "approved", approval).
        On(decision, "rejected", rejection)

    state := map[string]interface{}{"score": 85}
    err := wf.Run(context.Background(), state)
    // state["status"] == "approved"
}
Loop with Retry Logic
type ProcessTask struct{}
func (t *ProcessTask) Execute(ctx context.Context, state map[string]interface{}) (string, error) {
    attempts := state["attempts"].(int)
    attempts++
    state["attempts"] = attempts

    // Simulate success after 3 attempts
    if attempts >= 3 {
        state["result"] = "success"
        return "complete", nil
    }

    return "retry", nil
}

type RetryTask struct{}
func (t *RetryTask) Execute(ctx context.Context, state map[string]interface{}) (string, error) {
    // Add delay logic here if needed
    time.Sleep(100 * time.Millisecond)
    return "continue", nil
}

type CompleteTask struct{}
func (t *CompleteTask) Execute(ctx context.Context, state map[string]interface{}) (string, error) {
    state["status"] = "completed"
    return "default", nil
}

func main() {
    process := &ProcessTask{}
    retry := &RetryTask{}
    complete := &CompleteTask{}

    wf := workflow.New(process).
        On(process, "retry", retry).
        On(process, "complete", complete).
        On(retry, "continue", process) // Loop back

    state := map[string]interface{}{"attempts": 0}
    err := wf.Run(context.Background(), state)
    // state["result"] == "success"
}
Context Cancellation
func main() {
    // Create context with timeout
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    wf := workflow.New(longRunningTask)

    state := make(map[string]interface{})
    err := wf.Run(ctx, state)
    if err == context.DeadlineExceeded {
        fmt.Println("Workflow timed out")
    }
}

Patterns and Best Practices

1. State Management

Use descriptive keys and consistent types in the state map:

func (t *Task) Execute(ctx context.Context, state map[string]interface{}) (string, error) {
    // Good: descriptive keys
    userID := state["user_id"].(string)
    orderTotal := state["order_total"].(float64)

    // Set results with clear naming
    state["payment_status"] = "completed"
    state["transaction_id"] = "txn_12345"

    return "success", nil
}
2. Action Naming

Use clear, descriptive action names:

// Good: descriptive actions
return "payment_approved", nil
return "inventory_insufficient", nil
return "user_validation_failed", nil

// Avoid: generic actions
return "ok", nil
return "error", nil
return "next", nil
3. Error Handling

Handle errors explicitly and provide context:

func (t *PaymentTask) Execute(ctx context.Context, state map[string]interface{}) (string, error) {
    amount := state["amount"].(float64)

    if amount <= 0 {
        return "", fmt.Errorf("invalid amount: %f", amount)
    }

    // Process payment
    err := processPayment(amount)
    if err != nil {
        return "", fmt.Errorf("payment processing failed: %w", err)
    }

    return "payment_success", nil
}
4. Context Awareness

Always check context cancellation in long-running tasks:

func (t *LongTask) Execute(ctx context.Context, state map[string]interface{}) (string, error) {
    for i := 0; i < 1000; i++ {
        // Check for cancellation
        select {
        case <-ctx.Done():
            return "", ctx.Err()
        default:
        }

        // Do work
        processItem(i)
    }

    return "complete", nil
}
5. Task Composition

Keep tasks focused on a single responsibility:

// Good: focused tasks
type ValidateOrderTask struct{}
type CalculateTaxTask struct{}
type ProcessPaymentTask struct{}

// Avoid: monolithic tasks
type HandleOrderTask struct{} // Does too much
6. Testing

Test tasks in isolation and workflows end-to-end:

func TestValidateOrderTask(t *testing.T) {
    task := &ValidateOrderTask{}

    state := map[string]interface{}{
        "order_id": "12345",
        "items":    []string{"item1", "item2"},
    }

    action, err := task.Execute(context.Background(), state)

    assert.NoError(t, err)
    assert.Equal(t, "valid", action)
    assert.True(t, state["order_valid"].(bool))
}

Common Use Cases

Document Processing Pipeline
validate := &ValidateDocumentTask{}
extract := &ExtractDataTask{}
transform := &TransformDataTask{}
store := &StoreDataTask{}
errorHandler := &ErrorHandlerTask{}

wf := workflow.New(validate).
    On(validate, "valid", extract).
    On(validate, "invalid", errorHandler).
    On(extract, "success", transform).
    On(extract, "failed", errorHandler).
    On(transform, "success", store).
    On(transform, "failed", errorHandler)
User Registration Flow
validateEmail := &ValidateEmailTask{}
checkExists := &CheckUserExistsTask{}
createUser := &CreateUserTask{}
sendWelcome := &SendWelcomeEmailTask{}
handleError := &HandleErrorTask{}

wf := workflow.New(validateEmail).
    On(validateEmail, "valid", checkExists).
    On(validateEmail, "invalid", handleError).
    On(checkExists, "new_user", createUser).
    On(checkExists, "exists", handleError).
    On(createUser, "created", sendWelcome)
Order Processing System
checkInventory := &CheckInventoryTask{}
processPayment := &ProcessPaymentTask{}
fulfillOrder := &FulfillOrderTask{}
sendConfirmation := &SendConfirmationTask{}
handleBackorder := &HandleBackorderTask{}
handlePaymentError := &HandlePaymentErrorTask{}

wf := workflow.New(checkInventory).
    On(checkInventory, "in_stock", processPayment).
    On(checkInventory, "out_of_stock", handleBackorder).
    On(processPayment, "payment_success", fulfillOrder).
    On(processPayment, "payment_failed", handlePaymentError).
    On(fulfillOrder, "fulfilled", sendConfirmation)

Performance Considerations

  • Minimal Overhead: Each transition has minimal overhead (~1μs)
  • Memory Efficient: No unnecessary allocations during execution
  • Concurrent Safe: Multiple workflows can run concurrently
  • Context Aware: Respects cancellation and timeouts

Limitations

  • No Parallel Execution: Tasks execute sequentially
  • No Built-in Persistence: State is not automatically persisted
  • No Built-in Retry Logic: Must be implemented in tasks
  • No Built-in Logging: Must be added to individual tasks

Extensions

For advanced features, consider these patterns:

Retry Logic
type RetryableTask struct {
    maxRetries int
    task       Task
}

func (rt *RetryableTask) Execute(ctx context.Context, state map[string]interface{}) (string, error) {
    var err error
    for i := 0; i < rt.maxRetries; i++ {
        action, err := rt.task.Execute(ctx, state)
        if err == nil {
            return action, nil
        }

        // Wait before retry
        time.Sleep(time.Duration(i+1) * time.Second)
    }
    return "", err
}
Logging
type LoggingTask struct {
    task   Task
    logger *log.Logger
}

func (lt *LoggingTask) Execute(ctx context.Context, state map[string]interface{}) (string, error) {
    lt.logger.Printf("Executing task: %T", lt.task)

    start := time.Now()
    action, err := lt.task.Execute(ctx, state)
    duration := time.Since(start)

    if err != nil {
        lt.logger.Printf("Task failed after %v: %v", duration, err)
    } else {
        lt.logger.Printf("Task completed in %v, action: %s", duration, action)
    }

    return action, err
}

License

This package is part of the llmkit project and follows the same license terms.

Documentation

Overview

Package workflow provides a minimal API for creating and executing workflows consisting of tasks with conditional transitions.

A workflow is composed of tasks that implement the Task interface. Each task can read and modify shared state, and returns an action string that determines the next task to execute.

Basic usage:

type MyTask struct{}

func (t *MyTask) Execute(ctx context.Context, state map[string]interface{}) (string, error) {
	// Perform work
	state["result"] = "completed"
	return "success", nil
}

func main() {
	task := &MyTask{}
	wf := workflow.New(task)

	state := make(map[string]interface{})
	err := wf.Run(context.Background(), state)
	if err != nil {
		log.Fatal(err)
	}
}

For more examples and patterns, see the examples_test.go file and the README.md.

Package workflow provides a minimal API for creating and executing workflows consisting of tasks with conditional transitions.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Task

type Task interface {
	Execute(ctx context.Context, state map[string]interface{}) (string, error)
}

Task represents a single unit of work in a workflow. The Execute method receives a context for cancellation and a state map for data sharing. It returns an action string that determines the next task, and an error if the task fails.

type Workflow

type Workflow struct {
	// contains filtered or unexported fields
}

Workflow orchestrates tasks with conditional transitions based on actions returned by tasks.

func New

func New(startTask Task) *Workflow

New creates a new workflow starting with the given task.

func (*Workflow) On

func (w *Workflow) On(from Task, action string, to Task) *Workflow

On adds a transition from one task to another based on the action returned by the from task. This method is chainable for convenience.

func (*Workflow) Run

func (w *Workflow) Run(ctx context.Context, state map[string]interface{}) error

Run executes the workflow starting from the start task. It continues executing tasks based on the actions they return until no more transitions are found. The workflow can be cancelled using the context.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL