sources

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package sources provides lazy data sources that produce iter.Seq2[T, error] sequences.

Sources return iter.Seq2[T, error] so read, decode, scan, and cancellation failures can flow through the pipeline. When composing these sources with vortex/iterx, use the helpers that accept iter.Seq2 such as iterx.Filter, iterx.Map, iterx.Take, iterx.Validate, iterx.Drain, and iterx.ForEach.

For plain iter.Seq[T] values that cannot yield errors, vortex keeps parallel helper variants like iterx.FilterSeq, iterx.MapSeq, and iterx.TakeSeq.

Sources stream data one item at a time — they never load everything into memory. The package supports CSV files, JSONL files, databases, text files, and standard input.

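The reader-based sources (CSVRows, JSONLines, and Lines) accept any io.Reader — files, HTTP responses, and network streams all work without any changes to your code. FileLines and JSONLinesFile take a file path, DBRows takes a database handle and a query, and Stdin reads from standard input.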

Benchmarks

Measured on Windows with a 1,000,000-line JSONL file:

                 vortex      eager
early stop       24ms        909ms     (37x faster)
peak memory      1 MB        194 MB    (194x less)

full scan        703ms       808ms     (similar)
peak memory      2 MB        168 MB    (84x less)

Example — JSONL file

file, err := os.Open("logs.jsonl")
if err != nil {
    log.Fatal(err)
}
defer file.Close()

// find first 100 errors — reads only ~991 lines from a 1M line file
logs := sources.JSONLines[LogEntry](ctx, file)
// named errEntries (not "errors") so the stdlib errors package is not shadowed
errEntries := iterx.Filter(ctx, logs, func(e LogEntry) bool { return e.Level == "error" })
first100 := iterx.Take(ctx, errEntries, 100)

for entry, err := range first100 {
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(entry.Message)
}

Example — database

for row, err := range sources.DBRows(ctx, db, "SELECT * FROM users", scan) {
    if err != nil { log.Fatal(err) }
    process(row)
}

Example — CSV from HTTP

resp, err := http.DefaultClient.Do(req)
if err != nil {
    // resp is nil on error, so it must not be touched (or closed) here
    log.Fatal(err)
}
defer resp.Body.Close()

// an error page is not CSV; refuse to parse it
if resp.StatusCode != http.StatusOK {
    log.Fatalf("unexpected status: %s", resp.Status)
}

for row, err := range sources.CSVRows(ctx, resp.Body) {
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(row)
}

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func CSVRows

func CSVRows(ctx context.Context, r io.Reader) iter.Seq2[[]string, error]

CSVRows returns a lazy sequence of rows from a CSV reader.

Example
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/MostafaMagdSalama/vortex/sources"
)

// main prints the first two columns of every record in an in-memory CSV
// document, header row included.
func main() {
	ctx := context.Background()
	data := strings.NewReader("name,age\nAlice,30\nBob,25\n")

	records := sources.CSVRows(ctx, data)
	for record, err := range records {
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(record[0], record[1])
	}
}
Output:
name age
Alice 30
Bob 25
Example (Filter)
input := "name,email,status\nAlice,alice@example.com,active\nBob,bob@example.com,inactive\nCharlie,charlie@example.com,active\n"
r := strings.NewReader(input)

// skip header and filter active users
first := true
for row, err := range CSVRows(context.Background(), r) {
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	if first {
		first = false
		continue
	}
	if row[2] == "active" {
		fmt.Println(row[0])
	}
}
Output:
Alice
Charlie
Example (Pipeline)
input := "name,email,status\nAlice,alice@example.com,active\nBob,bob@example.com,inactive\nCharlie,charlie@example.com,active\n"
r := strings.NewReader(input)

ctx := context.Background()

rows := func(yield func([]string) bool) {
	for row, err := range CSVRows(ctx, r) {
		if err != nil {
			fmt.Println("Error:", err)
			return
		}
		if !yield(row) {
			return
		}
	}
}

first := true
dataRows := iterx.FilterSeq(ctx, rows, func(row []string) bool {
	if first {
		first = false
		return false
	}
	return true
})

names := iterx.MapSeq(ctx,
	iterx.FilterSeq(ctx, dataRows, func(row []string) bool {
		return row[2] == "active"
	}),
	func(row []string) string {
		return row[0]
	},
)

for name := range names {
	fmt.Println(name)
}
Output:
Alice
Charlie

func DBRows

func DBRows[T any](ctx context.Context, db querier, query string, scan func(*sql.Rows) (T, error), args ...any) iter.Seq2[T, error]

DBRows returns a lazy sequence of scanned rows from a SQL query. Accepts optional query arguments via variadic args. Always check the error — a non-nil error means iteration stopped early due to a scan failure, driver error, or context cancellation.

example without args:

for u, err := range sources.DBRows(ctx, db, "SELECT * FROM users", scan) {
    if err != nil {
        log.Println(err)
        return
    }
    process(u)
}

example with args:

const query = "SELECT * FROM users WHERE status = ?"

// "active" is passed through the variadic args as the query argument
for u, err := range sources.DBRows(ctx, db, query, scan, "active") {
    if err != nil {
        log.Println(err)
        return
    }
    process(u)
}

func FileLines

func FileLines(ctx context.Context, path string) iter.Seq2[string, error]

FileLines opens a file and returns a lazy sequence of its lines.

Example
package main

import (
	"context"
	"fmt"
	"path/filepath"

	"github.com/MostafaMagdSalama/vortex/sources"
)

// main prints each line of testdata/sample.txt, stopping at the first error.
func main() {
	ctx := context.Background()
	samplePath := filepath.Join("testdata", "sample.txt")

	for text, err := range sources.FileLines(ctx, samplePath) {
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(text)
	}
}
Output:
hello\nworld\nfoo

func JSONLines added in v0.1.9

func JSONLines[T any](ctx context.Context, r io.Reader) iter.Seq2[T, error]

JSONLines returns a lazy sequence of decoded JSON objects from any io.Reader. Each line must be a valid JSON object — empty lines are skipped. Iteration stops immediately if ctx is cancelled or the consumer breaks early.

Works with any io.Reader — files, HTTP responses, buffers:

// from file
file, err := os.Open("data.jsonl")
if err != nil {
    // file is nil on error; do not defer Close on it
    log.Fatal(err)
}
defer file.Close()
for item, err := range sources.JSONLines[User](ctx, file) {
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(item)
}

// from HTTP response
resp, err := http.DefaultClient.Do(req)
if err != nil {
    // resp is nil on error; do not touch resp.Body
    log.Fatal(err)
}
defer resp.Body.Close()
for item, err := range sources.JSONLines[User](ctx, resp.Body) {
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(item)
}
Example

Example functions for pkg.go.dev

package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/MostafaMagdSalama/vortex/sources"
)

// main decodes two JSONL log entries from an in-memory reader and prints
// each entry's level and message.
func main() {
	type Entry struct {
		Level   string `json:"level"`
		Message string `json:"message"`
		Service string `json:"service"`
	}

	input := strings.NewReader(`{"level":"info","message":"started","service":"api"}
{"level":"error","message":"failed","service":"api"}
`)

	for item, err := range sources.JSONLines[Entry](context.Background(), input) {
		if err != nil {
			// report the failure instead of ending the loop silently
			fmt.Println("error:", err)
			return
		}
		fmt.Printf("%s: %s\n", item.Level, item.Message)
	}
}
Output:
info: started
error: failed
Example (EarlyStop)
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/MostafaMagdSalama/vortex/sources"
)

// main decodes four JSONL log entries, adapts the (entry, error) sequence
// into a plain entry sequence, and prints the message of every entry whose
// level is "error" followed by the total count.
func main() {
	// entry is declared once so the source, the adapter and the consumer all
	// share a single type.
	type entry struct {
		Level   string `json:"level"`
		Message string `json:"message"`
		Service string `json:"service"`
	}

	input := strings.NewReader(`{"level":"info","message":"started","service":"api"}
{"level":"error","message":"failed","service":"api"}
{"level":"info","message":"done","service":"api"}
{"level":"error","message":"timeout","service":"api"}
`)

	ctx := context.Background()

	// create lazy source
	logs := sources.JSONLines[entry](ctx, input)

	// unwrap errors: the sequence ends at the first error (which is reported
	// rather than dropped) or as soon as the consumer stops
	entries := func(yield func(entry) bool) {
		for e, err := range logs {
			if err != nil {
				fmt.Println("error:", err)
				return
			}
			if !yield(e) {
				return
			}
		}
	}

	// count errors found
	count := 0
	for e := range entries {
		if e.Level == "error" {
			fmt.Println(e.Message)
			count++
		}
	}
	fmt.Printf("errors found: %d\n", count)
}
Output:
failed
timeout
errors found: 2

func JSONLinesFile added in v0.1.9

func JSONLinesFile[T any](ctx context.Context, path string) iter.Seq2[T, error]

JSONLinesFile opens a file and returns a lazy sequence of decoded JSON objects. Each line must be a valid JSON object — empty lines are skipped.

example:

for item, err := range sources.JSONLinesFile[User](ctx, "users.jsonl") {
    if err != nil { log.Fatal(err) }
    fmt.Println(item)
}
Example
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/MostafaMagdSalama/vortex/sources"
)

// main writes two JSONL product records to a temporary file, then streams
// them back with JSONLinesFile and prints each one.
func main() {
	// write a temp file for the example
	tmp, err := os.CreateTemp("", "*.jsonl")
	if err != nil {
		// tmp is nil here; using tmp.Name() would panic
		fmt.Println("error:", err)
		return
	}
	defer os.Remove(tmp.Name())

	const data = `{"id":1,"name":"Alice","price":9.99}` + "\n" +
		`{"id":2,"name":"Bob","price":19.99}` + "\n"
	if _, err := tmp.WriteString(data); err != nil {
		tmp.Close()
		fmt.Println("error:", err)
		return
	}
	// close before reading so the data is flushed to the file
	if err := tmp.Close(); err != nil {
		fmt.Println("error:", err)
		return
	}

	type Product struct {
		ID    int     `json:"id"`
		Name  string  `json:"name"`
		Price float64 `json:"price"`
	}

	for item, err := range sources.JSONLinesFile[Product](context.Background(), tmp.Name()) {
		if err != nil {
			// report the failure instead of ending the loop silently
			fmt.Println("error:", err)
			return
		}
		fmt.Printf("%d: %s $%.2f\n", item.ID, item.Name, item.Price)
	}
}
Output:
1: Alice $9.99
2: Bob $19.99

func Lines

func Lines(ctx context.Context, r io.Reader) iter.Seq2[string, error]

Lines returns a lazy sequence of lines from any io.Reader.

Example
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/MostafaMagdSalama/vortex/sources"
)

// main prints every line of an in-memory three-line document, stopping at
// the first error.
func main() {
	ctx := context.Background()
	text := strings.NewReader("line1\nline2\nline3\n")

	lines := sources.Lines(ctx, text)
	for l, err := range lines {
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(l)
	}
}
Output:
line1
line2
line3

func Stdin

func Stdin(ctx context.Context) iter.Seq2[string, error]

Stdin returns a lazy sequence of lines from standard input.

Example
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/MostafaMagdSalama/vortex/sources"
)

// main points os.Stdin at a temporary file holding two lines, reads them back
// through sources.Stdin, and restores the real stdin on exit.
func main() {
	tmpFile, err := os.CreateTemp("", "vortex-stdin-example-*")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	defer os.Remove(tmpFile.Name())
	defer tmpFile.Close()

	if _, err := tmpFile.WriteString("one\ntwo\n"); err != nil {
		fmt.Println("error:", err)
		return
	}
	// rewind so the read starts at the beginning of what was just written
	if _, err := tmpFile.Seek(0, 0); err != nil {
		fmt.Println("error:", err)
		return
	}

	// swap stdin for the temp file and put the original back when done
	oldStdin := os.Stdin
	os.Stdin = tmpFile
	defer func() { os.Stdin = oldStdin }()

	for line, err := range sources.Stdin(context.Background()) {
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(line)
	}
}
Output:
one
two

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL