Documentation
¶
Overview ¶
Package sources provides lazy data sources that produce iter.Seq2[T, error] sequences.
Sources return iter.Seq2[T, error] so read, decode, scan, and cancellation failures can flow through the pipeline. When composing these sources with vortex/iterx, use the helpers that accept iter.Seq2 such as iterx.Filter, iterx.Map, iterx.Take, iterx.Validate, iterx.Drain, and iterx.ForEach.
For plain iter.Seq[T] values that cannot yield errors, vortex keeps parallel helper variants like iterx.FilterSeq, iterx.MapSeq, and iterx.TakeSeq.
Sources stream data one item at a time instead of loading the whole input into memory. The package supports CSV, JSONL, and plain-text input, SQL query results, and standard input.
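The reader-based sources (Lines, CSVRows, and JSONLines) accept any io.Reader — files, HTTP response bodies, and network streams all work without any changes to your code. FileLines and JSONLinesFile take a file path, DBRows takes a database handle, and Stdin reads standard input.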
Benchmarks ¶
Measured on Windows with a 1,000,000-line JSONL file:

                   vortex   eager
early stop         24ms     909ms    (37x faster)
  peak memory      1 MB     194 MB   (194x less)
full scan          703ms    808ms    (similar)
  peak memory      2 MB     168 MB   (84x less)
Example — JSONL file ¶
file, err := os.Open("logs.jsonl")
if err != nil {
log.Fatal(err)
}
defer file.Close()
// find first 100 errors — reads only ~991 lines from a 1M line file
logs := sources.JSONLines[LogEntry](ctx, file)
errors := iterx.Filter(ctx, logs, func(e LogEntry) bool { return e.Level == "error" })
first100 := iterx.Take(ctx, errors, 100)
for entry, err := range first100 {
if err != nil { log.Fatal(err) }
fmt.Println(entry.Message)
}
Example — database ¶
for row, err := range sources.DBRows(ctx, db, "SELECT * FROM users", scan) {
if err != nil { log.Fatal(err) }
process(row)
}
Example — CSV from HTTP ¶
resp, err := http.DefaultClient.Do(req)
if err != nil { log.Fatal(err) }
defer resp.Body.Close()
for row, err := range sources.CSVRows(ctx, resp.Body) {
if err != nil { log.Fatal(err) }
fmt.Println(row)
}
Index ¶
- func CSVRows(ctx context.Context, r io.Reader) iter.Seq2[[]string, error]
- func DBRows[T any](ctx context.Context, db querier, query string, scan func(*sql.Rows) (T, error), ...) iter.Seq2[T, error]
- func FileLines(ctx context.Context, path string) iter.Seq2[string, error]
- func JSONLines[T any](ctx context.Context, r io.Reader) iter.Seq2[T, error]
- func JSONLinesFile[T any](ctx context.Context, path string) iter.Seq2[T, error]
- func Lines(ctx context.Context, r io.Reader) iter.Seq2[string, error]
- func Stdin(ctx context.Context) iter.Seq2[string, error]
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CSVRows ¶
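func CSVRows(ctx context.Context, r io.Reader) iter.Seq2[[]string, error]
CSVRows returns a lazy sequence of rows parsed as CSV from r. A header row, if present, is yielded like any other row.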
Example ¶
package main
import (
"context"
"fmt"
"strings"
"github.com/MostafaMagdSalama/vortex/sources"
)
func main() {
input := strings.NewReader("name,age\nAlice,30\nBob,25\n")
for row, err := range sources.CSVRows(context.Background(), input) {
if err != nil {
fmt.Println("error:", err)
return
}
fmt.Println(row[0], row[1])
}
}
Output:

name age
Alice 30
Bob 25
Example (Filter) ¶
input := "name,email,status\nAlice,alice@example.com,active\nBob,bob@example.com,inactive\nCharlie,charlie@example.com,active\n"
r := strings.NewReader(input)
// skip header and filter active users
first := true
for row, err := range CSVRows(context.Background(), r) {
if err != nil {
fmt.Println("Error:", err)
return
}
if first {
first = false
continue
}
if row[2] == "active" {
fmt.Println(row[0])
}
}
Output:

Alice
Charlie
Example (Pipeline) ¶
input := "name,email,status\nAlice,alice@example.com,active\nBob,bob@example.com,inactive\nCharlie,charlie@example.com,active\n"
r := strings.NewReader(input)
ctx := context.Background()
rows := func(yield func([]string) bool) {
for row, err := range CSVRows(ctx, r) {
if err != nil {
fmt.Println("Error:", err)
return
}
if !yield(row) {
return
}
}
}
first := true
dataRows := iterx.FilterSeq(ctx, rows, func(row []string) bool {
if first {
first = false
return false
}
return true
})
names := iterx.MapSeq(ctx,
iterx.FilterSeq(ctx, dataRows, func(row []string) bool {
return row[2] == "active"
}),
func(row []string) string {
return row[0]
},
)
for name := range names {
fmt.Println(name)
}
Output:

Alice
Charlie
func DBRows ¶
func DBRows[T any](ctx context.Context, db querier, query string, scan func(*sql.Rows) (T, error), args ...any) iter.Seq2[T, error]
DBRows returns a lazy sequence of scanned rows from a SQL query. Accepts optional query arguments via variadic args. Always check the error — a non-nil error means iteration stopped early due to a scan failure, driver error, or context cancellation.
example without args:
for u, err := range sources.DBRows(ctx, db, "SELECT * FROM users", scan) {
if err != nil {
log.Println(err)
return
}
process(u)
}
example with args:
for u, err := range sources.DBRows(ctx, db,
"SELECT * FROM users WHERE status = ?",
scan,
"active",
) {
if err != nil {
log.Println(err)
return
}
process(u)
}
func FileLines ¶
func FileLines(ctx context.Context, path string) iter.Seq2[string, error]
FileLines opens the file at path and returns a lazy sequence of its lines.
Example ¶
package main
import (
"context"
"fmt"
"path/filepath"
"github.com/MostafaMagdSalama/vortex/sources"
)
func main() {
for line, err := range sources.FileLines(context.Background(), filepath.Join("testdata", "sample.txt")) {
if err != nil {
fmt.Println("error:", err)
return
}
fmt.Println(line)
}
}
Output:

hello
world
foo
func JSONLines ¶ added in v0.1.9
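func JSONLines[T any](ctx context.Context, r io.Reader) iter.Seq2[T, error]
JSONLines returns a lazy sequence of values decoded from any io.Reader, one JSON object per line. Empty lines are skipped; every other line must be a valid JSON object. Iteration stops immediately if ctx is cancelled or the consumer breaks out of the loop early.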
Works with any io.Reader — files, HTTP responses, buffers:
// from file
file, err := os.Open("data.jsonl")
if err != nil { log.Fatal(err) }
defer file.Close()
for item, err := range sources.JSONLines[User](ctx, file) {
if err != nil { log.Fatal(err) }
fmt.Println(item)
}
// from HTTP response
resp, err := http.DefaultClient.Do(req)
if err != nil { log.Fatal(err) }
defer resp.Body.Close()
for item, err := range sources.JSONLines[User](ctx, resp.Body) {
if err != nil { log.Fatal(err) }
fmt.Println(item)
}
Example ¶
Example functions for pkg.go.dev
package main
import (
"context"
"fmt"
"strings"
"github.com/MostafaMagdSalama/vortex/sources"
)
func main() {
input := strings.NewReader(`{"level":"info","message":"started","service":"api"}
{"level":"error","message":"failed","service":"api"}
`)
type Entry struct {
Level string `json:"level"`
Message string `json:"message"`
Service string `json:"service"`
}
for item, err := range sources.JSONLines[Entry](context.Background(), input) {
if err != nil {
break
}
fmt.Printf("%s: %s\n", item.Level, item.Message)
}
}
Output:

info: started
error: failed
Example (EarlyStop) ¶
package main
import (
"context"
"fmt"
"strings"
"github.com/MostafaMagdSalama/vortex/sources"
)
func main() {
input := strings.NewReader(`{"level":"info","message":"started","service":"api"}
{"level":"error","message":"failed","service":"api"}
{"level":"info","message":"done","service":"api"}
{"level":"error","message":"timeout","service":"api"}
`)
ctx := context.Background()
// create lazy source
logs := sources.JSONLines[struct {
Level string `json:"level"`
Message string `json:"message"`
Service string `json:"service"`
}](ctx, input)
// unwrap errors
entries := func(yield func(struct {
Level string `json:"level"`
Message string `json:"message"`
Service string `json:"service"`
}) bool) {
for entry, err := range logs {
if err != nil {
return
}
if !yield(entry) {
return
}
}
}
// count errors found
count := 0
for entry := range entries {
if entry.Level == "error" {
fmt.Println(entry.Message)
count++
}
}
fmt.Printf("errors found: %d\n", count)
}
Output:

failed
timeout
errors found: 2
func JSONLinesFile ¶ added in v0.1.9
func JSONLinesFile[T any](ctx context.Context, path string) iter.Seq2[T, error]
JSONLinesFile opens the file at path and returns a lazy sequence of decoded JSON objects. Each line must be a valid JSON object — empty lines are skipped.
example:
for item, err := range sources.JSONLinesFile[User](ctx, "users.jsonl") {
if err != nil { log.Fatal(err) }
fmt.Println(item)
}
Example ¶
package main
import (
"context"
"fmt"
"os"
"github.com/MostafaMagdSalama/vortex/sources"
)
func main() {
// write a temp file for the example
tmp, _ := os.CreateTemp("", "*.jsonl")
defer os.Remove(tmp.Name())
tmp.WriteString(`{"id":1,"name":"Alice","price":9.99}` + "\n")
tmp.WriteString(`{"id":2,"name":"Bob","price":19.99}` + "\n")
tmp.Close()
type Product struct {
ID int `json:"id"`
Name string `json:"name"`
Price float64 `json:"price"`
}
for item, err := range sources.JSONLinesFile[Product](context.Background(), tmp.Name()) {
if err != nil {
break
}
fmt.Printf("%d: %s $%.2f\n", item.ID, item.Name, item.Price)
}
}
Output:

1: Alice $9.99
2: Bob $19.99
func Lines ¶
func Lines(ctx context.Context, r io.Reader) iter.Seq2[string, error]
Lines returns a lazy sequence of lines from any io.Reader.
Example ¶
package main
import (
"context"
"fmt"
"strings"
"github.com/MostafaMagdSalama/vortex/sources"
)
func main() {
input := strings.NewReader("line1\nline2\nline3\n")
for line, err := range sources.Lines(context.Background(), input) {
if err != nil {
fmt.Println("error:", err)
return
}
fmt.Println(line)
}
}
Output:

line1
line2
line3
func Stdin ¶
func Stdin(ctx context.Context) iter.Seq2[string, error]
Stdin returns a lazy sequence of lines from standard input (os.Stdin).
Example ¶
package main
import (
"context"
"fmt"
"os"
"github.com/MostafaMagdSalama/vortex/sources"
)
func main() {
tmpFile, err := os.CreateTemp("", "vortex-stdin-example-*")
if err != nil {
return
}
defer os.Remove(tmpFile.Name())
defer tmpFile.Close()
if _, err := tmpFile.WriteString("one\ntwo\n"); err != nil {
return
}
if _, err := tmpFile.Seek(0, 0); err != nil {
return
}
oldStdin := os.Stdin
os.Stdin = tmpFile
defer func() { os.Stdin = oldStdin }()
for line, err := range sources.Stdin(context.Background()) {
if err != nil {
fmt.Println("error:", err)
return
}
fmt.Println(line)
}
}
Output:

one
two
Types ¶
This section is empty.