streamforge

module
v0.0.0-...-2c2f5ba
Published: Apr 10, 2026 License: Apache-2.0

README

StreamForge

High-performance stateful stream processing framework for Go. A Kafka Streams alternative with exactly-once semantics, windowing, and distributed checkpointing.

Architecture

Sources (Kafka/Channel/Generator)
    |
    v
Processor Chain (Map -> Filter -> FlatMap -> Aggregate -> Join)
    |
    v
State Store (In-Memory / BoltDB per partition)
    |
    v
Sinks (Kafka / Stdout / File)

Checkpoint Coordinator (Chandy-Lamport barriers)
    -> File / S3 checkpoint storage

Backpressure Controller (credit-based flow control)
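
The credit-based flow control mentioned above can be sketched as follows. This is a hypothetical, single-threaded illustration of the idea (a receiver grants credits, a sender may only emit while it holds one), not the actual backpressure package API:

```go
package main

import "fmt"

// creditController is a toy credit-based flow controller. The downstream
// side grants credits as it frees capacity; the upstream side spends one
// credit per record and must pause when none remain.
// (Illustrative sketch; the real backpressure package may differ.)
type creditController struct{ credits int }

// Grant adds n send credits, typically called by the receiver.
func (c *creditController) Grant(n int) { c.credits += n }

// TrySend consumes one credit; it reports false when the sender must wait.
func (c *creditController) TrySend() bool {
	if c.credits == 0 {
		return false
	}
	c.credits--
	return true
}

func main() {
	c := &creditController{}
	c.Grant(2)
	fmt.Println(c.TrySend(), c.TrySend(), c.TrySend()) // true true false
}
```

The key property is that the sender can never outrun the receiver by more than the outstanding credit, which bounds in-flight data per channel.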

Features

  • Fluent Topology Builder - Compose processing pipelines with method chaining
  • Stateful Processing - In-memory and BoltDB-backed state stores with snapshot/restore
  • Windowing - Tumbling, sliding, and session windows with custom triggers
  • Aggregation - Count, sum, reduce, and custom aggregate functions
  • Exactly-Once Semantics - Chandy-Lamport distributed checkpointing
  • Backpressure - Credit-based flow control with adaptive rate limiting
  • Multiple Sources/Sinks - Kafka, in-memory channels, generators, stdout, files
  • Observability - OpenTelemetry metrics and tracing, structured logging
  • Python SDK - Build topologies in Python, deploy to Go runtime

Quick Start

Installation
go get github.com/ilya-shevelev/streamforge
Word Count Example
package main

import (
    "context"
    "strings"
    "time"

    "github.com/ilya-shevelev/streamforge/internal/runtime"
    "github.com/ilya-shevelev/streamforge/pkg/record"
    "github.com/ilya-shevelev/streamforge/pkg/sink"
    "github.com/ilya-shevelev/streamforge/pkg/source"
    "github.com/ilya-shevelev/streamforge/pkg/topology"
)

func main() {
    ch := make(chan *record.Record, 10)
    go func() {
        for _, s := range []string{"hello world", "hello streamforge"} {
            ch <- record.New([]byte("key"), []byte(s))
        }
        close(ch)
    }()

    topo, _ := topology.NewTopology("wordcount").
        Source("input", source.NewChannelSource(ch)).
        FlatMap("split", func(_ context.Context, r *record.Record) ([]*record.Record, error) {
            var out []*record.Record
            for _, w := range strings.Fields(string(r.Value)) {
                out = append(out, record.New([]byte(w), []byte(w)))
            }
            return out, nil
        }).
        GroupByKey().
        Count("counter").
        Sink("output", sink.NewStdoutSink()).
        Build()

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    engine := runtime.NewEngine(topo)
    engine.Run(ctx)
}
Fluent API
// Build complex topologies with method chaining.
topo, err := topology.NewTopology("pipeline").
    Source("events", kafkaSource).
    Map("parse", parseEvent).
    Filter("valid", isValid).
    GroupByKey().
    Window(window.NewTumblingWindow(time.Minute)).
    Count("event-count").
    Sink("output", kafkaSink).
    Build()
Windowed Aggregation
// Tumbling window: fixed, non-overlapping time intervals.
window.NewTumblingWindow(1 * time.Minute)

// Sliding window: overlapping windows with configurable slide.
window.NewSlidingWindow(10*time.Minute, 5*time.Minute)

// Session window: groups events with an inactivity gap.
window.NewSessionWindow(30 * time.Second)
State Management
// In-memory store (for development/testing).
store := state.NewMemoryStore()

// BoltDB-backed persistent store (for production).
store, err := state.NewBoltStore("/data/state")

// Snapshot and restore for checkpointing.
snapshot, _ := store.Snapshot()
store.Restore(snapshot)
Python SDK
from streamforge import Topology, Source, Sink, Window

topo = (
    Topology("clickstream")
    .source("events", Source.kafka(topic="clicks", brokers=["kafka:9092"]))
    .map("parse", config={"format": "json"})
    .group_by_key()
    .window(Window.tumbling(minutes=5))
    .count("click-count")
    .sink("output", Sink.kafka(topic="counts", brokers=["kafka:9092"]))
)

print(topo.to_yaml())

Project Structure

streamforge/
├── cmd/
│   ├── streamforge/       # Main processing node
│   └── sfctl/             # CLI management tool
├── api/proto/v1/          # Protocol buffer definitions
├── pkg/
│   ├── topology/          # Fluent topology builder
│   ├── processor/         # Map, Filter, FlatMap, Aggregate, Join
│   ├── window/            # Tumbling, Sliding, Session windows
│   ├── state/             # StateStore interface + implementations
│   ├── source/            # Source interface + Kafka, Channel, Generator
│   ├── sink/              # Sink interface + Kafka, Stdout, File
│   ├── checkpoint/        # Chandy-Lamport checkpointing
│   ├── backpressure/      # Credit-based flow control
│   ├── serde/             # JSON, String, Bytes serialization
│   ├── record/            # Core Record type
│   └── observability/     # Metrics, tracing, logging
├── internal/
│   ├── runtime/           # Processing engine
│   ├── network/           # Inter-node shuffle
│   └── config/            # Configuration
├── sdk/python/            # Python topology builder
├── deploy/
│   ├── docker/            # Dockerfile (distroless)
│   └── helm/              # Helm chart
└── examples/
    ├── wordcount/         # Classic word count
    ├── clickstream/       # Page view aggregation
    └── fraud-detection/   # Transaction fraud detection

Building

# Build all binaries
make build

# Run tests
make test

# Run with coverage
make test-cover

# Lint
make lint

# Docker image
make docker

Deployment

Docker
docker build -t streamforge -f deploy/docker/Dockerfile .
docker run -e STREAMFORGE_NODE_ID=node-1 streamforge
Kubernetes (Helm)
helm install streamforge deploy/helm/streamforge \
  --set config.kafka.brokers[0]=kafka:9092 \
  --set replicaCount=3

Configuration

StreamForge can be configured via a JSON file or environment variables:

Environment Variable         Description                            Default
STREAMFORGE_NODE_ID          Unique node identifier                 node-1
STREAMFORGE_LISTEN_ADDR      HTTP listen address                    :8080
STREAMFORGE_KAFKA_BROKERS    Kafka broker addresses                 localhost:9092
STREAMFORGE_STATE_BACKEND    State store backend (memory or bolt)   memory
STREAMFORGE_STATE_DIR        State store directory                  /tmp/streamforge/state
STREAMFORGE_CHECKPOINT_DIR   Checkpoint storage directory           /tmp/streamforge/checkpoints

License

MIT License - see LICENSE for details.

Directories

Path Synopsis
cmd
sfctl command
Command sfctl is the CLI management tool for StreamForge.
streamforge command
Command streamforge runs a StreamForge processing node.
examples
clickstream command
Command clickstream demonstrates aggregating page view events by URL with tumbling windows using StreamForge.
fraud-detection command
Command fraud-detection demonstrates a simple fraud detection pipeline that flags transactions exceeding a threshold or rapid successive transactions.
wordcount command
Command wordcount demonstrates the classic word count stream processing example using the StreamForge fluent API.
internal
config
Package config handles configuration loading for StreamForge from files, environment variables, and defaults.
network
Package network provides inter-node communication for distributed processing, including record shuffling for repartitioning operations.
runtime
Package runtime provides the core processing engine that executes a topology.
pkg
backpressure
Package backpressure implements credit-based flow control and adaptive rate limiting for the StreamForge processing pipeline.
checkpoint
Package checkpoint implements the Chandy-Lamport distributed snapshot algorithm for consistent checkpointing of stream processing state.
observability
Package observability provides metrics, tracing, and structured logging integration for StreamForge using OpenTelemetry and Prometheus.
processor
Package processor defines the Processor interface and core processor implementations (Map, Filter, FlatMap, Aggregate, Join, GroupBy) used in the StreamForge pipeline.
record
Package record defines the core Record type used throughout the StreamForge framework.
serde
Package serde provides serialization/deserialization interfaces and implementations for StreamForge records.
sink
Package sink defines the Sink interface and implementations for writing processed records out of the StreamForge pipeline.
source
Package source defines the Source interface and implementations for reading records into the StreamForge processing pipeline.
state
Package state provides the StateStore interface and implementations for managing processor state.
topology
Package topology provides the fluent builder API for constructing stream processing topologies in StreamForge.
window
Package window implements windowing strategies for stream processing: tumbling windows, sliding windows, and session windows.
