transform

package
v0.0.0-...-594af83 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 21, 2025 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CSVDecoderOptions

type CSVDecoderOptions struct {
	Comma  rune
	Header []string
}

CSVDecoderOptions configures how the Decoder returned by NewCSVDecoder parses input.

If Header is non-nil and non-empty, it is used as the canonical header and every record must match its length. If Header is nil or empty, the decoder will read the first CSV record from the stream and treat it as the header.

Comma controls the field delimiter. If Comma is zero, ',' is used.
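The defaulting rules above can be illustrated with encoding/csv directly. This is a minimal sketch, not the package's implementation: options is a local stand-in that mirrors the documented fields, and readAll is a hypothetical helper introduced only for this example.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// options is a local stand-in mirroring the documented CSVDecoderOptions fields.
type options struct {
	Comma  rune
	Header []string
}

// readAll is a hypothetical helper that applies the documented defaults:
// a zero Comma means ',', and an empty Header means the first record of
// the input is consumed as the header.
func readAll(input string, opt options) (header []string, rows [][]string, err error) {
	r := csv.NewReader(strings.NewReader(input)) // csv.Reader defaults to ','
	if opt.Comma != 0 {
		r.Comma = opt.Comma
	}
	header = opt.Header
	if len(header) == 0 {
		if header, err = r.Read(); err != nil {
			return nil, nil, err
		}
	}
	// Every remaining record must have as many fields as the header.
	r.FieldsPerRecord = len(header)
	rows, err = r.ReadAll()
	return header, rows, err
}

func main() {
	// Header inferred from the first record, with ';' as the delimiter.
	h, rows, err := readAll("id;name\n1;ada\n", options{Comma: ';'})
	fmt.Println(h, rows, err)

	// Explicit header: the first line of input is treated as data.
	h, rows, err = readAll("1,ada\n2,bob\n", options{Header: []string{"id", "name"}})
	fmt.Println(h, rows, err)
}
```

With an explicit header, a record whose field count differs from the header length surfaces as an error from encoding/csv in this sketch; how the real decoder reports the mismatch is not specified here.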

type Decoder

type Decoder interface {

	// Decode consumes bytes from rc and produces a RecordIterator. The
	// returned iterator owns rc and is responsible for closing it when
	// iteration ends or Close is called.
	//
	// The rc parameter exposes both io.Reader and source metadata via
	// connector.SrcAwareStreamer, allowing the decoder to maintain
	// per-record provenance (file name, byte offset, source boundaries).
	Decode(ctx context.Context, rc connector.SrcAwareStreamer) (RecordIterator, error)
}

Decoder turns a source-aware byte stream into a stream of decoded records.

A Decoder is responsible for a specific on-wire format (e.g., CSV, XML). Any format-specific configuration (delimiter, header handling, namespaces, etc.) should be supplied when constructing the Decoder, not at Decode time.

func NewCSVDecoder

func NewCSVDecoder(opt CSVDecoderOptions) Decoder

NewCSVDecoder constructs a CSV-specific Decoder.

The returned Decoder produces RecordIterator values backed by encoding/csv.Reader and understands a header row:

  • If opt.Header is non-empty, that slice is used as the header.
  • Otherwise, the header is inferred from the first record in the stream.

When used together with a connector.SrcAwareStreamer that concatenates multiple CSV sources, the returned Decoder will:

  • Enforce a single canonical header for all sources.
  • Automatically drop a repeated header row at the start of each new source when that row matches the inferred header.

Input is expected to follow the CSV rules described in RFC 4180 (with a configurable delimiter).
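The multi-source behaviour described above can be sketched as follows. This is an assumption-laden illustration rather than the package's code: sources are plain strings instead of a connector.SrcAwareStreamer, and decodeSources and equal are hypothetical helpers.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"strings"
)

// equal reports whether two records have identical fields.
func equal(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

// decodeSources is a hypothetical helper. It reads each source in order,
// infers the header from the first record it sees, and skips the first
// record of each later source when it equals that header.
func decodeSources(sources []string) (header []string, rows [][]string, err error) {
	for i, src := range sources {
		r := csv.NewReader(strings.NewReader(src))
		first := true
		for {
			rec, rerr := r.Read()
			if rerr == io.EOF {
				break
			}
			if rerr != nil {
				return nil, nil, fmt.Errorf("source %d: %w", i, rerr)
			}
			if first {
				first = false
				if header == nil {
					header = rec // canonical header for all sources
					continue
				}
				if equal(rec, header) {
					continue // repeated header row: drop it
				}
			}
			if len(rec) != len(header) {
				return nil, nil, fmt.Errorf("source %d: got %d fields, want %d", i, len(rec), len(header))
			}
			rows = append(rows, rec)
		}
	}
	return header, rows, nil
}

func main() {
	h, rows, err := decodeSources([]string{
		"id,name\n1,ada\n",
		"id,name\n2,bob\n", // header repeated, dropped
		"3,cy\n",           // no header, first row kept as data
	})
	fmt.Println(h, rows, err)
}
```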

type Extractor

type Extractor interface {
	// ByIndex returns the field value at index i and true if present.
	// Implementations must return ok == false for out-of-bounds indices.
	ByIndex(i int) (string, bool)

	// ByName returns the field value for the given name and true if present.
	// If the underlying format does not provide names (no header), ByName
	// must return ok == false.
	ByName(name string) (string, bool)

	// Len reports the number of fields in the current record.
	Len() int

	// Names returns the field names for the current record if available, or
	// nil if the format has no header or the decoder is not name-aware.
	Names() []string

	// Meta returns the source metadata for the current record, such as the
	// originating file name and byte offset, as provided by the underlying
	// connector.SrcAwareStreamer.
	Meta() connector.SrcMeta
}
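
As an illustration of the contract above, the following sketch shows one way a slice-backed record could satisfy these methods. SrcMeta here is a hypothetical stand-in (the fields of connector.SrcMeta are not shown in this documentation), and row and newRow are names invented for the example.

```go
package main

import "fmt"

// SrcMeta is a hypothetical stand-in for connector.SrcMeta; the real
// type's fields are not shown in this documentation.
type SrcMeta struct {
	Name   string
	Offset int64
}

// row is an illustrative slice-backed record following the Extractor contract.
type row struct {
	names  []string       // nil when the format has no header
	index  map[string]int // name -> position, built from names
	fields []string
	meta   SrcMeta
}

func newRow(names, fields []string, meta SrcMeta) *row {
	idx := make(map[string]int, len(names))
	for i, n := range names {
		idx[n] = i
	}
	return &row{names: names, index: idx, fields: fields, meta: meta}
}

// ByIndex returns ok == false for out-of-bounds indices.
func (r *row) ByIndex(i int) (string, bool) {
	if i < 0 || i >= len(r.fields) {
		return "", false
	}
	return r.fields[i], true
}

// ByName returns ok == false for unknown names or when there is no header.
func (r *row) ByName(name string) (string, bool) {
	i, ok := r.index[name]
	if !ok {
		return "", false
	}
	return r.ByIndex(i)
}

func (r *row) Len() int        { return len(r.fields) }
func (r *row) Names() []string { return r.names }
func (r *row) Meta() SrcMeta   { return r.meta }

func main() {
	r := newRow([]string{"id", "name"}, []string{"7", "ada"}, SrcMeta{Name: "users.csv", Offset: 8})
	name, ok := r.ByName("name")
	fmt.Println(name, ok, r.Len(), r.Meta().Name)
}
```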

type Mapper

type Mapper[T any] func(Extractor) (T, error)

Mapper converts a single decoded record into a strongly typed value T.

Mappers are typically small, schema-specific functions that pull fields out of the Extractor (by index and/or by name), perform validation and type conversion, and return either a T or an error.
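
A Mapper of this shape might look like the sketch below. Extractor is a trimmed local copy of the documented interface, Mapper mirrors the documented signature, and User, mapUser, and mapRecord are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"strconv"
)

// Extractor is a trimmed local copy of the documented interface, limited
// to the methods this sketch needs.
type Extractor interface {
	ByIndex(i int) (string, bool)
	ByName(name string) (string, bool)
}

// Mapper mirrors the documented signature.
type Mapper[T any] func(Extractor) (T, error)

// User is a hypothetical target type.
type User struct {
	ID   int
	Name string
}

// mapUser pulls fields by name, validates them, and converts types.
func mapUser(rec Extractor) (User, error) {
	rawID, ok := rec.ByName("id")
	if !ok {
		return User{}, fmt.Errorf("missing field %q", "id")
	}
	id, err := strconv.Atoi(rawID)
	if err != nil {
		return User{}, fmt.Errorf("field %q: %w", "id", err)
	}
	name, ok := rec.ByName("name")
	if !ok || name == "" {
		return User{}, fmt.Errorf("missing field %q", "name")
	}
	return User{ID: id, Name: name}, nil
}

// Compile-time check that mapUser has the Mapper[User] shape.
var _ Mapper[User] = mapUser

// mapRecord is a map-backed Extractor used only to exercise mapUser.
type mapRecord map[string]string

func (m mapRecord) ByIndex(int) (string, bool) { return "", false }
func (m mapRecord) ByName(name string) (string, bool) {
	v, ok := m[name]
	return v, ok
}

func main() {
	u, err := mapUser(mapRecord{"id": "7", "name": "ada"})
	fmt.Println(u, err)
}
```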

type RecordIterator

type RecordIterator interface {

	// Next advances to the next record and reports whether one is available.
	// It returns false on EOF or on a terminal error. When Next returns
	// false, Err must be checked to distinguish clean EOF from failure.
	Next() bool

	// Record returns the current record. It is only valid to call Record
	// after Next has returned true, and its result remains valid until the
	// next call to Next.
	Record() Extractor

	// Err returns the first non-EOF error encountered while iterating, or
	// nil if the iterator completed successfully.
	Err() error

	// Close releases any underlying resources. It must be safe to call
	// Close multiple times. Implementations should tolerate Close being
	// called before the iterator is fully exhausted.
	Close() error
}

RecordIterator is a forward-only iterator over decoded records.

The typical usage pattern is:

it, err := dec.Decode(ctx, stream)
if err != nil {
    // handle decode error
}
defer it.Close()

for it.Next() {
    rec := it.Record()
    // use rec.ByIndex / rec.ByName / rec.Meta ...
}
if err := it.Err(); err != nil {
    // handle stream/decoder error
}
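
To make the Next/Err/Close contract concrete, here is a minimal sketch of an iterator built on encoding/csv. It is not the package's implementation: csvIter yields raw []string records instead of an Extractor to stay short, and closeCounter and newStringSource are hypothetical test helpers that count Close calls.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"strings"
)

// closeCounter wraps a reader and counts Close calls (test helper).
type closeCounter struct {
	io.Reader
	closes int
}

func (c *closeCounter) Close() error { c.closes++; return nil }

func newStringSource(s string) *closeCounter {
	return &closeCounter{Reader: strings.NewReader(s)}
}

// csvIter is an illustrative iterator following the documented contract.
type csvIter struct {
	rc     io.ReadCloser
	r      *csv.Reader
	cur    []string
	err    error
	closed bool
}

func newCSVIter(rc io.ReadCloser) *csvIter {
	return &csvIter{rc: rc, r: csv.NewReader(rc)}
}

// Next returns false on EOF, on a terminal error, or after Close.
func (it *csvIter) Next() bool {
	if it.closed || it.err != nil {
		return false
	}
	rec, err := it.r.Read()
	if err == io.EOF {
		it.Close() // iteration ended: release the stream
		return false
	}
	if err != nil {
		it.err = err // remember the first non-EOF error
		it.Close()
		return false
	}
	it.cur = rec
	return true
}

func (it *csvIter) Record() []string { return it.cur }
func (it *csvIter) Err() error       { return it.err }

// Close is idempotent: only the first call closes the underlying stream.
func (it *csvIter) Close() error {
	if it.closed {
		return nil
	}
	it.closed = true
	return it.rc.Close()
}

func main() {
	it := newCSVIter(newStringSource("a,b\nc,d\n"))
	defer it.Close()
	for it.Next() {
		fmt.Println(it.Record())
	}
	fmt.Println("err:", it.Err())
}
```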

type StructIterator

type StructIterator[T any] interface {

	// Next advances to the next value and reports whether one is available.
	// It returns false on EOF or on a terminal error. When Next returns
	// false, Err must be checked to distinguish clean EOF from failure.
	Next() bool

	// Struct returns the current value of type T. It is only valid to call
	// Struct after Next has returned true, and its result remains valid
	// until the next call to Next.
	Struct() T

	// Err returns the first non-EOF error encountered while iterating, or
	// nil if the iterator completed successfully.
	Err() error

	// Close releases any underlying resources. It must be safe to call
	// Close multiple times. Implementations should tolerate Close being
	// called before the iterator is fully exhausted.
	Close() error
}

StructIterator is a forward-only iterator over strongly typed values produced by applying a Mapper to each decoded record.
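
One way to picture this is as a thin adapter over a RecordIterator. The sketch below uses trimmed local copies of the documented interfaces; mapIter, sliceIter, field, toInt, and collect are hypothetical names, and treating a mapping failure as terminal is an assumption of this example rather than documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// Trimmed local copies of the documented interfaces. Extractor is reduced
// to the single method used here.
type Extractor interface {
	ByIndex(i int) (string, bool)
}

type RecordIterator interface {
	Next() bool
	Record() Extractor
	Err() error
	Close() error
}

type Mapper[T any] func(Extractor) (T, error)

// mapIter is a hypothetical adapter that applies mapFn to each record.
type mapIter[T any] struct {
	src   RecordIterator
	mapFn Mapper[T]
	cur   T
	err   error
}

func (m *mapIter[T]) Next() bool {
	if m.err != nil || !m.src.Next() {
		return false
	}
	v, err := m.mapFn(m.src.Record())
	if err != nil {
		m.err = err // assumption: a mapping failure ends iteration
		return false
	}
	m.cur = v
	return true
}

func (m *mapIter[T]) Struct() T { return m.cur }

// Err reports a mapping error first, otherwise the decoder's error.
func (m *mapIter[T]) Err() error {
	if m.err != nil {
		return m.err
	}
	return m.src.Err()
}

func (m *mapIter[T]) Close() error { return m.src.Close() }

// field and sliceIter are in-memory test doubles.
type field string

func (f field) ByIndex(i int) (string, bool) {
	if i != 0 {
		return "", false
	}
	return string(f), true
}

type sliceIter struct {
	recs []field
	pos  int
}

func (s *sliceIter) Next() bool        { s.pos++; return s.pos <= len(s.recs) }
func (s *sliceIter) Record() Extractor { return s.recs[s.pos-1] }
func (s *sliceIter) Err() error        { return nil }
func (s *sliceIter) Close() error      { return nil }

// toInt is a sample Mapper[int].
func toInt(rec Extractor) (int, error) {
	s, ok := rec.ByIndex(0)
	if !ok {
		return 0, errors.New("empty record")
	}
	return strconv.Atoi(s)
}

// collect drains a mapIter using the documented usage pattern.
func collect(recs ...field) ([]int, error) {
	it := &mapIter[int]{src: &sliceIter{recs: recs}, mapFn: toInt}
	defer it.Close()
	var out []int
	for it.Next() {
		out = append(out, it.Struct())
	}
	return out, it.Err()
}

func main() {
	fmt.Println(collect("1", "2", "3"))
	fmt.Println(collect("1", "x"))
}
```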

type Transformer

type Transformer[T any] interface {

	// Transform decodes records from rc using the embedded Decoder, applies
	// mapFn to each record to produce a T, and returns a StructIterator
	// over the resulting values.
	//
	// The returned iterator owns rc and is responsible for closing it when
	// iteration ends or Close is called.
	Transform(ctx context.Context, rc connector.SrcAwareStreamer, mapFn Mapper[T]) (StructIterator[T], error)
}

Transformer composes a Decoder with a Mapper to produce a stream of strongly typed values T from a source-aware byte stream.

func NewDecodeMapTransform

func NewDecodeMapTransform[T any](decoder Decoder) Transformer[T]

NewDecodeMapTransform constructs a Transformer[T] that uses the provided Decoder to turn a SrcAwareStreamer into decoded records, and then applies a Mapper[T] to each record to produce strongly typed values.

The Decoder is typically format-specific, such as the CSV decoder returned by NewCSVDecoder or an XML decoder.
