wasman

package module
v0.0.23 Latest
Published: Jun 10, 2026 License: MIT Imports: 29 Imported by: 0

README

Wasman: Durable WebAssembly (WASM) Execution Engine

A robust, highly reusable, and lightweight Durable Execution Engine built on Go, WebAssembly (WASM), and the wazero runtime. It provides fault-tolerant, stateful execution of custom business logic inside a secure sandbox, with automatic memory snapshotting, failure recovery, and memory-efficient streaming, and it is completely free of CGO and glibc dependencies.


The Durable Execution Philosophy

Modern distributed architectures often require executing long-running or multi-step business logic (such as workflows, integrations, and orchestrations) that must survive infrastructure failures. Traditional approaches rely on complex database state machines or heavy external orchestrators.

Wasman addresses this by leveraging WebAssembly's sandboxed linear memory:

  1. Host-Guest Isolation: The guest business logic is compiled into a .wasm module (via TinyGo/Go) and executed inside the pure-Go wazero runtime.
  2. Stateless Host, Stateful Storage: The execution host runs the virtual machine sandboxes. It remains stateless. All state (linear memory snapshots, execution logs) is persisted in S3 or local file snapshot stores.
  3. Black Box API: Developers using the platform interact only with high-level client APIs (generated via Protobuf/gRPC or client SDKs). The underlying complexity of WebAssembly, snapshotting, and transaction control is entirely hidden.

       [ Client / API Request ] (StartProcess / CompleteTask)
                 │
                 ▼
       ┌──────────────────┐
       │  WorkflowEngine  │ (Host Orchestrator)
       └─────────┬────────┘
                 │
      ┌──────────┴──────────┐
      ▼                     ▼
┌───────────┐         ┌───────────┐
│ bpmn_vm   │ (WASM)  │  worker   │ (WASM Business Logic)
│Interpreter│         │  Executor │
└─────┬─────┘         └─────┬─────┘
      │                     │
      └──────────┬──────────┘
                 │ (State & Memory Checkpoints)
                 ▼
       ┌──────────────────┐
       │  Snapshot Store  │ (Gzip-compressed Snapshots & Deltas)
       └─────────┬────────┘
                 │
        ┌────────┴────────┐
        ▼                 ▼
   [ S3 Storage ]   [ File Storage ]

Technical Features & Architectural Design

1. Strict Storage Compression

Checkpointing large WebAssembly modules generates snapshots of their linear memory, which is always sized in whole 64KB pages. To prevent S3/disk space bloat under high throughput:

  • Gzip Compression: Snapshots, page deltas, and oplogs are transparently compressed using the standard gzip format.
  • Strict Format Enforcement: All reads enforce the presence of gzip compression. Raw uncompressed snapshots are not supported, ensuring consistent storage compression benefits across all process states.
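
The strict-read rule above can be illustrated with a minimal sketch built only on the Go standard library. It is not taken from compress.go: the names compress, decompressStrict, and errNotGzip are hypothetical, and the check shown (rejecting payloads that lack the gzip magic bytes 0x1f 0x8b) is only one plausible way to enforce the format.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"errors"
	"fmt"
	"io"
)

// errNotGzip is a hypothetical sentinel for payloads that are not gzip streams.
var errNotGzip = errors.New("snapshot is not gzip-compressed")

// compress wraps a raw snapshot in the standard gzip format.
func compress(raw []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(raw); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decompressStrict refuses any payload that does not start with the gzip
// magic bytes, so raw snapshots are never silently accepted.
func decompressStrict(stored []byte) ([]byte, error) {
	if len(stored) < 2 || stored[0] != 0x1f || stored[1] != 0x8b {
		return nil, errNotGzip
	}
	zr, err := gzip.NewReader(bytes.NewReader(stored))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	page := make([]byte, 64*1024) // one zeroed 64KB page
	packed, err := compress(page)
	if err != nil {
		panic(err)
	}
	restored, err := decompressStrict(packed)
	fmt.Println("compressed bytes:", len(packed), "restored bytes:", len(restored), "err:", err)

	_, err = decompressStrict(page) // raw input is rejected
	fmt.Println("raw read:", err)
}
```

Failing loudly on a raw payload keeps a mixed store from being misread as valid memory.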

2. $O(1)$ RAM Stream-first I/O

For high-performance data processing (e.g., streaming files, large JSON/CSV payloads):

  • Data is transferred directly to/from WASM linear memory in chunks using stream buffers.
  • This guarantees constant memory footprint ($O(1)$ RAM) regardless of payload size, avoiding heap exhaustion and high GC pause times.
  • All communications are executed fully in-memory via user-provided download/upload stream handlers, entirely avoiding network loopbacks and TCP port exposures.
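
As a rough illustration of the constant-memory idea, the sketch below moves a payload through a single fixed-size buffer. It uses io.Reader and io.Writer from the standard library rather than Wasman's own handler types, and copyChunked and streamString are hypothetical helper names.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// copyChunked moves data from src to dst through one reusable buffer, so peak
// memory is bounded by chunkSize regardless of the payload size.
func copyChunked(dst io.Writer, src io.Reader, chunkSize int) (int64, error) {
	if chunkSize <= 0 {
		return 0, errors.New("chunkSize must be positive")
	}
	buf := make([]byte, chunkSize)
	var total int64
	for {
		n, rerr := src.Read(buf)
		if n > 0 {
			if _, werr := dst.Write(buf[:n]); werr != nil {
				return total, werr
			}
			total += int64(n)
		}
		if rerr == io.EOF {
			return total, nil
		}
		if rerr != nil {
			return total, rerr
		}
	}
}

// streamString is a demo helper that streams s through copyChunked and
// returns what arrived on the other side.
func streamString(s string, chunkSize int) (string, int64, error) {
	var out strings.Builder
	n, err := copyChunked(&out, strings.NewReader(s), chunkSize)
	return out.String(), n, err
}

func main() {
	payload := strings.Repeat("id,name,amount\n", 1000)
	_, n, err := streamString(payload, 4096)
	fmt.Println("bytes streamed:", n, "err:", err)
}
```

In a real host the destination would be a window of guest linear memory rather than a strings.Builder. The bounded buffer is what stays the same.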

3. Page-Level Delta Snapshots

Instead of writing a full multi-megabyte memory snapshot on every single checkpoint:

  • Hashing: Wasman uses FNV-64a to hash individual 64KB memory pages.
  • Deltas: On subsequent checkpoints, it only writes pages that have actually been modified (dirty pages), drastically reducing I/O latency.
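
The hashing and delta steps can be sketched with hash/fnv from the standard library. This is an illustrative reconstruction, not the engine's code: hashPages and dirtyPages are hypothetical names, and the map[int][]byte result is chosen only because it matches the shape of the SaveDeltas parameter in the SnapshotStore interface.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

const pageSize = 64 * 1024 // WebAssembly linear memory page size

// hashPages returns one FNV-64a hash per 64KB page of mem.
func hashPages(mem []byte) []uint64 {
	hashes := make([]uint64, 0, (len(mem)+pageSize-1)/pageSize)
	for off := 0; off < len(mem); off += pageSize {
		end := off + pageSize
		if end > len(mem) {
			end = len(mem)
		}
		h := fnv.New64a()
		h.Write(mem[off:end])
		hashes = append(hashes, h.Sum64())
	}
	return hashes
}

// dirtyPages compares the current memory against the hashes recorded at the
// previous checkpoint and returns copies of the pages that changed, keyed by
// page index, together with the new hash list.
func dirtyPages(prev []uint64, mem []byte) (map[int][]byte, []uint64) {
	cur := hashPages(mem)
	deltas := make(map[int][]byte)
	for i, h := range cur {
		if i >= len(prev) || prev[i] != h {
			off := i * pageSize
			end := off + pageSize
			if end > len(mem) {
				end = len(mem)
			}
			deltas[i] = append([]byte(nil), mem[off:end]...)
		}
	}
	return deltas, cur
}

func main() {
	mem := make([]byte, 4*pageSize)
	base := hashPages(mem) // hashes taken at the first (full) checkpoint

	mem[2*pageSize+10] = 0xFF // the guest touches a single byte in page 2
	deltas, _ := dirtyPages(base, mem)
	for idx, page := range deltas {
		fmt.Printf("dirty page %d (%d bytes)\n", idx, len(page))
	}
}
```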

4. Optimistic Concurrency Control (OCC)

In high-concurrency environments where multiple orchestrator nodes might receive step execution triggers for the same process instance:

  • S3 ETag Headers: The S3 storage client uses native HTTP If-Match headers.
  • State Integrity: If another node has updated the snapshot in the meantime, the write fails with an OCC conflict, preventing state corruption.
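
The conflict rule can be modelled in memory without any S3 dependency. In the sketch below a version counter stands in for the ETag, and put succeeds only when the caller's expected version still matches, which is the same contract an If-Match conditional write provides. versionedStore and errConflict are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errConflict is a hypothetical stand-in for an OCC conflict error.
var errConflict = errors.New("occ conflict: snapshot was modified by another writer")

// versionedStore keeps one snapshot plus a version token that plays the role
// of the object's ETag.
type versionedStore struct {
	mu      sync.Mutex
	version int
	data    []byte
}

// put writes data only if expected matches the stored version, then bumps it.
func (s *versionedStore) put(expected int, data []byte) (int, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.version != expected {
		return s.version, errConflict
	}
	s.data = append([]byte(nil), data...)
	s.version++
	return s.version, nil
}

func main() {
	store := &versionedStore{}

	// Two orchestrator nodes read the same version before writing.
	seenByA, seenByB := store.version, store.version

	v, err := store.put(seenByA, []byte("checkpoint from node A"))
	fmt.Println("node A:", v, err)

	v, err = store.put(seenByB, []byte("checkpoint from node B"))
	fmt.Println("node B:", v, err)
}
```

The losing node would typically reload the latest metadata and decide whether its step is still needed, rather than retrying the write blindly.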

Defeated Corner Cases (Failure & Crash Recovery)

Wasman guarantees durable execution by checkpointing and restoring state across node crashes:

Scenario: Server Crash During Execution
  1. Before Step: The VM starts executing a process. It hits a checkpoint (e.g., before an external API call or a User Task wait state).
  2. Checkpointing:
    • The engine halts execution.
    • It captures the current state, writing a Full Snapshot or Delta Snapshot to S3.
    • It logs the expected step transition.
  3. Crash: The host server crashes (e.g., hardware failure, OOM, or manual redeployment).
  4. Resumption:
    • Another node receives the request to resume.
    • It reads the metadata, loads the compiled WASM binary, and pulls the compressed snapshot.
    • It restores the linear memory of the WASM VM to the exact page-level state of the last checkpoint.
    • It replays the execution logs (Oplog) to restore transient state and resumes execution seamlessly.
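
The memory-restoration step of the scenario above can be sketched as a full snapshot overlaid with page deltas. This is a simplified model under stated assumptions: restore is a hypothetical helper, deltas are assumed to be keyed by page index (as in LoadDeltas), and memory that grew after the full snapshot is zero-filled before the pages are applied.

```go
package main

import "fmt"

const pageSize = 64 * 1024 // WebAssembly linear memory page size

// restore rebuilds linear memory from a full snapshot plus page deltas keyed
// by page index. The input snapshot is copied, never mutated.
func restore(full []byte, deltas map[int][]byte) ([]byte, error) {
	mem := append([]byte(nil), full...)
	for idx, page := range deltas {
		if idx < 0 || len(page) > pageSize {
			return nil, fmt.Errorf("invalid delta for page %d (%d bytes)", idx, len(page))
		}
		end := idx*pageSize + len(page)
		if end > len(mem) {
			// Memory grew after the full snapshot was taken: extend with zeros.
			mem = append(mem, make([]byte, end-len(mem))...)
		}
		copy(mem[idx*pageSize:end], page)
	}
	return mem, nil
}

func main() {
	full := make([]byte, 2*pageSize) // snapshot from the first checkpoint

	changed := make([]byte, pageSize)
	changed[0] = 42 // page 1 was modified before the crash

	mem, err := restore(full, map[int][]byte{1: changed})
	if err != nil {
		panic(err)
	}
	fmt.Println("restored bytes:", len(mem), "first byte of page 1:", mem[pageSize])
}
```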

Directory Structure

  • wasman.go: WASM compilation, runtime setup, and engine execution loops.
  • compress.go: Transparent Gzip compression utilities.
  • fs_store.go: Local file-system snapshot store (gzip-compressed snapshots).
  • s3_store.go: S3-compatible object snapshot store with OCC.
  • types.go: Common structures, interfaces, configurations, and error mappings.
  • examples/:
    • process-csv/: High-throughput CSV mapping with simulated crash recovery and $O(1)$ RAM usage.
    • camunda/: Integration with Camunda 7 External Tasks.
    • temporal/: CRM/Math activities in a simulated Temporal environment.
    • gotenberg-telegram/: Streaming PDF generation bot integration.
    • s3-store/: Direct S3/MinIO snapshotting baseline demonstration.
    • in-memory-channel/: Purely in-memory host-guest stream data exchange bypassing TCP loopbacks entirely.
    • safe-task/: Execution of sandboxed tasks utilizing the safe, high-level RunTask runner utility.
    • wasm-inspector/: Low-level WebAssembly inspect utility executing guest WASM binaries under customized WASI settings.

Getting Started

Running Tests

To run unit and integration tests for the core engine:

go test -v .

Running the CSV Crash Demonstration

The process-csv example demonstrates a complete crash-and-restore cycle:

  1. Compile the WASM worker:
    make build-worker
    
  2. Run the CSV pipeline:
    make run-csv-example
    

This will:

  • Start a mock HTTP server.
  • Initiate execution of the CSV pipeline.
  • Simulate a host crash on the first checkpoint.
  • Verify the compressed snapshot is written to disk.
  • Restore the memory from the snapshot and complete the execution successfully.

API Usage Example

package main

import (
	"fmt"

	"github.com/nativebpm/connectors/wasman"
)

func main() {
	// 1. Initialize the file snapshot store. Dir is the only exported field
	// listed for FileSnapshotStore in the type documentation.
	store := &wasman.FileSnapshotStore{
		Dir: "snapshots",
	}

	// 2. Define stream handlers
	downloadHandler := func() ([]byte, error) {
		return []byte("my input data stream"), nil
	}
	uploadHandler := func(payload []byte) error {
		fmt.Printf("Received output payload: %s\n", string(payload))
		return nil
	}

	// 3. Execute session using the high-level Fluent Runner API.
	// If a snapshot exists under this session ID, memory is restored automatically.
	crashed, err := wasman.NewRunner().
		WithWasmPath("worker.wasm").
		WithStore(store).
		WithSessionID("my-session-id").
		WithEntrypoint("run").
		WithDownloadHandler(downloadHandler).
		WithUploadHandler(uploadHandler).
		Run()

	if err != nil {
		if crashed {
			fmt.Println("Execution suspended at checkpoint.")
		} else {
			fmt.Printf("Execution failed: %v\n", err)
		}
	} else {
		fmt.Println("Execution completed successfully!")
	}
}

Performance & Benchmarks

Detailed CPU and memory benchmark profiles, including the optimized warm-resume JIT execution path (~38 µs), are available in the Benchmarks & Profiling document.

Documentation

Constants

This section is empty.

Variables

var (
	ErrWasmVersionMismatch = fmt.Errorf("wasm module hash mismatch")
	ErrConcurrentExecution = fmt.Errorf("concurrent execution detected (OCC fencing)")
)

Functions

func GetTenantID added in v0.0.23

func GetTenantID(ctx context.Context) string

GetTenantID extracts the tenant ID from the context, defaulting to "default".

func WithApiHandler added in v0.0.18

func WithApiHandler(ctx context.Context, h func(apiName string, request []byte) ([]byte, error)) context.Context

WithApiHandler binds an in-memory host_call_api handler to context.

func WithDownloadHandler added in v0.0.18

func WithDownloadHandler(ctx context.Context, h func() ([]byte, error)) context.Context

WithDownloadHandler binds an in-memory stream download handler to context.

func WithKeepAlive added in v0.0.15

func WithKeepAlive(ctx context.Context) context.Context

func WithSession added in v0.0.6

func WithSession(ctx context.Context, s *Session) context.Context

func WithTenantID added in v0.0.23

func WithTenantID(ctx context.Context, tenantID string) context.Context

WithTenantID binds the tenant ID to the context.
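
For readers unfamiliar with the pattern, the pair WithTenantID / GetTenantID most likely follows the usual context-value idiom. The sketch below shows that idiom with hypothetical lowercase names (withTenant, tenantFrom, tenantKey). Treating an empty tenant ID as "default" is an assumption made here, not something the documentation states.

```go
package main

import (
	"context"
	"fmt"
)

// tenantKey is an unexported key type, which avoids collisions with context
// values set by other packages.
type tenantKey struct{}

// withTenant returns a child context carrying the tenant ID.
func withTenant(ctx context.Context, tenantID string) context.Context {
	return context.WithValue(ctx, tenantKey{}, tenantID)
}

// tenantFrom reads the tenant ID, falling back to "default" when none is set.
// Assumption: an empty string is also treated as "not set".
func tenantFrom(ctx context.Context) string {
	if id, ok := ctx.Value(tenantKey{}).(string); ok && id != "" {
		return id
	}
	return "default"
}

// demo returns the tenant seen without and with an explicit binding.
func demo() (string, string) {
	base := context.Background()
	return tenantFrom(base), tenantFrom(withTenant(base, "acme"))
}

func main() {
	unbound, bound := demo()
	fmt.Println("unbound:", unbound, "bound:", bound)
}
```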

func WithUploadHandler added in v0.0.18

func WithUploadHandler(ctx context.Context, h func(payload []byte) error) context.Context

WithUploadHandler binds an in-memory stream upload handler to context.

Types

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine coordinates execution, compilation, and snapshotting of WASM modules.

func NewEngine

func NewEngine(wasmPath string, store SnapshotStore, opts ...EngineOption) (*Engine, error)

NewEngine creates a new reusable WASM Durable Execution Engine from a WASM module file path.

func NewEngineWithBytes added in v0.0.7

func NewEngineWithBytes(wasmBytes []byte, store SnapshotStore, opts ...EngineOption) (*Engine, error)

NewEngineWithBytes creates a new reusable WASM Durable Execution Engine directly from in-memory WASM bytes.

func (*Engine) CloseInstance added in v0.0.15

func (e *Engine) CloseInstance(ctx context.Context, instanceID string) error

CloseInstance forcefully closes any active in-memory module instance for the given ID.

func (*Engine) Execute

func (e *Engine) Execute(ctx context.Context, instanceID string, entrypoint string, serverAddr string, shouldCrash bool) (bool, error)

Execute runs the WASM instance with a given entrypoint and session context. If it finds a saved snapshot, it automatically restores the linear memory.

func (*Engine) ExecuteWithArgs added in v0.0.11

func (e *Engine) ExecuteWithArgs(ctx context.Context, instanceID string, entrypoint string, serverAddr string, shouldCrash bool, params ...uint64) (bool, error)

ExecuteWithArgs runs the WASM instance with a given entrypoint, session context, and variadic parameters.

func (*Engine) RunBPMN added in v0.0.11

func (e *Engine) RunBPMN(
	ctx context.Context,
	instanceID string,
	entrypoint string,
	graphBytes []byte,
	inputBytes []byte,
	completedTaskID string,
	serverAddr string,
) (bool, []byte, error)

RunBPMN executes the BPMN VM core with the given inputs. It handles instantiation, memory restoration, input writing, execution, and output reading. It returns: crashed (bool), outputBytes ([]byte), error.

func (*Engine) Store added in v0.0.3

func (e *Engine) Store() SnapshotStore

Store returns the SnapshotStore associated with the Engine.

type EngineOption

type EngineOption func(*Engine)

EngineOption defines a configuration option for the Engine.

type FileSnapshotStore

type FileSnapshotStore struct {
	Dir string
	// contains filtered or unexported fields
}

FileSnapshotStore implements SnapshotStore using the local file system.

func (*FileSnapshotStore) Delete

func (f *FileSnapshotStore) Delete(ctx context.Context, id string) error

func (*FileSnapshotStore) Load

func (f *FileSnapshotStore) Load(ctx context.Context, id string) ([]byte, error)

Load reads a full memory snapshot from a file.

func (*FileSnapshotStore) LoadActiveIndex added in v0.0.3

func (f *FileSnapshotStore) LoadActiveIndex(ctx context.Context) ([]byte, error)

LoadActiveIndex loads the local active index file.

func (*FileSnapshotStore) LoadDeltas

func (f *FileSnapshotStore) LoadDeltas(ctx context.Context, id string) (map[int][]byte, error)

func (*FileSnapshotStore) LoadMetadata

func (f *FileSnapshotStore) LoadMetadata(ctx context.Context, id string) (*InstanceMeta, error)

func (*FileSnapshotStore) LoadOplog

func (f *FileSnapshotStore) LoadOplog(ctx context.Context, id string) ([]OplogEntry, error)

func (*FileSnapshotStore) LoadWasm

func (f *FileSnapshotStore) LoadWasm(ctx context.Context, hash string) ([]byte, error)

func (*FileSnapshotStore) Save

func (f *FileSnapshotStore) Save(ctx context.Context, id string, snapshot []byte) error

Save writes a full memory snapshot to a file.

func (*FileSnapshotStore) SaveDeltas

func (f *FileSnapshotStore) SaveDeltas(ctx context.Context, id string, deltas map[int][]byte) error

func (*FileSnapshotStore) SaveMetadata

func (f *FileSnapshotStore) SaveMetadata(ctx context.Context, meta *InstanceMeta) (bool, error)

func (*FileSnapshotStore) SaveOplog

func (f *FileSnapshotStore) SaveOplog(ctx context.Context, id string, callIndex int, apiName string, request []byte, response []byte) error

func (*FileSnapshotStore) SaveWasm

func (f *FileSnapshotStore) SaveWasm(ctx context.Context, hash string, wasmBytes []byte) error

func (*FileSnapshotStore) TruncateDeltas

func (f *FileSnapshotStore) TruncateDeltas(ctx context.Context, id string) error

func (*FileSnapshotStore) TruncateOplog

func (f *FileSnapshotStore) TruncateOplog(ctx context.Context, id string, beforeCallIndex int) error

func (*FileSnapshotStore) UpdateActiveIndex added in v0.0.3

func (f *FileSnapshotStore) UpdateActiveIndex(ctx context.Context, id string, info []byte, completed bool) error

UpdateActiveIndex updates the local index file.

type InstanceMeta

type InstanceMeta struct {
	InstanceID     string `json:"instance_id"`
	WasmHash       string `json:"wasm_hash"`
	Version        int    `json:"version"`
	ETag           string `json:"etag,omitempty"`
	ProcessID      string `json:"process_id,omitempty"`
	DefinitionHash string `json:"definition_hash,omitempty"`
	BusinessKey    string `json:"business_key,omitempty"`
	BpmnState      []byte `json:"bpmn_state,omitempty"`
	Completed      bool   `json:"completed,omitempty"`
}

InstanceMeta holds execution metadata for safety checks and OCC.

type MemorySnapshotStore added in v0.0.7

type MemorySnapshotStore struct {
	// contains filtered or unexported fields
}

MemorySnapshotStore implements SnapshotStore in memory.

func NewMemorySnapshotStore added in v0.0.7

func NewMemorySnapshotStore() *MemorySnapshotStore

NewMemorySnapshotStore creates a new MemorySnapshotStore.

func (*MemorySnapshotStore) Delete added in v0.0.7

func (s *MemorySnapshotStore) Delete(ctx context.Context, id string) error

func (*MemorySnapshotStore) Load added in v0.0.7

func (s *MemorySnapshotStore) Load(ctx context.Context, id string) ([]byte, error)

func (*MemorySnapshotStore) LoadActiveIndex added in v0.0.7

func (s *MemorySnapshotStore) LoadActiveIndex(ctx context.Context) ([]byte, error)

func (*MemorySnapshotStore) LoadDeltas added in v0.0.7

func (s *MemorySnapshotStore) LoadDeltas(ctx context.Context, id string) (map[int][]byte, error)

func (*MemorySnapshotStore) LoadMetadata added in v0.0.7

func (s *MemorySnapshotStore) LoadMetadata(ctx context.Context, id string) (*InstanceMeta, error)

func (*MemorySnapshotStore) LoadOplog added in v0.0.7

func (s *MemorySnapshotStore) LoadOplog(ctx context.Context, id string) ([]OplogEntry, error)

func (*MemorySnapshotStore) LoadWasm added in v0.0.7

func (s *MemorySnapshotStore) LoadWasm(ctx context.Context, hash string) ([]byte, error)

func (*MemorySnapshotStore) Save added in v0.0.7

func (s *MemorySnapshotStore) Save(ctx context.Context, id string, snapshot []byte) error

func (*MemorySnapshotStore) SaveDeltas added in v0.0.7

func (s *MemorySnapshotStore) SaveDeltas(ctx context.Context, id string, deltas map[int][]byte) error

func (*MemorySnapshotStore) SaveMetadata added in v0.0.7

func (s *MemorySnapshotStore) SaveMetadata(ctx context.Context, meta *InstanceMeta) (bool, error)

func (*MemorySnapshotStore) SaveOplog added in v0.0.7

func (s *MemorySnapshotStore) SaveOplog(ctx context.Context, id string, callIndex int, apiName string, request []byte, response []byte) error

func (*MemorySnapshotStore) SaveWasm added in v0.0.7

func (s *MemorySnapshotStore) SaveWasm(ctx context.Context, hash string, wasmBytes []byte) error

func (*MemorySnapshotStore) TruncateDeltas added in v0.0.7

func (s *MemorySnapshotStore) TruncateDeltas(ctx context.Context, id string) error

func (*MemorySnapshotStore) TruncateOplog added in v0.0.7

func (s *MemorySnapshotStore) TruncateOplog(ctx context.Context, id string, beforeCallIndex int) error

func (*MemorySnapshotStore) UpdateActiveIndex added in v0.0.7

func (s *MemorySnapshotStore) UpdateActiveIndex(ctx context.Context, id string, info []byte, completed bool) error

type OplogEntry

type OplogEntry struct {
	CallIndex       int    `json:"call_index"`
	ApiName         string `json:"api_name"`
	RequestPayload  []byte `json:"request_payload"`
	ResponsePayload []byte `json:"response_payload"`
}

OplogEntry represents a single external call log.
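
To show why each external call is logged with its index and payloads, here is a minimal replay sketch. It assumes the common durable-execution approach in which, on resume, calls already present in the log are answered from the recorded response and only new calls reach the live handler. The entry and replayer types are hypothetical and merely mirror the field names of OplogEntry.

```go
package main

import "fmt"

// entry mirrors the fields of OplogEntry for illustration only.
type entry struct {
	CallIndex       int
	ApiName         string
	RequestPayload  []byte
	ResponsePayload []byte
}

// replayer answers calls from the log first and falls through to the live
// handler once the log is exhausted.
type replayer struct {
	log  []entry
	next int
	live func(apiName string, request []byte) ([]byte, error)
}

func (r *replayer) call(apiName string, request []byte) ([]byte, error) {
	if r.next < len(r.log) {
		e := r.log[r.next]
		if e.ApiName != apiName {
			return nil, fmt.Errorf("replay divergence at call %d: logged %q, got %q", r.next, e.ApiName, apiName)
		}
		r.next++
		return e.ResponsePayload, nil // recorded result, no side effect repeated
	}
	resp, err := r.live(apiName, request)
	if err != nil {
		return nil, err
	}
	r.log = append(r.log, entry{CallIndex: r.next, ApiName: apiName, RequestPayload: request, ResponsePayload: resp})
	r.next++
	return resp, nil
}

func main() {
	liveCalls := 0
	r := &replayer{
		// One call completed before the crash and was persisted.
		log: []entry{{CallIndex: 0, ApiName: "charge", ResponsePayload: []byte("receipt-1")}},
		live: func(apiName string, request []byte) ([]byte, error) {
			liveCalls++
			return []byte("live:" + apiName), nil
		},
	}
	first, _ := r.call("charge", nil) // replayed from the log
	second, _ := r.call("notify", nil) // executed for real
	fmt.Println(string(first), string(second), "live calls:", liveCalls)
}
```

The divergence check matters because a guest that issues a different call sequence on resume can no longer be matched against the recorded log.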

type PostgresSnapshotStore added in v0.0.22

type PostgresSnapshotStore struct {
	// contains filtered or unexported fields
}

PostgresSnapshotStore implements SnapshotStore using a PostgreSQL database.

func NewPostgresSnapshotStore added in v0.0.22

func NewPostgresSnapshotStore(db *sql.DB, masterKey string) (*PostgresSnapshotStore, error)

NewPostgresSnapshotStore initializes a new PostgreSQL-native SnapshotStore.

func (*PostgresSnapshotStore) Delete added in v0.0.22

func (s *PostgresSnapshotStore) Delete(ctx context.Context, id string) error

Delete removes all data associated with the instance from the database snapshot tables.

func (*PostgresSnapshotStore) Load added in v0.0.22

func (s *PostgresSnapshotStore) Load(ctx context.Context, id string) ([]byte, error)

Load reads a full memory snapshot from the database.

func (*PostgresSnapshotStore) LoadActiveIndex added in v0.0.22

func (s *PostgresSnapshotStore) LoadActiveIndex(ctx context.Context) ([]byte, error)

LoadActiveIndex compiles the active index list from the database.

func (*PostgresSnapshotStore) LoadDeltas added in v0.0.22

func (s *PostgresSnapshotStore) LoadDeltas(ctx context.Context, id string) (map[int][]byte, error)

LoadDeltas retrieves memory deltas from the database.

func (*PostgresSnapshotStore) LoadMetadata added in v0.0.22

func (s *PostgresSnapshotStore) LoadMetadata(ctx context.Context, id string) (*InstanceMeta, error)

LoadMetadata retrieves the instance metadata from the database.

func (*PostgresSnapshotStore) LoadOplog added in v0.0.22

func (s *PostgresSnapshotStore) LoadOplog(ctx context.Context, id string) ([]OplogEntry, error)

LoadOplog retrieves the oplog entries from the database.

func (*PostgresSnapshotStore) LoadWasm added in v0.0.22

func (s *PostgresSnapshotStore) LoadWasm(ctx context.Context, hash string) ([]byte, error)

LoadWasm loads a compiled WASM module binary from the database.

func (*PostgresSnapshotStore) Save added in v0.0.22

func (s *PostgresSnapshotStore) Save(ctx context.Context, id string, snapshot []byte) error

Save writes a full memory snapshot to the database.

func (*PostgresSnapshotStore) SaveDeltas added in v0.0.22

func (s *PostgresSnapshotStore) SaveDeltas(ctx context.Context, id string, deltas map[int][]byte) error

SaveDeltas saves memory deltas to the database.

func (*PostgresSnapshotStore) SaveMetadata added in v0.0.22

func (s *PostgresSnapshotStore) SaveMetadata(ctx context.Context, meta *InstanceMeta) (bool, error)

SaveMetadata saves metadata or atomically updates version via CAS.

func (*PostgresSnapshotStore) SaveOplog added in v0.0.22

func (s *PostgresSnapshotStore) SaveOplog(ctx context.Context, id string, callIndex int, apiName string, request []byte, response []byte) error

SaveOplog appends an API call to the database oplog table.

func (*PostgresSnapshotStore) SaveWasm added in v0.0.22

func (s *PostgresSnapshotStore) SaveWasm(ctx context.Context, hash string, wasmBytes []byte) error

SaveWasm saves a compiled WASM module binary to the database.

func (*PostgresSnapshotStore) TruncateDeltas added in v0.0.22

func (s *PostgresSnapshotStore) TruncateDeltas(ctx context.Context, id string) error

TruncateDeltas deletes memory deltas for the instance from the database.

func (*PostgresSnapshotStore) TruncateOplog added in v0.0.22

func (s *PostgresSnapshotStore) TruncateOplog(ctx context.Context, id string, beforeCallIndex int) error

TruncateOplog deletes oplog entries at or below the given call index.

func (*PostgresSnapshotStore) UpdateActiveIndex added in v0.0.22

func (s *PostgresSnapshotStore) UpdateActiveIndex(ctx context.Context, id string, info []byte, completed bool) error

UpdateActiveIndex updates the active instance status in the database.

type Runner added in v0.0.19

type Runner struct {
	// contains filtered or unexported fields
}

Runner provides a fully fluent API for setting up and executing a WASM module instance. It integrates memory/file snapshot store creation, engine loading, session configuration, and execution into a single chained workflow with sticky error tracking.
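
"Sticky error tracking" in a fluent API usually means that the first failure is remembered and every later step becomes a no-op, so the chain never needs intermediate error checks. The sketch below illustrates that pattern with a hypothetical builder type. It is not the Runner implementation and its validation rules are invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// builder is a hypothetical fluent configurator with a sticky error.
type builder struct {
	path       string
	entrypoint string
	err        error
}

func newBuilder() *builder { return &builder{} }

// withPath records the module path unless an earlier step already failed.
func (b *builder) withPath(p string) *builder {
	if b.err != nil {
		return b // sticky: keep the first error, skip this step
	}
	if p == "" {
		b.err = errors.New("wasm path must not be empty")
		return b
	}
	b.path = p
	return b
}

// withEntrypoint records the exported function name to call.
func (b *builder) withEntrypoint(name string) *builder {
	if b.err != nil {
		return b
	}
	if name == "" {
		b.err = errors.New("entrypoint must not be empty")
		return b
	}
	b.entrypoint = name
	return b
}

// run reports the first accumulated error instead of executing anything.
func (b *builder) run() error {
	if b.err != nil {
		return b.err
	}
	fmt.Printf("would run %s:%s\n", b.path, b.entrypoint)
	return nil
}

func main() {
	fmt.Println(newBuilder().withPath("worker.wasm").withEntrypoint("run").run())
	fmt.Println(newBuilder().withPath("").withEntrypoint("").run())
}
```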

func NewRunner added in v0.0.19

func NewRunner() *Runner

NewRunner creates a new Runner with default context.

func (*Runner) Error added in v0.0.19

func (r *Runner) Error() error

Error returns any sticky error accumulated during chaining.

func (*Runner) Run added in v0.0.19

func (r *Runner) Run() (crashed bool, err error)

Run compiles the WASM module (if needed), configures the session, runs the execution, and returns whether it crashed and any error.

func (*Runner) WithApiHandler added in v0.0.19

func (r *Runner) WithApiHandler(handler func(apiName string, request []byte) ([]byte, error)) *Runner

WithApiHandler registers an in-memory host API call handler.

func (*Runner) WithArgs added in v0.0.19

func (r *Runner) WithArgs(params ...uint64) *Runner

WithArgs configures parameters to pass to the WASM function.

func (*Runner) WithContext added in v0.0.19

func (r *Runner) WithContext(ctx context.Context) *Runner

WithContext overrides the default context.

func (*Runner) WithDownloadHandler added in v0.0.19

func (r *Runner) WithDownloadHandler(handler func() ([]byte, error)) *Runner

WithDownloadHandler registers an in-memory stream download handler.

func (*Runner) WithEntrypoint added in v0.0.19

func (r *Runner) WithEntrypoint(entrypoint string) *Runner

WithEntrypoint configures the function name to call in the WASM module.

func (*Runner) WithMemoryStore added in v0.0.19

func (r *Runner) WithMemoryStore() *Runner

WithMemoryStore initializes and uses an in-memory snapshot store.

func (*Runner) WithServer added in v0.0.19

func (r *Runner) WithServer(serverAddr string) *Runner

WithServer configures the server address for HTTP upload/download routing.

func (*Runner) WithSessionID added in v0.0.19

func (r *Runner) WithSessionID(instanceID string) *Runner

WithSessionID configures the instance/session ID.

func (*Runner) WithStore added in v0.0.19

func (r *Runner) WithStore(store SnapshotStore) *Runner

WithStore specifies the SnapshotStore to use.

func (*Runner) WithUploadHandler added in v0.0.19

func (r *Runner) WithUploadHandler(handler func(payload []byte) error) *Runner

WithUploadHandler registers an in-memory stream upload handler.

func (*Runner) WithWasmBytes added in v0.0.19

func (r *Runner) WithWasmBytes(wasmBytes []byte) *Runner

WithWasmBytes loads the WASM module directly from memory bytes.

func (*Runner) WithWasmPath added in v0.0.19

func (r *Runner) WithWasmPath(wasmPath string) *Runner

WithWasmPath specifies the file path to the WASM module.

type S3SnapshotStore

type S3SnapshotStore struct {
	Client *s3.Client
	// contains filtered or unexported fields
}

S3SnapshotStore implements SnapshotStore using an S3-compatible object store.

func NewS3SnapshotStore

func NewS3SnapshotStore(ctx context.Context, bucket string, opts ...func(*s3.Options)) (*S3SnapshotStore, error)

NewS3SnapshotStore initializes a new S3 snapshot store.

func (*S3SnapshotStore) Delete

func (s *S3SnapshotStore) Delete(ctx context.Context, id string) error

Delete removes all data associated with the instance from S3.

func (*S3SnapshotStore) Load

func (s *S3SnapshotStore) Load(ctx context.Context, id string) ([]byte, error)

Load reads a full memory snapshot from S3.

func (*S3SnapshotStore) LoadActiveIndex added in v0.0.3

func (s *S3SnapshotStore) LoadActiveIndex(ctx context.Context) ([]byte, error)

LoadActiveIndex loads the compiled active index list from S3.

func (*S3SnapshotStore) LoadDeltas

func (s *S3SnapshotStore) LoadDeltas(ctx context.Context, id string) (map[int][]byte, error)

LoadDeltas retrieves memory deltas from S3.

func (*S3SnapshotStore) LoadMetadata

func (s *S3SnapshotStore) LoadMetadata(ctx context.Context, id string) (*InstanceMeta, error)

LoadMetadata retrieves the instance metadata from S3.

func (*S3SnapshotStore) LoadOplog

func (s *S3SnapshotStore) LoadOplog(ctx context.Context, id string) ([]OplogEntry, error)

LoadOplog retrieves the oplog entries from S3.

func (*S3SnapshotStore) LoadWasm

func (s *S3SnapshotStore) LoadWasm(ctx context.Context, hash string) ([]byte, error)

LoadWasm loads a WASM module binary by its SHA256 hash.

func (*S3SnapshotStore) Save

func (s *S3SnapshotStore) Save(ctx context.Context, id string, snapshot []byte) error

Save writes a full memory snapshot to S3.

func (*S3SnapshotStore) SaveDeltas

func (s *S3SnapshotStore) SaveDeltas(ctx context.Context, id string, deltas map[int][]byte) error

SaveDeltas saves memory deltas to S3 by reading current, overlaying new ones and writing back.

func (*S3SnapshotStore) SaveMetadata

func (s *S3SnapshotStore) SaveMetadata(ctx context.Context, meta *InstanceMeta) (bool, error)

SaveMetadata saves metadata or atomically updates version via CAS using ETag.

func (*S3SnapshotStore) SaveOplog

func (s *S3SnapshotStore) SaveOplog(ctx context.Context, id string, callIndex int, apiName string, request []byte, response []byte) error

SaveOplog appends an API call payload to the oplog in S3.

func (*S3SnapshotStore) SaveWasm

func (s *S3SnapshotStore) SaveWasm(ctx context.Context, hash string, wasmBytes []byte) error

SaveWasm saves a WASM module binary by its SHA256 hash.
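
Keying module binaries by their SHA256 hash is a form of content addressing: identical bytes map to the same key, and a resumed instance can ask for exactly the build it was started with. The sketch below uses crypto/sha256 from the standard library. The hex encoding of the key, the in-memory registry type, and the helper names are all assumptions for illustration.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// wasmHash returns the hex-encoded SHA256 digest of a module binary.
// Assumption: the real store may encode or prefix the key differently.
func wasmHash(wasmBytes []byte) string {
	sum := sha256.Sum256(wasmBytes)
	return hex.EncodeToString(sum[:])
}

// registry is a hypothetical in-memory stand-in for SaveWasm / LoadWasm.
type registry map[string][]byte

// save stores a copy of the module under its content hash and returns the key.
func (r registry) save(wasmBytes []byte) string {
	h := wasmHash(wasmBytes)
	r[h] = append([]byte(nil), wasmBytes...)
	return h
}

// load returns the module recorded under hash, if any.
func (r registry) load(hash string) ([]byte, bool) {
	b, ok := r[hash]
	return b, ok
}

func main() {
	reg := registry{}
	v1 := reg.save([]byte("\x00asm module v1")) // placeholder bytes, not a valid module
	v2 := reg.save([]byte("\x00asm module v2"))

	// An instance whose metadata recorded v1 still resolves to the v1 bytes
	// after v2 has been deployed.
	mod, ok := reg.load(v1)
	fmt.Println("v1 found:", ok, "bytes:", len(mod), "distinct keys:", v1 != v2)
}
```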

func (*S3SnapshotStore) TruncateDeltas

func (s *S3SnapshotStore) TruncateDeltas(ctx context.Context, id string) error

TruncateDeltas removes deltas from S3.

func (*S3SnapshotStore) TruncateOplog

func (s *S3SnapshotStore) TruncateOplog(ctx context.Context, id string, beforeCallIndex int) error

TruncateOplog removes oplog entries at or below the given call index from S3.

func (*S3SnapshotStore) UpdateActiveIndex added in v0.0.3

func (s *S3SnapshotStore) UpdateActiveIndex(ctx context.Context, id string, info []byte, completed bool) error

UpdateActiveIndex updates the active instance index status.

type Session

type Session struct {

	// In-memory handlers (bypassing loopback HTTP)
	ApiHandler      func(apiName string, request []byte) ([]byte, error)
	DownloadHandler func() ([]byte, error)
	UploadHandler   func(payload []byte) error
	// contains filtered or unexported fields
}

Session tracks the dynamic execution state of a running WASM instance.

func GetSession added in v0.0.6

func GetSession(ctx context.Context) *Session

type SnapshotStore

type SnapshotStore interface {
	Save(ctx context.Context, id string, snapshot []byte) error
	Load(ctx context.Context, id string) ([]byte, error)
	Delete(ctx context.Context, id string) error

	// Delta Snapshots
	SaveDeltas(ctx context.Context, id string, deltas map[int][]byte) error
	LoadDeltas(ctx context.Context, id string) (map[int][]byte, error)
	TruncateDeltas(ctx context.Context, id string) error

	// Oplog
	SaveOplog(ctx context.Context, id string, callIndex int, apiName string, request []byte, response []byte) error
	LoadOplog(ctx context.Context, id string) ([]OplogEntry, error)
	TruncateOplog(ctx context.Context, id string, beforeCallIndex int) error

	// Metadata & OCC
	SaveMetadata(ctx context.Context, meta *InstanceMeta) (bool, error)
	LoadMetadata(ctx context.Context, id string) (*InstanceMeta, error)

	// WASM Registry for Multi-Version Support
	SaveWasm(ctx context.Context, hash string, wasmBytes []byte) error
	LoadWasm(ctx context.Context, hash string) ([]byte, error)

	// Active Index for Console visualization
	UpdateActiveIndex(ctx context.Context, id string, info []byte, completed bool) error
	LoadActiveIndex(ctx context.Context) ([]byte, error)
}

SnapshotStore abstracts the storage backend for linear memory snapshots, deltas, and oplog.

type TestRunner added in v0.0.19

type TestRunner struct {
	// contains filtered or unexported fields
}

TestRunner wraps Runner and exposes test-only execution configuration methods.

func NewTestRunner added in v0.0.19

func NewTestRunner() *TestRunner

NewTestRunner creates a new TestRunner with default context.

func (*TestRunner) Error added in v0.0.19

func (tr *TestRunner) Error() error

Error returns any sticky error accumulated during chaining.

func (*TestRunner) Run added in v0.0.19

func (tr *TestRunner) Run() (crashed bool, err error)

Run compiles the WASM module (if needed), configures the session, runs the execution, and returns whether it crashed and any error.

func (*TestRunner) WithApiHandler added in v0.0.19

func (tr *TestRunner) WithApiHandler(handler func(apiName string, request []byte) ([]byte, error)) *TestRunner

WithApiHandler registers an in-memory host API call handler.

func (*TestRunner) WithArgs added in v0.0.19

func (tr *TestRunner) WithArgs(params ...uint64) *TestRunner

WithArgs configures parameters to pass to the WASM function.

func (*TestRunner) WithContext added in v0.0.19

func (tr *TestRunner) WithContext(ctx context.Context) *TestRunner

WithContext overrides the default context.

func (*TestRunner) WithCrash added in v0.0.19

func (tr *TestRunner) WithCrash(shouldCrash bool) *TestRunner

WithCrash configures whether to simulate a host crash at the first checkpoint.

func (*TestRunner) WithDownloadHandler added in v0.0.19

func (tr *TestRunner) WithDownloadHandler(handler func() ([]byte, error)) *TestRunner

WithDownloadHandler registers an in-memory stream download handler.

func (*TestRunner) WithEntrypoint added in v0.0.19

func (tr *TestRunner) WithEntrypoint(entrypoint string) *TestRunner

WithEntrypoint configures the function name to call in the WASM module.

func (*TestRunner) WithMemoryStore added in v0.0.19

func (tr *TestRunner) WithMemoryStore() *TestRunner

WithMemoryStore initializes and uses an in-memory snapshot store.

func (*TestRunner) WithServer added in v0.0.19

func (tr *TestRunner) WithServer(serverAddr string) *TestRunner

WithServer configures the server address for HTTP upload/download routing.

func (*TestRunner) WithSessionID added in v0.0.19

func (tr *TestRunner) WithSessionID(instanceID string) *TestRunner

WithSessionID configures the instance/session ID.

func (*TestRunner) WithStore added in v0.0.19

func (tr *TestRunner) WithStore(store SnapshotStore) *TestRunner

WithStore specifies the SnapshotStore to use.

func (*TestRunner) WithUploadHandler added in v0.0.19

func (tr *TestRunner) WithUploadHandler(handler func(payload []byte) error) *TestRunner

WithUploadHandler registers an in-memory stream upload handler.

func (*TestRunner) WithWasmBytes added in v0.0.19

func (tr *TestRunner) WithWasmBytes(wasmBytes []byte) *TestRunner

WithWasmBytes loads the WASM module directly from memory bytes.

func (*TestRunner) WithWasmPath added in v0.0.19

func (tr *TestRunner) WithWasmPath(wasmPath string) *TestRunner

WithWasmPath specifies the file path to the WASM module.

Directories

Path Synopsis
examples
camunda/host command
camunda/worker command
pg-store/host command
pg-store/worker command
s3-store/host command
s3-store/worker command
safe-task command
temporal/host command
temporal/worker command
wasm-inspector command
runner module
