Version: v11.0.0

This package is not in the latest version of its module.

Published: Jan 18, 2023 License: Apache-2.0, BSD-2-Clause, BSD-3-Clause, + 8 more Imports: 30 Imported by: 1



Package pqarrow connects Arrow directly with the Parquet implementation. It isolates all of the explicitly Arrow-related code in this package, which provides the interfaces for reading and writing directly to and from Arrow Arrays, Tables, and Records.





func DecimalSize

func DecimalSize(precision int32) int32

DecimalSize returns the minimum number of bytes necessary to represent a decimal with the requested precision.

Taken from the Apache Impala codebase. The comments next to the return values are the maximum value that can be represented in 2's complement with the returned number of bytes
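The relationship between precision and byte width can be derived directly from the two's complement bound. The following is a minimal sketch using only the standard library; `minDecimalBytes` is a hypothetical helper written for illustration, not the lookup used by DecimalSize, and returning 0 for a non-positive precision is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"math/big"
)

// minDecimalBytes is a hypothetical helper, not the pqarrow implementation.
// It returns the smallest byte count n such that the largest decimal with
// the given number of digits fits in an n-byte two's complement integer,
// i.e. 10^precision - 1 <= 2^(8n-1) - 1.
func minDecimalBytes(precision int) int {
	if precision < 1 {
		return 0 // assumption: treat non-positive precision as "no bytes"
	}
	// Largest unscaled value with `precision` digits: 10^precision - 1.
	maxVal := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(precision)), nil)
	maxVal.Sub(maxVal, big.NewInt(1))
	for n := 1; ; n++ {
		// Largest positive value in n bytes of two's complement: 2^(8n-1) - 1.
		limit := new(big.Int).Lsh(big.NewInt(1), uint(8*n-1))
		limit.Sub(limit, big.NewInt(1))
		if maxVal.Cmp(limit) <= 0 {
			return n
		}
	}
}

func main() {
	for _, p := range []int{2, 4, 9, 18, 38} {
		fmt.Printf("precision %d -> %d bytes\n", p, minDecimalBytes(p))
	}
}
```

Under this bound, precisions 9, 18, and 38 need 4, 8, and 16 bytes respectively.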

func FromParquet

FromParquet generates an arrow Schema from a provided Parquet Schema

func NewArrowWriteContext

func NewArrowWriteContext(ctx context.Context, props *ArrowWriterProperties) context.Context

NewArrowWriteContext creates a reusable context object that contains writer properties and other reusable buffers for writing. The resulting context should not be used to write multiple columns concurrently. If props is nil, DefaultWriterProps will be used.

func ReadTable

ReadTable is a convenience function to quickly and easily read a parquet file into an arrow table.

The schema of the arrow table is generated from the schema of the parquet file, including nested columns, lists, and so on, in the same fashion as the FromParquet function. ReadTable encapsulates the logic of creating a separate file.Reader and pqarrow.FileReader in a single function, for when you want to construct a table from the entire parquet file rather than reading it piecemeal.

func ToParquet

func ToParquet(sc *arrow.Schema, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (*schema.Schema, error)

ToParquet generates a Parquet Schema from an arrow Schema using the given properties to make decisions when determining the logical/physical types of the columns.

func WriteArrowToColumn

func WriteArrowToColumn(ctx context.Context, cw file.ColumnChunkWriter, leafArr arrow.Array, defLevels, repLevels []int16, leafFieldNullable bool) error

WriteArrowToColumn writes apache arrow columnar data directly to a ColumnWriter. Returns non-nil error if the array data type is not compatible with the concrete writer type.

leafArr is always a primitive (possibly dictionary-encoded) type. leafFieldNullable indicates whether the leaf array is considered nullable according to its schema in a Table or its parent array.

func WriteTable

func WriteTable(tbl arrow.Table, w io.Writer, chunkSize int64, props *parquet.WriterProperties, arrprops ArrowWriterProperties) error

WriteTable is a convenience function to create and write a full array.Table to a parquet file. The schema and columns are determined by the schema of the table, and the file is written out to the provided writer. chunkSize determines the size of the row groups.


type ArrowColumnWriter

type ArrowColumnWriter struct {
	// contains filtered or unexported fields
}

ArrowColumnWriter is a convenience object for easily writing arrow data to a specific set of columns in a parquet file. Since a single arrow array can itself be a nested type consisting of multiple columns of data, this will write to all of the appropriate leaves in the parquet file, allowing easy writing of nested columns.

func NewArrowColumnWriter

func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, col int) (ArrowColumnWriter, error)

NewArrowColumnWriter returns a new writer using the chunked array to determine the number of leaf columns, and the provided schema manifest to determine the paths for writing the columns.

Using an arrow column writer is a convenience to avoid having to process the arrow array yourself and determine the correct definition and repetition levels manually.

func (*ArrowColumnWriter) Write

func (acw *ArrowColumnWriter) Write(ctx context.Context) error

type ArrowReadProperties

type ArrowReadProperties struct {
	// If Parallel is true, then functions which read multiple columns will read
	// those columns in parallel from the file with a number of readers equal
	// to the number of columns. Otherwise columns are read serially.
	Parallel bool
	// BatchSize is the size used for calls to NextBatch when reading whole columns
	BatchSize int64
}

ArrowReadProperties is the properties to define how to read a parquet file into arrow arrays.

type ArrowWriterProperties

type ArrowWriterProperties struct {
	// contains filtered or unexported fields
}

ArrowWriterProperties are used to determine how to manipulate the arrow data when writing it to a parquet file.

func DefaultWriterProps

func DefaultWriterProps() ArrowWriterProperties

DefaultWriterProps returns the default properties for the arrow writer, which are to use memory.DefaultAllocator and coerceTimestampUnit: arrow.Second.

func NewArrowWriterProperties

func NewArrowWriterProperties(opts ...WriterOption) ArrowWriterProperties

NewArrowWriterProperties creates a new writer properties object by passing in a set of options to control the properties. Once created, an individual instance of ArrowWriterProperties is immutable.

type ColumnChunkReader

type ColumnChunkReader struct {
	// contains filtered or unexported fields
}

ColumnChunkReader is a reader that reads only a single column chunk from a single column in a single row group

func (ColumnChunkReader) Read

func (ccr ColumnChunkReader) Read(ctx context.Context) (*arrow.Chunked, error)

type ColumnReader

type ColumnReader struct {
	// contains filtered or unexported fields
}

ColumnReader is used for reading batches of data from a specific column across multiple row groups to return a chunked arrow array.

func (*ColumnReader) NextBatch

func (c *ColumnReader) NextBatch(size int64) (*arrow.Chunked, error)

NextBatch returns a chunked array after reading `size` values, potentially across multiple row groups.

type FileReader

type FileReader struct {
	Props    ArrowReadProperties
	Manifest *SchemaManifest
	// contains filtered or unexported fields
}

FileReader is the base object for reading a parquet file into arrow object types.

It provides utility functions for reading record batches, a table, subsets of columns / rowgroups, and so on.

func NewFileReader

func NewFileReader(rdr *file.Reader, props ArrowReadProperties, mem memory.Allocator) (*FileReader, error)

NewFileReader constructs a reader for converting to Arrow objects from an existing parquet file reader object.

Only returns an error if there is some error constructing the schema manifest from the parquet file metadata.

func (*FileReader) GetColumn

func (fr *FileReader) GetColumn(ctx context.Context, i int) (*ColumnReader, error)

GetColumn returns a reader for pulling the data of leaf column index i across all row groups in the file.

func (*FileReader) GetFieldReader

func (fr *FileReader) GetFieldReader(ctx context.Context, i int, includedLeaves map[int]bool, rowGroups []int) (*ColumnReader, error)

GetFieldReader returns a reader for the entire Field of index i which could potentially include reading multiple columns from the underlying parquet file if that field is a nested field.

includedLeaves and rowGroups specify precisely which leaf indexes and which row groups to read.

func (*FileReader) GetFieldReaders

func (fr *FileReader) GetFieldReaders(ctx context.Context, colIndices, rowGroups []int) ([]*ColumnReader, *arrow.Schema, error)

GetFieldReaders is for retrieving readers for multiple fields at one time for only the list of column indexes and rowgroups requested. It returns a slice of the readers and the corresponding arrow.Schema for those columns.

func (*FileReader) GetRecordReader

func (fr *FileReader) GetRecordReader(ctx context.Context, colIndices, rowGroups []int) (RecordReader, error)

GetRecordReader returns a record reader that reads only the requested column indexes and row groups.

If you pass nil for either the column indexes or the row groups, that argument defaults to reading all of them.

func (*FileReader) ParquetReader

func (fr *FileReader) ParquetReader() *file.Reader

ParquetReader returns the underlying parquet file reader that it was constructed with

func (*FileReader) ReadColumn

func (fr *FileReader) ReadColumn(rowGroups []int, rdr *ColumnReader) (*arrow.Chunked, error)

ReadColumn reads data to create a chunked array only from the requested row groups.

func (*FileReader) ReadRowGroups

func (fr *FileReader) ReadRowGroups(ctx context.Context, indices, rowGroups []int) (arrow.Table, error)

ReadRowGroups is for generating an array.Table from the file but filtering to only read the requested columns and row groups rather than the entire file which ReadTable does.

func (*FileReader) ReadTable

func (fr *FileReader) ReadTable(ctx context.Context) (arrow.Table, error)

ReadTable reads the entire file into an array.Table

func (*FileReader) RowGroup

func (fr *FileReader) RowGroup(idx int) RowGroupReader

RowGroup creates a reader that will *only* read from the requested row group

func (*FileReader) Schema

func (fr *FileReader) Schema() (*arrow.Schema, error)

Schema returns the arrow schema representation of the underlying file's schema.

type FileWriter

type FileWriter struct {
	// contains filtered or unexported fields
}

FileWriter is an object for writing Arrow directly to a parquet file.

func NewFileWriter

func NewFileWriter(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (*FileWriter, error)

NewFileWriter returns a writer for writing Arrow directly to a parquet file. Unlike ArrowColumnWriter and the WriteArrowToColumn function, which write arrow data to an existing file.Writer, this creates a new file.Writer based on the schema provided.

func (*FileWriter) Close

func (fw *FileWriter) Close() error

Close flushes out the data and closes the file. It can be called multiple times; calls after the first have no effect.

func (*FileWriter) NewBufferedRowGroup

func (fw *FileWriter) NewBufferedRowGroup()

NewBufferedRowGroup starts a new in-memory buffered row group, which allows writing columns or records without immediately flushing them to disk. This lets you use WriteBuffered to write records and decide where to break your row group based on RowGroupTotalBytesWritten rather than on the maximum row group length. If using Records, this should be paired with WriteBuffered, since Write always writes a new record as a row group in and of itself.

func (*FileWriter) NewRowGroup

func (fw *FileWriter) NewRowGroup()

NewRowGroup does what it says on the tin: it creates a new row group in the underlying file. It is equivalent to `AppendRowGroup` on a file.Writer.

func (*FileWriter) RowGroupTotalBytesWritten

func (fw *FileWriter) RowGroupTotalBytesWritten() int64

RowGroupTotalBytesWritten returns the total number of bytes written and flushed out in the current row group.

func (*FileWriter) RowGroupTotalCompressedBytes

func (fw *FileWriter) RowGroupTotalCompressedBytes() int64

RowGroupTotalCompressedBytes returns the total number of bytes after compression that have been written to the current row group so far.

func (*FileWriter) Write

func (fw *FileWriter) Write(rec arrow.Record) error

Write an arrow Record Batch to the file, respecting the MaxRowGroupLength in the writer properties to determine whether or not a new row group is created while writing.

func (*FileWriter) WriteBuffered

func (fw *FileWriter) WriteBuffered(rec arrow.Record) error

func (*FileWriter) WriteColumnChunked

func (fw *FileWriter) WriteColumnChunked(data *arrow.Chunked, offset, size int64) error

WriteColumnChunked will write the data provided to the underlying file, using the provided offset and size to allow writing subsets of data from the chunked column. It uses the current column in the underlying row group writer as the starting point, allowing progressive building of writing columns to a file via arrow data without needing to already have a record or table.

func (*FileWriter) WriteColumnData

func (fw *FileWriter) WriteColumnData(data arrow.Array) error

WriteColumnData writes the entire array to the file as the next columns. Like WriteColumnChunked it is based on the current column of the row group writer allowing progressive building of the file by columns without needing a full record or table to write.

func (*FileWriter) WriteTable

func (fw *FileWriter) WriteTable(tbl arrow.Table, chunkSize int64) error

WriteTable writes an arrow table to the underlying file using chunkSize to determine the size to break at for making row groups. Writing a table will always create a new row group for each chunk of chunkSize rows in the table. Calling this with 0 rows will still write a 0 length Row Group to the file.
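The chunking behaviour described above can be sketched as simple arithmetic. `rowGroupSizes` is a hypothetical helper, not part of pqarrow; it assumes one row group per chunk of chunkSize rows, ignores any additional cap from the writer properties, and treats a non-positive chunkSize for a non-empty table as an error.

```go
package main

import "fmt"

// rowGroupSizes is a hypothetical helper that mirrors the documented
// behaviour: one row group per chunk of chunkSize rows, and a single
// zero-length row group for an empty table.
func rowGroupSizes(numRows, chunkSize int64) ([]int64, error) {
	if numRows < 0 || (chunkSize <= 0 && numRows > 0) {
		return nil, fmt.Errorf("invalid numRows=%d or chunkSize=%d", numRows, chunkSize)
	}
	if numRows == 0 {
		// An empty table still produces one row group of length 0.
		return []int64{0}, nil
	}
	var sizes []int64
	for offset := int64(0); offset < numRows; offset += chunkSize {
		size := chunkSize
		if remaining := numRows - offset; remaining < size {
			size = remaining // the last chunk may be shorter
		}
		sizes = append(sizes, size)
	}
	return sizes, nil
}

func main() {
	sizes, err := rowGroupSizes(10, 4)
	if err != nil {
		panic(err)
	}
	fmt.Println(sizes) // [4 4 2]
}
```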

type RecordReader

type RecordReader interface {
	array.RecordReader
	arrio.Reader
}

RecordReader is a record batch reader that satisfies both the array.RecordReader and arrio.Reader interfaces, allowing easy progressive reading of record batches from the parquet file. It is ideal for streaming.

type RowGroupReader

type RowGroupReader struct {
	// contains filtered or unexported fields
}

RowGroupReader is a reader for getting data only from a single row group of the file rather than having to repeatedly pass the index to functions on the reader.

func (RowGroupReader) Column

func (rgr RowGroupReader) Column(idx int) ColumnChunkReader

Column creates a reader for just the requested column chunk in only this row group.

func (RowGroupReader) ReadTable

func (rgr RowGroupReader) ReadTable(ctx context.Context, colIndices []int) (arrow.Table, error)

ReadTable provides an array.Table consisting only of the columns requested for this rowgroup

type SchemaField

type SchemaField struct {
	Field     *arrow.Field
	Children  []SchemaField
	ColIndex  int
	LevelInfo file.LevelInfo
}

SchemaField is a holder that defines a specific logical field in the schema which could potentially refer to multiple physical columns in the underlying parquet file if it is a nested type.

ColIndex is only populated (not -1) when it is a leaf column.

func (*SchemaField) IsLeaf

func (s *SchemaField) IsLeaf() bool

IsLeaf returns true if the SchemaField is a leaf column, i.e. ColIndex != -1.
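To illustrate the ColIndex convention, here is a minimal sketch with a simplified stand-in type. `field`, `isLeaf`, and `leafColumns` are hypothetical names for this example only; the real SchemaField also carries an arrow.Field and level information.

```go
package main

import "fmt"

// field is a hypothetical, simplified stand-in for SchemaField: it keeps
// only the name, the children, and the leaf column index.
type field struct {
	name     string
	children []field
	colIndex int // -1 for nested (non-leaf) fields
}

// isLeaf follows the documented convention: a leaf has colIndex != -1.
func (f *field) isLeaf() bool { return f.colIndex != -1 }

// leafColumns collects, in depth-first order, the column indexes of every
// leaf reachable from f.
func leafColumns(f *field) []int {
	if f.isLeaf() {
		return []int{f.colIndex}
	}
	var out []int
	for i := range f.children {
		out = append(out, leafColumns(&f.children[i])...)
	}
	return out
}

func main() {
	// A struct-like field "a" whose two leaf children are columns 0 and 1.
	a := field{name: "a", colIndex: -1, children: []field{
		{name: "b", colIndex: 0},
		{name: "c", colIndex: 1},
	}}
	fmt.Println(a.isLeaf(), leafColumns(&a)) // false [0 1]
}
```

This shows why one logical field can map to several physical parquet columns: the nested field itself has no column, while each of its leaves does.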

type SchemaManifest

type SchemaManifest struct {
	OriginSchema *arrow.Schema
	SchemaMeta   *arrow.Metadata

	ColIndexToField map[int]*SchemaField
	ChildToParent   map[*SchemaField]*SchemaField
	Fields          []SchemaField
	// contains filtered or unexported fields
}

SchemaManifest represents a full manifest for mapping a Parquet schema to an arrow Schema.

func NewSchemaManifest

func NewSchemaManifest(sc *schema.Schema, meta metadata.KeyValueMetadata, props *ArrowReadProperties) (*SchemaManifest, error)

NewSchemaManifest creates a manifest for mapping a parquet schema to a given arrow schema.

The metadata passed in should be the file-level key value metadata from the parquet file, or nil. If the "ARROW:schema" key is present in the metadata, it is used to determine types.

func (*SchemaManifest) GetColumnField

func (sm *SchemaManifest) GetColumnField(index int) (*SchemaField, error)

GetColumnField returns the corresponding Field for a given column index.

func (*SchemaManifest) GetFieldIndices

func (sm *SchemaManifest) GetFieldIndices(indices []int) ([]int, error)

GetFieldIndices coalesces a list of field indices (relative to the equivalent arrow.Schema) which correspond to the column root (first node below the parquet schema's root group) of each leaf referenced in indices.

For example, for leaves `a.b.c`, `a.b.d.e`, and `i.j.k` (indices=[0,1,3]) the roots are `a` and `i` (return=[0,2]).

	root
	-- a  <------
	-- -- b  |  |
	-- -- -- c  |
	-- -- -- d  |
	-- -- -- -- e
	-- f
	-- -- g
	-- -- -- h
	-- i  <---
	-- -- j  |
	-- -- -- k
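The coalescing step in the example above can be sketched with a plain lookup table. `rootFieldIndices` and `leafToRoot` are hypothetical names for illustration and do not reflect how SchemaManifest stores its mapping; the sketch assumes each root is reported once, in order of first appearance.

```go
package main

import "fmt"

// rootFieldIndices is a hypothetical helper, not the pqarrow implementation.
// leafToRoot[i] is the index of the top-level field that owns leaf column i.
// The result lists each owning field once, in order of first appearance.
func rootFieldIndices(leafToRoot []int, leaves []int) ([]int, error) {
	seen := make(map[int]bool)
	var roots []int
	for _, leaf := range leaves {
		if leaf < 0 || leaf >= len(leafToRoot) {
			return nil, fmt.Errorf("leaf column index %d out of range", leaf)
		}
		root := leafToRoot[leaf]
		if !seen[root] {
			seen[root] = true
			roots = append(roots, root)
		}
	}
	return roots, nil
}

func main() {
	// Leaves in the example: c=0, e=1, h=2, k=3. Roots: a=0, f=1, i=2.
	leafToRoot := []int{0, 0, 1, 2}
	roots, err := rootFieldIndices(leafToRoot, []int{0, 1, 3})
	if err != nil {
		panic(err)
	}
	fmt.Println(roots) // [0 2]
}
```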

func (*SchemaManifest) GetParent

func (sm *SchemaManifest) GetParent(field *SchemaField) *SchemaField

GetParent gets the parent field for a given field if it is a nested column, otherwise returns nil if there is no parent field.

type WriterOption

type WriterOption func(*config)

WriterOption is a convenience for building up arrow writer properties

func WithAllocator

func WithAllocator(mem memory.Allocator) WriterOption

WithAllocator specifies the allocator to be used by the writer whenever allocating buffers and memory.

func WithCoerceTimestamps

func WithCoerceTimestamps(unit arrow.TimeUnit) WriterOption

WithCoerceTimestamps enables coercing of timestamp units to a specific time unit when constructing the schema and writing data so that regardless of the unit used by the datatypes being written, they will be converted to the desired time unit.

func WithDeprecatedInt96Timestamps

func WithDeprecatedInt96Timestamps(enabled bool) WriterOption

WithDeprecatedInt96Timestamps enables conversion of arrow timestamps to int96 columns when constructing the schema. int96 is the Impala standard; although it is technically deprecated in parquet files, it is sometimes needed.

func WithNoMapLogicalType

func WithNoMapLogicalType() WriterOption

func WithStoreSchema

func WithStoreSchema() WriterOption

WithStoreSchema enables writing a binary serialized arrow schema to the file in metadata to enable certain read options (like "read_dictionary") to be set automatically.

If called, the arrow schema is serialized and base64 encoded before being added to the metadata of the parquet file with the key "ARROW:schema". If the key exists when opening a file for read with pqarrow.FileReader, the schema will be used to choose types and options when constructing the arrow schema of the resulting data.

func WithTruncatedTimestamps

func WithTruncatedTimestamps(allow bool) WriterOption

WithTruncatedTimestamps called with true turns off the error that would be returned if coercing a timestamp unit would cause a loss of data such as converting from nanoseconds to seconds.
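The interaction between coercion and truncation can be sketched without the library. `coerceTimestamp` and `errWouldTruncate` are hypothetical names; the sketch expresses units as ticks per second, ignores overflow, and uses Go's integer division, which truncates toward zero. It is an illustration of the rule, not the writer's actual conversion code.

```go
package main

import (
	"errors"
	"fmt"
)

// errWouldTruncate is a hypothetical error for this sketch.
var errWouldTruncate = errors.New("coercing timestamp would lose data")

// coerceTimestamp is a hypothetical helper. It converts value between units
// given as ticks per second (1 for seconds, 1e9 for nanoseconds). Converting
// to a coarser unit fails when data would be lost, unless allowTruncate is set.
func coerceTimestamp(value, fromPerSec, toPerSec int64, allowTruncate bool) (int64, error) {
	if fromPerSec <= 0 || toPerSec <= 0 {
		return 0, errors.New("units must be positive ticks per second")
	}
	if toPerSec >= fromPerSec {
		// Moving to a finer unit multiplies and never loses data
		// (overflow is ignored in this sketch).
		return value * (toPerSec / fromPerSec), nil
	}
	factor := fromPerSec / toPerSec
	if value%factor != 0 && !allowTruncate {
		return 0, errWouldTruncate
	}
	return value / factor, nil
}

func main() {
	const nanos, seconds = int64(1_000_000_000), int64(1)

	// 1.5 seconds in nanoseconds cannot be represented exactly in seconds.
	_, err := coerceTimestamp(1_500_000_000, nanos, seconds, false)
	fmt.Println(err) // coercing timestamp would lose data

	v, _ := coerceTimestamp(1_500_000_000, nanos, seconds, true)
	fmt.Println(v) // 1
}
```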
