StreamV2 - Modern Stream Processing for Go

Published: Sep 23, 2025 License: MIT

StreamV2 is a powerful, type-safe stream processing library for Go. Process data with functional programming patterns while keeping your code simple and readable.

🚀 Key Features

  • 🔥 Type-Safe - Full generics support with compile-time safety
  • ⚡ High Performance - Conservative auto-parallel processing (GPU acceleration planned)
  • 📊 Rich I/O - CSV, JSON, TSV, Protocol Buffers with streaming support
  • 🎯 Simple API - Clean, composable functions that just work

📦 Installation

go get github.com/rosscartlidge/streamv2

🎯 Quick Start

package main

import (
    "fmt"
    "github.com/rosscartlidge/streamv2/pkg/stream"
)

func main() {
    // Create a stream from any slice
    numbers := stream.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
    
    // Chain operations with full type safety
    result, _ := stream.Sum(
        stream.Map(func(x int) int { return x * x })(
            stream.Where(func(x int) bool { return x%2 == 0 })(numbers)))
    
    fmt.Printf("Sum of even squares: %d\n", result) // Output: 220
}

🔥 Why StreamV2?

Traditional Go

// Group employees by department and calculate averages - lots of code!
deptSalaries := make(map[string][]float64)
for _, emp := range employees {
    dept := emp["department"].(string)
    salary := emp["salary"].(float64)
    deptSalaries[dept] = append(deptSalaries[dept], salary)
}

deptAverages := make(map[string]float64)
for dept, salaries := range deptSalaries {
    var sum float64
    for _, salary := range salaries {
        sum += salary
    }
    deptAverages[dept] = sum / float64(len(salaries))
}

StreamV2

// Same result in a single expression
results, _ := stream.Collect(
    stream.GroupBy([]string{"department"},
        stream.AvgField[float64]("avg_salary", "salary"),
    )(employeeStream))

📊 Working with Structured Data

// Create records with type-safe builders
users := []stream.Record{
    stream.NewRecord().String("name", "Alice").Int("age", 30).Float("salary", 75000).Build(),
    stream.NewRecord().String("name", "Bob").Int("age", 25).Float("salary", 65000).Build(),
    stream.NewRecord().String("name", "Carol").Int("age", 35).Float("salary", 85000).Build(),
}

// Group and aggregate with explicit operations
results, _ := stream.Collect(
    stream.GroupBy([]string{"department"}, 
        stream.CountField("count", "name"),
        stream.AvgField[float64]("avg_salary", "salary"),
    )(stream.FromSlice(users)))

// Access results safely
for _, result := range results {
    count := stream.GetOr(result, "count", int64(0))
    avgSalary := stream.GetOr(result, "avg_salary", 0.0)
    fmt.Printf("Average salary: $%.0f (%d people)\n", avgSalary, count)
}

🔧 Data Processing Made Simple

CSV Processing

// Read CSV file
data, _ := stream.CSVToStreamFromFile("sales.csv")

// Process and write results  
processed := stream.Map(func(r stream.Record) stream.Record {
    total := stream.GetOr(r, "price", 0.0) * stream.GetOr(r, "quantity", 0.0)
    return r.Set("total", total)
})(data)

stream.StreamToCSVFile(processed, "results.csv")

Multiple Aggregations

// Get multiple statistics at once
stats, _ := stream.Aggregates(numbers,
    stream.SumStream[int]("total"),
    stream.CountStream[int]("count"), 
    stream.AvgStream[int]("average"),
    stream.MinStream[int]("minimum"),
    stream.MaxStream[int]("maximum"),
)

fmt.Printf("Stats: %+v\n", stats)

Parallel Processing

// Conservative automatic parallelization for complex operations
results := stream.Map(expensiveFunction)(largeDataset) // May auto-parallel
simple := stream.Map(func(x int) int { return x * 2 })(smallDataset) // Sequential

// Explicit parallel processing for full control
processed := stream.Parallel(4, complexFunction)(datastream) // 4 workers
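
For intuition, the kind of fixed-size worker pool that stream.Parallel suggests can be sketched in plain Go. This is a standalone illustration, not StreamV2's implementation; parallelMap is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
)

// parallelMap applies f to every element of in using n worker goroutines.
// Workers pull indices from a channel and write into an indexed output
// slice, so the result keeps the input order.
func parallelMap[T, U any](n int, f func(T) U, in []T) []U {
	out := make([]U, len(in))
	jobs := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < n; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range jobs {
				out[i] = f(in[i])
			}
		}()
	}
	for i := range in {
		jobs <- i
	}
	close(jobs)
	wg.Wait()
	return out
}

func main() {
	squares := parallelMap(4, func(x int) int { return x * x }, []int{1, 2, 3, 4, 5})
	fmt.Println(squares) // [1 4 9 16 25]
}
```

Because each worker writes only to its own index, no locking is needed around the output slice.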

💡 Real-World Examples

Log Analysis

// Process server logs
logs, _ := stream.CSVToStreamFromFile("access.log")

errorCounts, _ := stream.Collect(
    stream.GroupBy([]string{"status_code"}, 
        stream.CountField("errors", "request_id"),
    )(stream.Where(func(r stream.Record) bool {
        return stream.GetOr(r, "status_code", 0) >= 400
    })(logs)))

Financial Data

// Calculate trading statistics
trades, _ := stream.CSVToStreamFromFile("trades.csv")

summary, _ := stream.Aggregates(
    stream.ExtractField[float64]("price")(trades),
    stream.SumStream[float64]("total_volume"),
    stream.AvgStream[float64]("avg_price"),
    stream.MaxStream[float64]("high"),
    stream.MinStream[float64]("low"),
)

πŸ—οΈ Core Concepts

StreamV2 has just three main types:

  • Stream[T] - A sequence of values of type T
  • Filter[T, U] - Transforms Stream[T] to Stream[U]
  • Record - Structured data (like a database row)

Records can store any Go basic type (int, string, bool, etc.), nested Records, or even Streams. This gives you complete flexibility while maintaining type safety.

Everything else builds on these simple foundations.
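
To make these foundations concrete, here is a minimal, self-contained sketch of what the three shapes could look like. The definitions below are hypothetical and for illustration only; StreamV2's actual types may differ.

```go
package main

import "fmt"

// Hypothetical shapes of the three core types, for illustration.
type Stream[T any] func(yield func(T) bool)     // a sequence of T values
type Filter[T, U any] func(Stream[T]) Stream[U] // transforms one stream into another
type Record map[string]any                      // structured data, like a database row

// fromSlice turns a slice into a Stream.
func fromSlice[T any](xs []T) Stream[T] {
	return func(yield func(T) bool) {
		for _, x := range xs {
			if !yield(x) {
				return
			}
		}
	}
}

// mapF builds a Filter that applies f to every element.
func mapF[T, U any](f func(T) U) Filter[T, U] {
	return func(in Stream[T]) Stream[U] {
		return func(yield func(U) bool) {
			in(func(x T) bool { return yield(f(x)) })
		}
	}
}

func main() {
	names := mapF(func(r Record) string { return r["name"].(string) })(
		fromSlice([]Record{{"name": "Alice"}, {"name": "Bob"}}))
	names(func(s string) bool { fmt.Println(s); return true })
}
```

A Filter is just a function from one Stream to another, which is why operations compose by plain function application.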

🎯 Perfect For

  • Data Analytics - ETL pipelines and transformations
  • Log Processing - Parse and analyze structured logs
  • CSV/JSON Processing - Type-safe data manipulation
  • Real-time Analytics - Process streaming data efficiently

🤝 Contributing

StreamV2 is currently stabilizing its API. See CONTRIBUTING.md for how to provide feedback and help with testing.

📄 License

MIT License - see LICENSE file for details.


StreamV2 makes stream processing in Go simple, type-safe, and fast. Start building better data pipelines today!

Directories

examples
  basic command
  comparison command
  csv_tsv_io command
  infinite_split command
  join_examples command
  json_io command
  protobuf_io command
  simple_groupby command
  sorting command
  split_streams command
pkg