= bigcsv

bigcsv exists to conveniently parse and process large CSV files in a streaming
fashion.

It is a fairly thin wrapper around `encoding/csv` that adds a reusable
parsing function and optional parallel processing of rows.

== Usage

Below is a fairly complete usage example showing processing over HTTP with
multiple workers and a cancellable context.

As with `csv.Reader`, you should first create the parser, then change any
settings prior to reading.
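
For example, to read a semicolon-delimited file, adjust the underlying
`csv.Reader` before running. This is a minimal sketch; the filename and the
`Place` type (from the full example below) are assumptions:

[source,go]
----
parser, err := bigcsv.New[Place](bigcsv.FileStream("places.csv"))
if err != nil {
	panic(err)
}
parser.Reader.Comma = ';' // any csv.Reader setting may be changed before Run
----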

The basic steps are:

1. define a type to hold the data you want from a single row.
2. provide a function which parses the row (`[]string`) and returns your data
   type (plus an error): `func(row []string) (T, error)`
3. provide a function which defines what to do with your parsed data: `func
   (data T) error`
4. optionally, define an error handling function.
5. set up the code which creates your parser and runs it.

[source,go]
----
// This example fetches the EPA Smart Location Database; the dataset is
// documented at
// https://www.epa.gov/smartgrowth/national-walkability-index-user-guide-and-methodology
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"strconv"
	"sync/atomic"

	"example.com/bigcsv" // NOTE: substitute the module's actual import path.
)

const csvURL = "https://edg.epa.gov/EPADataCommons/public/OA/EPA_SmartLocationDatabase_V3_Jan_2021_Final.csv"

// Place holds part of the data from a single CSV row.
type Place struct {
	Name        string
	Population  int
	Walkability float64
	Area        float64
}

// ParsePlace is the CSV-row-to-struct parsing function.
func ParsePlace(row []string) (Place, error) {
	var err error
	var e error
	place := Place{}
	if len(row) < 117 {
		return place, fmt.Errorf("got %d columns, need at least 117", len(row))
	}
	place.Name = row[10]
	if place.Population, e = strconv.Atoi(row[18]); e != nil {
		err = errors.Join(err, e)
	}
	if place.Walkability, e = strconv.ParseFloat(row[114], 64); e != nil {
		err = errors.Join(err, e)
	}
	if place.Area, e = strconv.ParseFloat(row[116], 64); e != nil {
		err = errors.Join(err, e)
	}
	return place, err
}

func main() {
	// Create the parser; note the use of generics to insert our type.
	parser, err := bigcsv.New[Place](bigcsv.HTTPStream(csvURL))
	if err != nil {
		panic(err)
	}
	ctx, cancel := context.WithCancel(context.Background()) // we're going to cancel early.
	defer cancel()

	processedRows := &atomic.Int32{} // atomic counter because workers run in parallel.
	parser.Parse = ParsePlace        // set the parsing function.
	parser.OnData = func(p Place) error { // data handler (demo: just log it).
		log.Printf(
			"Place '%s', pop. %d, area %.1f, walkability %.1f",
			p.Name, p.Population, p.Area, p.Walkability,
		)
		if processedRows.Add(1) > 100 { // cancel early; this stops the HTTP stream, too.
			cancel()
		}
		return nil
	}
	// Handle parsing / data errors.
	parser.OnError = func(err error) {
		log.Println(err)
	}
	// Skip the CSV header (first line).
	headers, err := parser.Reader.Read()
	if err != nil {
		panic(err)
	}
	if len(headers) < 117 {
		panic("CSV did not have enough headers")
	}
	// Run the parser with 5 parallel workers. Note: this is for demonstration;
	// it's unlikely that workers will speed things up for HTTP streams. The
	// early cancellation above is expected, assuming Run surfaces ctx.Err().
	if err = parser.Run(ctx, 5); err != nil && !errors.Is(err, context.Canceled) {
		panic(err)
	}
}
----

== TODO

Before this gets to v1, I'd like to change the API to be more similar to
`csv.Reader`, possibly making `FileStream` and `HTTPStream` functions which are
just wrapped `io.Reader` instances themselves.

Ideally, rather than setting functions on the `Parser` and calling `Run`,
there would be a method `Read() (T, error)` and a more BYOP (bring your own
parallelism) attitude.
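
As a rough sketch, the hypothetical post-v1 usage could look like this (none
of these methods exist yet):

[source,go]
----
// Hypothetical future API, per the TODO above.
parser, err := bigcsv.New[Place](bigcsv.FileStream("places.csv"))
if err != nil {
	panic(err)
}
for {
	place, err := parser.Read() // proposed csv.Reader-style method
	if errors.Is(err, io.EOF) {
		break
	}
	if err != nil {
		log.Println(err) // parallelism, if desired, would be the caller's job
		continue
	}
	fmt.Println(place.Name)
}
----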

Documentation

Overview

Package bigcsv provides a simple API for processing large CSV data.

It supports loading CSV data over HTTP or from the filesystem (with gzip detection), or directly from an io.Reader.

Index

Constants

This section is empty.

Variables

var ErrOnData = errors.New("OnData error")

ErrOnData is passed to OnError when OnData returns an error.

var ErrOnRow = errors.New("OnRow error")

ErrOnRow is passed to OnError when OnRow returns an error.

var ErrParse = errors.New("Parse error")

ErrParse is passed to OnError when Parse returns an error.
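
For example, OnError can tell these cases apart with errors.Is, assuming the
sentinels are wrapped into the errors it receives (a sketch, not part of the
package):

parser.OnError = func(err error) {
	switch {
	case errors.Is(err, bigcsv.ErrParse):
		log.Println("bad row, skipped:", err)
	case errors.Is(err, bigcsv.ErrOnData):
		log.Println("data handler failed:", err)
	case errors.Is(err, bigcsv.ErrOnRow):
		log.Println("row hook failed:", err)
	default:
		log.Println(err)
	}
}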

Functions

This section is empty.

Types

type FileStream

type FileStream string

FileStream provides a reader for CSV processing from the filesystem.

FileStream automatically decompresses files named *.gz as gzip. Everything else is treated as plain CSV.

func (FileStream) Open

func (fs FileStream) Open() (io.ReadCloser, error)
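
A minimal sketch of use (the filename and the Place type from the README are
assumptions):

parser, err := bigcsv.New[Place](bigcsv.FileStream("data.csv.gz")) // decompressed transparently
if err != nil {
	panic(err)
}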

type HTTPStream

type HTTPStream string

HTTPStream provides a reader for CSV data streamed directly via HTTP(S).

func (HTTPStream) Open

func (hs HTTPStream) Open() (io.ReadCloser, error)

type Parser

type Parser[T any] struct {

	// Reader is the CSV reader which can be modified prior to processing.
	//
	// To change CSV settings, use the Reader directly after creating the Parser
	// and prior to calling Run
	Reader *csv.Reader

	// OnRow accepts a CSV row prior to parsing.
	//
	// If an error is returned, the OnError function is called and the row is
	// skipped.
	OnRow func(row []string) error

	// Parse should parse the raw row from the CSV and return the data type.
	Parse func(row []string) (T, error)

	// OnData accepts a processed CSV row as the parsed data type.
	//
	// Returning an error stops ALL further processing. Note that when using
	// multiple workers, workers that have already started will finish
	// processing their current row before the stop takes effect.
	OnData func(data T) error

	// OnError handles errors arising during processing.
	//
	// If the Parse function returns an error, this function will receive it.
	// Errors from the underlying *csv.Reader are passed here, too.
	OnError func(error)
	// contains filtered or unexported fields
}

Parser provides streaming CSV parsing. It must be created with New.
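
A sketch of typical configuration, using OnRow to skip rows before parsing
(the filename and the empty-first-column filter are made up):

parser, err := bigcsv.New[Place](bigcsv.FileStream("places.csv"))
if err != nil {
	panic(err)
}
parser.Reader.FieldsPerRecord = -1 // allow variable-length rows
parser.OnRow = func(row []string) error {
	if len(row) == 0 || row[0] == "" {
		return fmt.Errorf("missing ID, skipping row")
	}
	return nil
}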

func New

func New[T any](stream Stream) (*Parser[T], error)

New opens the given stream and starts the CSV reader.

func (*Parser[T]) Run

func (p *Parser[T]) Run(ctx context.Context, workers int) error

Run begins parsing the CSV records, invoking the configured functions.

This method will not return until all workers have finished processing.

type Stream

type Stream interface {
	Open() (io.ReadCloser, error)
}

Stream is the interface which provides a CSV file to process.
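
Any type with a suitable Open method satisfies Stream; for example, a
hypothetical in-memory implementation:

type memStream []byte

func (m memStream) Open() (io.ReadCloser, error) {
	return io.NopCloser(bytes.NewReader(m)), nil
}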

func ReadStream

func ReadStream(r io.Reader) Stream

ReadStream provides an adapter for any io.Reader.

If the reader is not also an io.Closer, it will be wrapped using io.NopCloser.
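
A minimal sketch wrapping an in-memory reader (the CSV content is made up):

r := strings.NewReader("name,population\nSpringfield,30000\n")
parser, err := bigcsv.New[Place](bigcsv.ReadStream(r))
if err != nil {
	panic(err)
}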
