csvplus

Version: v0.2.3
Published: Sep 25, 2016 License: BSD-3-Clause Imports: 9 Imported by: 1

README

csvplus

Package csvplus extends the standard Go encoding/csv package with fluent interface, lazy stream processing operations, indices and joins.

The library is primarily designed for ETL-like processes. It is mostly useful where the more advanced searching and joining capabilities of a fully-featured SQL database are not required, but at the same time the data transformations needed still include SQL-like operations.

License: BSD

Examples

Simple sequential processing:

people := csvplus.CsvFileDataSource("people.csv").SelectColumns("name", "surname", "id")

err := csvplus.Take(people).
	Filter(csvplus.Like(csvplus.Row{"name": "Amelia"})).
	Map(func(row csvplus.Row) csvplus.Row { row["name"] = "Julia"; return row }).
	ToCsvFile("out.csv", "name", "surname")

if err != nil {
	return err
}

More involved example:

customers, err := csvplus.Take(
	csvplus.CsvFileDataSource("people.csv").SelectColumns("id", "name", "surname")).
	UniqueIndexOn("id")

if err != nil {
	return err
}

products, err := csvplus.Take(
	csvplus.CsvFileDataSource("stock.csv").SelectColumns("prod_id", "product", "price")).
	UniqueIndexOn("prod_id")

if err != nil {
	return err
}

orders := csvplus.CsvFileDataSource("orders.csv").SelectColumns("cust_id", "prod_id", "qty", "ts")

return csvplus.Take(orders).
	Join(customers, "cust_id").
	Join(products).
	ForEach(func(row csvplus.Row) error {
		// prints lines like:
		//	John Doe bought 38 oranges for £0.03 each on 2016-09-14T08:48:22+01:00
		_, e := fmt.Printf("%s %s bought %s %ss for £%s each on %s\n",
			row["name"], row["surname"], row["qty"], row["product"], row["price"], row["ts"])
		return e
	})

Design principles

The package functionality is based on the operations on the following entities:

  • type Row
  • interface DataSource
  • type Table
  • type Index

Type Row

Row represents one row from a DataSource. It is a map from column names to the string values under those columns on the current row. The package expects a unique name assigned to every column at source. Compared to using integer indices this provides more convenience when complex transformations are applied to each row during processing.

Interface DataSource

Interface DataSource represents any source of one or more rows, like a .csv file. The only operation defined on a DataSource is iteration over its rows. The iteration is performed via an implementation of the ForEach method, which is expected to call its parameter function once per row. The package contains an implementation of the interface for .csv files.

Type Table

Type Table implements sequential operations on a given data source, as well as the DataSource interface itself and other iterating methods. All sequential operations are 'lazy', i.e. they are not invoked immediately; instead they return a new table which, when iterated over, invokes the particular operation. The operations can be chained using a so-called fluent interface. The actual iteration over a table only happens when any of the following methods is called:

  • ForEach
  • IndexOn
  • UniqueIndexOn
  • ToCsvFile
  • ToRows

A Table can also be joined with an Index, and this operation is lazy.
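The laziness described above can be illustrated with a small standalone sketch. It does not use csvplus itself: `lazyTable`, `fromRows`, `filter` and `toRows` are hypothetical names invented for this illustration, and only the `Row` declaration mirrors the documented type. The point is that building the pipeline runs no user code; the predicate is called only once a terminal method drives the iteration.

```go
package main

import "fmt"

// Row mirrors the documented csvplus.Row declaration.
type Row map[string]string

// lazyTable is a hypothetical stand-in for a lazy table: it only stores
// a function that knows how to iterate, so building a pipeline does no work.
type lazyTable struct {
	each func(fn func(Row) error) error
}

// fromRows wraps a slice of rows; each row is copied before it is handed out.
func fromRows(rows []Row) lazyTable {
	return lazyTable{each: func(fn func(Row) error) error {
		for _, r := range rows {
			c := make(Row, len(r))
			for k, v := range r {
				c[k] = v
			}
			if err := fn(c); err != nil {
				return err
			}
		}
		return nil
	}}
}

// filter returns a new table; pred runs only when that table is iterated.
func (t lazyTable) filter(pred func(Row) bool) lazyTable {
	return lazyTable{each: func(fn func(Row) error) error {
		return t.each(func(r Row) error {
			if pred(r) {
				return fn(r)
			}
			return nil
		})
	}}
}

// toRows is the terminal step that actually drives the iteration.
func (t lazyTable) toRows() ([]Row, error) {
	var out []Row
	err := t.each(func(r Row) error {
		out = append(out, r)
		return nil
	})
	return out, err
}

func main() {
	calls := 0
	tbl := fromRows([]Row{{"name": "Amelia"}, {"name": "John"}}).
		filter(func(r Row) bool { calls++; return r["name"] == "Amelia" })
	fmt.Println("predicate calls before iteration:", calls)
	rows, err := tbl.toRows()
	fmt.Println("rows:", len(rows), "predicate calls after iteration:", calls, "err:", err)
}
```

Each chained call wraps the previous iteration function in a closure, which is one common way to get lazy, composable stages; the library's actual internals may differ.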

Type Index

Index is a sorted collection of rows. The sorting is performed on the columns specified when the index is created. Iteration over an index yields a sorted sequence of rows. An Index can be joined with a Table. The type has operations for finding rows and creating sub-indices in O(log(n)) time. Another useful operation is resolving duplicates. Building an index takes O(n*log(n)) time, but before that the entire data source is read into memory, so care should be taken when indexing huge datasets.
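To make the complexity claims concrete, here is a minimal standalone sketch of a sorted index over a single column. It is not the library's Index type: `sortedIndex`, `buildIndex` and `find` are hypothetical names, and the sketch assumes plain string comparison on one key column. Sorting gives the O(n*log(n)) build cost, and two binary searches give the O(log(n)) lookup.

```go
package main

import (
	"fmt"
	"sort"
)

// Row mirrors the documented csvplus.Row declaration.
type Row map[string]string

// sortedIndex is a hypothetical single-column index, not the library's Index.
type sortedIndex struct {
	col  string
	rows []Row
}

// buildIndex copies the slice and sorts the copy by col: O(n*log(n)).
// Note that all rows must already be in memory at this point.
func buildIndex(rows []Row, col string) sortedIndex {
	sorted := append([]Row(nil), rows...)
	sort.SliceStable(sorted, func(i, j int) bool { return sorted[i][col] < sorted[j][col] })
	return sortedIndex{col: col, rows: sorted}
}

// find returns all rows whose key equals value using two binary searches.
func (ix sortedIndex) find(value string) []Row {
	n := len(ix.rows)
	lo := sort.Search(n, func(i int) bool { return ix.rows[i][ix.col] >= value })
	hi := sort.Search(n, func(i int) bool { return ix.rows[i][ix.col] > value })
	return ix.rows[lo:hi]
}

func main() {
	ix := buildIndex([]Row{
		{"id": "3", "name": "Carol"},
		{"id": "1", "name": "Amelia"},
		{"id": "2", "name": "John"},
	}, "id")
	for _, r := range ix.find("2") {
		fmt.Println(r["id"], r["name"])
	}
}
```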

For more details see the documentation.

Project status

The project is in a usable state usually called "beta". Tested on Linux Mint 18 (based on Ubuntu 16.04). Go version 1.7.1.

Documentation

Overview

Package csvplus extends the standard Go encoding/csv package with fluent interface, lazy stream processing operations, indices and joins.

Constants

This section is empty.

Variables

This section is empty.

Functions

func All

func All(funcs ...func(Row) bool) func(Row) bool

All is a predicate combinator that takes any number of other predicates and produces a new predicate which returns 'true' only if all the specified predicates return 'true' for the same input Row.

func Any

func Any(funcs ...func(Row) bool) func(Row) bool

Any is a predicate combinator that takes any number of other predicates and produces a new predicate which returns 'true' if any of the specified predicates returns 'true' for the same input Row.

func Like

func Like(match Row) func(Row) bool

Like produces a predicate that returns 'true' if its input Row matches all the corresponding values from the specified 'match' Row.

func Not

func Not(pred func(Row) bool) func(Row) bool

Not produces a new predicate that inverts the return value of the given predicate.
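The four predicate helpers compose in the usual boolean way. The following standalone sketch re-implements them under lowercase, hypothetical names (`allOf`, `anyOf`, `not`, `like`) purely to show the semantics described above; it is not the library's source. One assumption is made loudly: in this sketch a column missing from the input Row counts as a mismatch for `like`.

```go
package main

import "fmt"

// Row mirrors the documented csvplus.Row declaration.
type Row map[string]string

// allOf is true only if every predicate is true for the row.
func allOf(preds ...func(Row) bool) func(Row) bool {
	return func(r Row) bool {
		for _, p := range preds {
			if !p(r) {
				return false
			}
		}
		return true
	}
}

// anyOf is true if at least one predicate is true for the row.
func anyOf(preds ...func(Row) bool) func(Row) bool {
	return func(r Row) bool {
		for _, p := range preds {
			if p(r) {
				return true
			}
		}
		return false
	}
}

// not inverts the result of the given predicate.
func not(p func(Row) bool) func(Row) bool {
	return func(r Row) bool { return !p(r) }
}

// like is true if the row has every column of match with the same value.
// Assumption: a column absent from the row is treated as a mismatch.
func like(match Row) func(Row) bool {
	return func(r Row) bool {
		for k, v := range match {
			if got, ok := r[k]; !ok || got != v {
				return false
			}
		}
		return true
	}
}

func main() {
	isAmelia := like(Row{"name": "Amelia"})
	isDoe := like(Row{"surname": "Doe"})
	row := Row{"name": "Amelia", "surname": "Smith"}

	fmt.Println(allOf(isAmelia, isDoe)(row))
	fmt.Println(anyOf(isAmelia, isDoe)(row))
	fmt.Println(allOf(isAmelia, not(isDoe))(row))
}
```

With the real package, the equivalent combined predicate would be passed to Table.Filter, as in the README example that uses csvplus.Like.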

Types

type CsvDataSource

type CsvDataSource struct {
	// contains filtered or unexported fields
}

CsvDataSource is an implementation of the DataSource interface that reads its data from a .csv file.

func CsvFileDataSource

func CsvFileDataSource(name string) *CsvDataSource

CsvFileDataSource constructs a new CsvDataSource bound to the specified file name and with the default csv.Reader settings.

func (*CsvDataSource) AssumeHeader

func (s *CsvDataSource) AssumeHeader(spec map[string]int) *CsvDataSource

AssumeHeader sets the header for those input files that do not have their column names specified on the first line of the file. The header specification is a map from assigned column names to their corresponding column indices.

func (*CsvDataSource) CommentChar

func (s *CsvDataSource) CommentChar(c rune) *CsvDataSource

CommentChar sets the symbol that starts a comment in the input file.

func (*CsvDataSource) Delimiter

func (s *CsvDataSource) Delimiter(c rune) *CsvDataSource

Delimiter sets the symbol to be used as a field delimiter in the input file.

func (*CsvDataSource) ExpectHeader

func (s *CsvDataSource) ExpectHeader(spec map[string]int) *CsvDataSource

ExpectHeader sets the header for input files that have their column names specified on the first line of the file. The line is verified against this specification each time the input file is opened. The header specification is a map from expected column names to their corresponding column indices. A negative index means that the real index will be found by searching the first line of the file for the specified column name.
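The header specification rules can be sketched as a small standalone function. `resolveHeader` is a hypothetical helper written for this illustration, not part of csvplus: it takes a spec in the documented `map[string]int` form and the fields of the first line, verifies non-negative indices against the actual header, and resolves negative ones by searching for the column name. The exact error conditions of the real implementation are an assumption here.

```go
package main

import "fmt"

// resolveHeader is a hypothetical helper illustrating the documented rules:
// a non-negative index must match the name found at that position, and a
// negative index is replaced by the position where the name is found.
func resolveHeader(spec map[string]int, firstLine []string) (map[string]int, error) {
	out := make(map[string]int, len(spec))
	for name, idx := range spec {
		if idx < 0 {
			idx = -1
			for i, col := range firstLine {
				if col == name {
					idx = i
					break
				}
			}
			if idx < 0 {
				return nil, fmt.Errorf("column %q not found in header", name)
			}
		} else if idx >= len(firstLine) || firstLine[idx] != name {
			return nil, fmt.Errorf("column %q expected at index %d", name, idx)
		}
		out[name] = idx
	}
	return out, nil
}

func main() {
	header := []string{"id", "name", "surname"}

	// "id" is pinned to column 0, "surname" is looked up by name.
	resolved, err := resolveHeader(map[string]int{"id": 0, "surname": -1}, header)
	fmt.Println(resolved, err)

	// A pinned index that does not match the actual header is rejected.
	_, err = resolveHeader(map[string]int{"name": 2}, header)
	fmt.Println(err)
}
```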

func (*CsvDataSource) ForEach

func (s *CsvDataSource) ForEach(fn RowFunc) error

ForEach reads the input file line by line, converts each line to a Row and calls the supplied RowFunc. ForEach is goroutine-safe and may be called multiple times.

func (*CsvDataSource) LazyQuotes

func (s *CsvDataSource) LazyQuotes() *CsvDataSource

LazyQuotes specifies that a quote may appear in an unquoted field and a non-doubled quote may appear in a quoted field of the input file.

func (*CsvDataSource) NumFields

func (s *CsvDataSource) NumFields(n int) *CsvDataSource

NumFields sets the expected number of fields on each line of the input file. It is an error if any line from the input file does not have that exact number of fields.

func (*CsvDataSource) NumFieldsAny

func (s *CsvDataSource) NumFieldsAny() *CsvDataSource

NumFieldsAny specifies that each line of the input file may have a different number of fields. Lines shorter than the maximum column index in the header specification will be padded with empty fields.

func (*CsvDataSource) NumFieldsAuto

func (s *CsvDataSource) NumFieldsAuto() *CsvDataSource

NumFieldsAuto specifies that the number of fields on each line must match that of the first line of the input file.
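The three field-count modes resemble the `FieldsPerRecord` setting of the standard `encoding/csv` Reader: a positive value demands an exact count, zero takes the count from the first record, and a negative value disables the check. The sketch below shows only that standard-library behaviour; that NumFields, NumFieldsAuto and NumFieldsAny map onto it in this way is an assumption, and the padding performed by NumFieldsAny is not reproduced. `readAll` is a hypothetical helper.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// readAll parses data with the given csv.Reader.FieldsPerRecord setting.
func readAll(data string, fieldsPerRecord int) ([][]string, error) {
	r := csv.NewReader(strings.NewReader(data))
	r.FieldsPerRecord = fieldsPerRecord
	return r.ReadAll()
}

func main() {
	// The second line has fewer fields than the first.
	const data = "a,b,c\n1,2\n"

	// 3: exact count, 0: same as the first record, -1: any count.
	for _, n := range []int{3, 0, -1} {
		recs, err := readAll(data, n)
		fmt.Printf("FieldsPerRecord=%d: %d records, err=%v\n", n, len(recs), err)
	}
}
```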

func (*CsvDataSource) SelectColumns

func (s *CsvDataSource) SelectColumns(names ...string) *CsvDataSource

SelectColumns specifies the names of the columns to read from the file. The header specification is built by searching the first line of the input file for the names specified and recording the indices of those columns. It is an error if any of the column names is not found.

func (*CsvDataSource) TrimLeadingSpace

func (s *CsvDataSource) TrimLeadingSpace() *CsvDataSource

TrimLeadingSpace specifies that the leading white space in a field should be ignored.

type DataSource

type DataSource interface {
	// ForEach should call the given RowFunc once per each Row. The iteration should
	// continue for as long as the RowFunc returns 'nil'. When RowFunc returns
	// a non-nil error, this function should stop iteration and return an error,
	// which may be either the original one, or some other error. The special
	// value of io.EOF should be treated as a 'stop iteration' command, in which
	// case this function should return 'nil' error. Given that Rows can be modified
	// by the RowFunc, the implementations should only pass copies of their
	// underlying rows to the supplied RowFunc.
	ForEach(RowFunc) error
}

DataSource is the interface to any data that can be represented as a sequence of Rows.
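The contract spelled out in the interface comment can be sketched with an in-memory implementation. The `Row`, `RowFunc` and `DataSource` declarations below copy the documented ones so that the example compiles on its own; `sliceSource` and `firstN` are hypothetical names. The sketch hands out copies of its rows and treats io.EOF from the callback as a request to stop without error.

```go
package main

import (
	"fmt"
	"io"
)

// Local copies of the documented declarations, so the sketch is standalone.
type Row map[string]string
type RowFunc func(Row) error
type DataSource interface {
	ForEach(RowFunc) error
}

// sliceSource is a hypothetical in-memory DataSource.
type sliceSource []Row

// ForEach passes a copy of each row to fn, stops quietly on io.EOF and
// returns any other error unchanged.
func (s sliceSource) ForEach(fn RowFunc) error {
	for _, r := range s {
		c := make(Row, len(r))
		for k, v := range r {
			c[k] = v
		}
		if err := fn(c); err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
	}
	return nil
}

// firstN collects at most n rows, using io.EOF to stop the iteration early.
func firstN(src DataSource, n int) ([]Row, error) {
	var out []Row
	err := src.ForEach(func(r Row) error {
		if len(out) >= n {
			return io.EOF
		}
		out = append(out, r)
		return nil
	})
	return out, err
}

func main() {
	src := sliceSource{{"name": "Amelia"}, {"name": "John"}, {"name": "Carol"}}
	rows, err := firstN(src, 2)
	rows[0]["name"] = "changed" // modifies the copy, not the source
	fmt.Println(len(rows), err, src[0]["name"])
}
```

With the real package, a type like this could be passed to csvplus.Take, provided it uses csvplus.Row and csvplus.RowFunc instead of the local copies.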

type DataSourceError

type DataSourceError struct {
	Name string
	Line uint64
	Err  error
}

DataSourceError is the type of the error returned from CsvDataSource.ForEach method.

func (*DataSourceError) Error

func (e *DataSourceError) Error() string

Error returns a human-readable error message string.

type Index

type Index struct {
	// contains filtered or unexported fields
}

Index is a sorted collection of Rows with O(log(n)) complexity of search on the indexed columns. Iteration over the Index yields a sequence of Rows sorted on the index.

func (*Index) Find

func (index *Index) Find(values ...string) Table

Find returns a Table of all Rows from the Index that match the specified values in the indexed columns, from left to right. The number of specified values may be less than the number of indexed columns.

func (*Index) ForEach

func (index *Index) ForEach(fn RowFunc) (err error)

ForEach calls the supplied RowFunc once per Row. Rows are sorted by the values of the columns specified when the Index was created.

func (*Index) ResolveDuplicates

func (index *Index) ResolveDuplicates(resolve func(rows []Row) (Row, error)) error

ResolveDuplicates calls the specified function once for each group of duplicates with the same key. The specified function must not modify its parameter and is expected to do one of the following:

- Select and return one row from the input list. The row will be used as the only row with its key;

- Return an empty row. The entire set of rows will be ignored;

- Return an error which will be passed back to the caller of ResolveDuplicates().
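A resolver with the documented signature `func(rows []Row) (Row, error)` might look like the sketch below. `keepLatest` is a hypothetical function, and the `ts` column with lexicographically comparable timestamps is an assumption made for the example. It selects one row from the group without modifying the slice, and returns an error when a row cannot be compared.

```go
package main

import (
	"errors"
	"fmt"
)

// Row mirrors the documented csvplus.Row declaration.
type Row map[string]string

// keepLatest picks the row with the greatest "ts" value from a group of
// duplicates. Assumption: all "ts" values share one format in which string
// order equals time order (for example RFC 3339 with the same offset).
func keepLatest(rows []Row) (Row, error) {
	var best Row
	for _, r := range rows {
		ts, ok := r["ts"]
		if !ok {
			return nil, errors.New("duplicate row without a ts column")
		}
		if best == nil || ts > best["ts"] {
			best = r
		}
	}
	return best, nil
}

func main() {
	group := []Row{
		{"id": "1", "ts": "2016-09-14T08:48:22+01:00"},
		{"id": "1", "ts": "2016-09-15T10:00:00+01:00"},
	}
	row, err := keepLatest(group)
	fmt.Println(row["ts"], err)
}
```

With the real package, and with csvplus.Row in place of the local Row type, such a function would be passed as the argument of ResolveDuplicates.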

func (*Index) SubIndex

func (index *Index) SubIndex(values ...string) *Index

SubIndex returns an Index containing only the rows where the values of the indexed columns match the supplied values, from left to right. The number of specified values must be less than the number of indexed columns.

type Row

type Row map[string]string

Row represents one line from a data source like a .csv file.

Each Row is a map from column names to the string values under those columns on the current line. It is assumed that each column has a unique name. In a .csv file, the column names may either come from the first line of the file ("expected header"), or they can be set up via configuration of the reader object ("assumed header").

Using meaningful column names instead of indices is usually more convenient when the columns get rearranged during the execution of the processing pipeline.

func (Row) Clone

func (row Row) Clone() Row

Clone returns a copy of the current Row.

func (Row) HasColumn

func (row Row) HasColumn(col string) (found bool)

HasColumn is a predicate returning 'true' when the specified column is present.

func (Row) Header

func (row Row) Header() []string

Header returns a slice of all column names, sorted via sort.Strings.

func (Row) SafeGetValue

func (row Row) SafeGetValue(col, subst string) string

SafeGetValue returns the value under the specified column, if present, otherwise it returns the substitution value.

func (Row) Select

func (row Row) Select(cols ...string) (Row, error)

Select takes a list of column names and returns a new Row containing only the specified columns, or an error if any column is not present.

func (Row) SelectExisting

func (row Row) SelectExisting(cols ...string) Row

SelectExisting takes a list of column names and returns a new Row containing only those columns from the list that are present in the current Row.
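The difference between Select and SelectExisting is only in how a missing column is handled. The standalone sketch below shows that difference with two hypothetical functions, `selectStrict` and `selectExisting`; it illustrates the documented behaviour and is not the library's code, and the error text is invented.

```go
package main

import "fmt"

// Row mirrors the documented csvplus.Row declaration.
type Row map[string]string

// selectStrict returns only the named columns, or an error if one is absent.
func selectStrict(row Row, cols ...string) (Row, error) {
	out := make(Row, len(cols))
	for _, c := range cols {
		v, ok := row[c]
		if !ok {
			return nil, fmt.Errorf("missing column %q", c)
		}
		out[c] = v
	}
	return out, nil
}

// selectExisting returns the named columns that are present and silently
// skips the rest.
func selectExisting(row Row, cols ...string) Row {
	out := make(Row, len(cols))
	for _, c := range cols {
		if v, ok := row[c]; ok {
			out[c] = v
		}
	}
	return out
}

func main() {
	row := Row{"id": "1", "name": "Amelia"}

	_, err := selectStrict(row, "id", "price")
	fmt.Println(err)

	fmt.Println(selectExisting(row, "id", "price"))
}
```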

func (Row) SelectValues

func (row Row) SelectValues(cols ...string) ([]string, error)

SelectValues takes a list of column names and returns a slice of their corresponding values, or an error if any column is not present.

func (Row) String

func (row Row) String() string

String returns a string representation of the Row.

func (Row) ValueAsFloat64 added in v0.2.3

func (row Row) ValueAsFloat64(column string) (res float64, err error)

ValueAsFloat64 returns the value of the given column converted to floating point type, or an error. The column must be present on the row.

func (Row) ValueAsInt added in v0.2.3

func (row Row) ValueAsInt(column string) (res int, err error)

ValueAsInt returns the value of the given column converted to integer type, or an error. The column must be present on the row.

type RowFunc

type RowFunc func(Row) error

RowFunc is the function type used when iterating over Rows via the ForEach() method.

type Table

type Table struct {
	// contains filtered or unexported fields
}

Table implements sequential operations on a given data source as well as the DataSource interface itself and other iterating methods. All sequential operations are 'lazy', i.e. they are not invoked immediately; instead they return a new table which, when iterated over, invokes the particular operation. The operations can be chained using a so-called fluent interface.

func Take

func Take(source DataSource) Table

Take converts any DataSource into a Table.

func TakeRows added in v0.2.1

func TakeRows(rows []Row) Table

TakeRows converts a slice of Rows into a Table.

func (Table) Drop

func (t Table) Drop(n int) Table

Drop specifies the number of Rows to ignore before passing the remaining rows down the pipeline.

func (Table) DropColumns added in v0.2.2

func (t Table) DropColumns(columns ...string) Table

DropColumns removes the specified columns from each row.

func (Table) DropWhile

func (t Table) DropWhile(pred func(Row) bool) Table

DropWhile ignores all the Rows for as long as the specified predicate is true; afterwards all the remaining Rows are passed down the pipeline.
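One detail of DropWhile is easy to miss: the predicate matters only up to the first Row for which it is false. A standalone sketch on a plain slice makes this visible; `dropWhile` is a hypothetical eager helper, not the lazy library method, and the `kind` column is invented for the example.

```go
package main

import "fmt"

// Row mirrors the documented csvplus.Row declaration.
type Row map[string]string

// dropWhile skips rows while pred is true and returns everything from the
// first row for which pred is false, without testing later rows.
func dropWhile(rows []Row, pred func(Row) bool) []Row {
	for i, r := range rows {
		if !pred(r) {
			return rows[i:]
		}
	}
	return nil
}

func main() {
	rows := []Row{
		{"kind": "preamble"},
		{"kind": "preamble"},
		{"kind": "data"},
		{"kind": "preamble"}, // kept: the predicate is no longer consulted
	}
	kept := dropWhile(rows, func(r Row) bool { return r["kind"] == "preamble" })
	fmt.Println(len(kept))
}
```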

func (Table) Except added in v0.2.0

func (t Table) Except(index *Index, columns ...string) Table

Except returns a table containing all the rows not in the specified Index, unchanged. The specified columns are matched against those from the index, in the order of specification. If no columns are specified then the columns list is taken from the index.

func (Table) Filter

func (t Table) Filter(pred func(Row) bool) Table

Filter takes a predicate which, when applied to a Row, decides if that Row should be passed down for further processing. The predicate should return 'true' to pass the Row.

func (Table) ForEach

func (t Table) ForEach(fn RowFunc) error

ForEach iterates over the Table invoking all the operations in the processing pipeline, and calls the specified RowFunc on each resulting Row.

func (Table) IndexOn

func (t Table) IndexOn(columns ...string) (*Index, error)

IndexOn iterates the input source, building an index on the specified columns. Columns are taken from the specified list from left to right.

func (Table) Join

func (t Table) Join(index *Index, columns ...string) Table

Join returns a Table which is a join between the current Table and the specified Index. The specified columns are matched against those from the index, in the order of specification. An empty 'columns' list yields a join on the columns from the Index (aka "natural join"), all of which must exist in the current Table. Each row in the resulting table contains all the columns from both the current table and the index. This is a lazy operation: the actual join is performed only when the resulting table is iterated over.
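The shape of a joined row can be sketched without the library. The code below is an eager, simplified illustration with several labelled assumptions: a plain map stands in for the Index, rows without a match are dropped, and on a column name clash the value from the current table wins. The documentation above does not state how the real Join handles the last two points. `joinRows` is a hypothetical name.

```go
package main

import "fmt"

// Row mirrors the documented csvplus.Row declaration.
type Row map[string]string

// joinRows merges each left row with every index row stored under the value
// of the left row's col. Assumptions of this sketch: unmatched left rows are
// dropped, and left values win when both sides have the same column name.
func joinRows(left []Row, index map[string][]Row, col string) []Row {
	var out []Row
	for _, l := range left {
		for _, r := range index[l[col]] {
			merged := make(Row, len(l)+len(r))
			for k, v := range r {
				merged[k] = v
			}
			for k, v := range l {
				merged[k] = v
			}
			out = append(out, merged)
		}
	}
	return out
}

func main() {
	// A map keyed by customer id stands in for an index on "id".
	customers := map[string][]Row{
		"1": {{"id": "1", "name": "John", "surname": "Doe"}},
	}
	orders := []Row{
		{"cust_id": "1", "qty": "38"},
		{"cust_id": "7", "qty": "2"}, // no such customer: dropped in this sketch
	}
	for _, r := range joinRows(orders, customers, "cust_id") {
		fmt.Println(r["name"], r["surname"], r["qty"])
	}
}
```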

func (Table) Map

func (t Table) Map(mf func(Row) Row) Table

Map takes a function which gets applied to each Row when the source is iterated over. The function may return a modified input Row, or an entirely new Row.

func (Table) SelectColumns added in v0.2.2

func (t Table) SelectColumns(columns ...string) Table

SelectColumns leaves only the specified columns on each row. It is an error if any of those columns does not exist.

func (Table) TakeWhile

func (t Table) TakeWhile(pred func(Row) bool) Table

TakeWhile takes a predicate which gets applied to each Row upon iteration. The iteration stops when the predicate returns 'false' for the first time.

func (Table) ToCsvFile

func (t Table) ToCsvFile(fileName string, columns ...string) error

ToCsvFile iterates the input source writing the selected columns to the file with the given name, in "canonical" form with the header on the first line and with all the lines having the same number of fields, using default settings for the underlying Writer from the encoding/csv package.

func (Table) ToRows added in v0.2.1

func (t Table) ToRows() (rows []Row, err error)

ToRows iterates the Table storing the result in a slice of Rows.

func (Table) Top

func (t Table) Top(n int) Table

Top specifies the number of Rows to pass down the pipeline before stopping the iteration.

func (Table) Transform

func (t Table) Transform(trans func(Row) (Row, error)) Table

Transform is the most generic operation on a Row. It takes a function which maps a Row to another Row or returns an error. Any error returned from that function stops the iteration, otherwise the returned Row, if not empty, gets passed down to the next stage of the processing pipeline.
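The three possible outcomes of a transform function (a row, an empty row, an error) can be shown with a small eager sketch. `applyTransform` and `positiveQty` are hypothetical names and the `qty` column is taken from the README example; the real Transform is lazy and works on a Table rather than a slice.

```go
package main

import (
	"fmt"
	"strconv"
)

// Row mirrors the documented csvplus.Row declaration.
type Row map[string]string

// applyTransform runs trans over rows: an error stops the iteration, an
// empty result is skipped, and any other result is passed on.
func applyTransform(rows []Row, trans func(Row) (Row, error)) ([]Row, error) {
	var out []Row
	for _, r := range rows {
		res, err := trans(r)
		if err != nil {
			return out, err
		}
		if len(res) > 0 {
			out = append(out, res)
		}
	}
	return out, nil
}

// positiveQty fails on a non-numeric "qty", drops rows with a quantity of
// zero or less by returning an empty Row, and passes other rows unchanged.
func positiveQty(r Row) (Row, error) {
	n, err := strconv.Atoi(r["qty"])
	if err != nil {
		return nil, fmt.Errorf("bad qty %q: %v", r["qty"], err)
	}
	if n <= 0 {
		return nil, nil
	}
	return r, nil
}

func main() {
	rows, err := applyTransform([]Row{{"qty": "38"}, {"qty": "0"}, {"qty": "5"}}, positiveQty)
	fmt.Println(len(rows), err)

	_, err = applyTransform([]Row{{"qty": "many"}}, positiveQty)
	fmt.Println(err != nil)
}
```

This also shows why Transform subsumes Filter, Map and Validate: each of those corresponds to one of the three outcomes.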

func (Table) UniqueIndexOn

func (t Table) UniqueIndexOn(columns ...string) (*Index, error)

UniqueIndexOn iterates the input source, building a unique index on the specified columns. Columns are taken from the specified list from left to right.

func (Table) Validate

func (t Table) Validate(vf func(Row) error) Table

Validate takes a function which checks every Row upon iteration and returns an error if the validation fails. The iteration stops at the first error encountered.
