Documentation ¶
Index ¶
- Constants
- Variables
- func DefaultIndexConfig() []*index.LevelConfig
- func LoadSnapshot(ctx context.Context, db *DB, tx uint64, r io.ReaderAt, size int64, dir string, ...) (uint64, error)
- func NewTableConfig(schema proto.Message, options ...TableOption) *tablepb.TableConfig
- func SnapshotDir(db *DB, tx uint64) string
- func StoreSnapshot(ctx context.Context, tx uint64, db *DB, snapshot io.Reader) error
- func WriteSnapshot(ctx context.Context, tx uint64, db *DB, w io.Writer) error
- type CloseOption
- type Closer
- type ColumnStore
- func (s *ColumnStore) Close() error
- func (s *ColumnStore) DB(ctx context.Context, name string, opts ...DBOption) (*DB, error)
- func (s *ColumnStore) DBs() []string
- func (s *ColumnStore) DatabasesDir() string
- func (s *ColumnStore) DropDB(name string) error
- func (s *ColumnStore) GetDB(name string) (*DB, error)
- type DB
- func (db *DB) Close(options ...CloseOption) error
- func (db *DB) GetTable(name string) (*Table, error)
- func (db *DB) HighWatermark() uint64
- func (db *DB) Snapshot(ctx context.Context) error
- func (db *DB) Table(name string, config *tablepb.TableConfig) (*Table, error)
- func (db *DB) TableNames() []string
- func (db *DB) TableProvider() *DBTableProvider
- func (db *DB) Wait(tx uint64)
- type DBOption
- type DBTableProvider
- type DataSink
- type DataSinkSource
- type DataSource
- type DefaultObjstoreBucket
- func (b *DefaultObjstoreBucket) Prefixes(ctx context.Context, prefix string) ([]string, error)
- func (b *DefaultObjstoreBucket) ProcessFile(ctx context.Context, blockDir string, lastBlockTimestamp uint64, ...) error
- func (b *DefaultObjstoreBucket) Scan(ctx context.Context, prefix string, _ *dynparquet.Schema, ...) error
- func (b *DefaultObjstoreBucket) String() string
- type DefaultObjstoreBucketOption
- type ErrCreateSchemaWriter
- type ErrReadRow
- type ErrTableNotFound
- type ErrWriteRow
- type GenericTable
- type Option
- func WithActiveMemorySize(size int64) Option
- func WithCompactionAfterRecovery(tableNames []string) Option
- func WithIndexConfig(indexConfig []*index.LevelConfig) Option
- func WithIndexDegree(indexDegree int) Option
- func WithLogger(logger log.Logger) Option
- func WithManualBlockRotation() Option
- func WithReadOnlyStorage(ds DataSource) Option
- func WithReadWriteStorage(ds DataSinkSource) Option
- func WithRecoveryConcurrency(concurrency int) Option
- func WithRegistry(reg prometheus.Registerer) Option
- func WithSnapshotTriggerSize(size int64) Option
- func WithSplitSize(size int) Option
- func WithStoragePath(path string) Option
- func WithTestingOptions(opts ...TestingOption) Option
- func WithTracer(tracer trace.Tracer) Option
- func WithWAL() Option
- func WithWriteOnlyStorage(ds DataSink) Option
- type ParquetWriter
- type RotateBlockOption
- type Sync
- type Table
- func (t *Table) ActiveBlock() *TableBlock
- func (t *Table) ActiveWriteBlock() (*TableBlock, func(), error)
- func (t *Table) EnsureCompaction() error
- func (t *Table) IndexConfig() []*index.LevelConfig
- func (t *Table) InsertRecord(ctx context.Context, record arrow.Record) (uint64, error)
- func (t *Table) Iterator(ctx context.Context, tx uint64, pool memory.Allocator, ...) error
- func (t *Table) RotateBlock(_ context.Context, block *TableBlock, opts ...RotateBlockOption) error
- func (t *Table) Schema() *dynparquet.Schema
- func (t *Table) SchemaIterator(ctx context.Context, tx uint64, pool memory.Allocator, ...) error
- func (t *Table) View(ctx context.Context, fn func(ctx context.Context, tx uint64) error) error
- type TableBlock
- func (t *TableBlock) EnsureCompaction() error
- func (t *TableBlock) Index() *index.LSM
- func (t *TableBlock) InsertRecord(_ context.Context, tx uint64, record arrow.Record) error
- func (t *TableBlock) Persist() error
- func (t *TableBlock) Serialize(writer io.Writer) error
- func (t *TableBlock) Size() int64
- type TableOption
- type TestingOption
- type TxNode
- type TxPool
- type WAL
Constants ¶
const (
    B   = 1
    KiB = 1024 * B
    MiB = 1024 * KiB
    GiB = 1024 * MiB
    TiB = 1024 * GiB
)
const DefaultBlockReaderLimit = 10
DefaultBlockReaderLimit is the concurrency limit for reading blocks.
Variables ¶
var (
    ErrNoSchema     = fmt.Errorf("no schema")
    ErrTableClosing = fmt.Errorf("table closing")
)
Functions ¶
func DefaultIndexConfig ¶
func DefaultIndexConfig() []*index.LevelConfig
DefaultIndexConfig returns the default level configs used. This is a function so that any modifications to the result will not affect the default config.
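For example, the result can be passed, modified or not, to WithIndexConfig without touching the package default:

cfg := frostdb.DefaultIndexConfig() // a fresh slice each call, per the doc above
store, err := frostdb.New(frostdb.WithIndexConfig(cfg))
if err != nil {
    panic(err)
}
defer store.Close()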
func LoadSnapshot ¶
func NewTableConfig ¶
func NewTableConfig(
    schema proto.Message,
    options ...TableOption,
) *tablepb.TableConfig
func SnapshotDir ¶
func StoreSnapshot ¶
Types ¶
type CloseOption ¶
type CloseOption func(*closeOptions)
func WithClearStorage ¶
func WithClearStorage() CloseOption
type ColumnStore ¶
type ColumnStore struct {
// contains filtered or unexported fields
}
func New ¶
func New(
    options ...Option,
) (*ColumnStore, error)
func (*ColumnStore) Close ¶
func (s *ColumnStore) Close() error
Close persists all data from the columnstore to storage. It is no longer valid to use the columnstore for reads or writes, and the object should not be reused.
func (*ColumnStore) DB ¶
DB gets or creates a database on the given ColumnStore with the given options. Note that if the database already exists, the options will be applied cumulatively to the database.
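A minimal usage sketch, assuming the module path github.com/polarsignals/frostdb and a hypothetical database name:

package main

import (
    "context"

    "github.com/polarsignals/frostdb"
)

func main() {
    // An in-memory column store with default options.
    columnstore, err := frostdb.New()
    if err != nil {
        panic(err)
    }
    defer columnstore.Close()

    // Get or create a database; "example_db" is a hypothetical name.
    db, err := columnstore.DB(context.Background(), "example_db")
    if err != nil {
        panic(err)
    }
    _ = db
}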
func (*ColumnStore) DBs ¶
func (s *ColumnStore) DBs() []string
DBs returns all the DB names of this column store.
func (*ColumnStore) DatabasesDir ¶
func (s *ColumnStore) DatabasesDir() string
func (*ColumnStore) DropDB ¶
func (s *ColumnStore) DropDB(name string) error
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
func (*DB) Close ¶
func (db *DB) Close(options ...CloseOption) error
func (*DB) HighWatermark ¶
HighWatermark returns the current high watermark.
func (*DB) Snapshot ¶
Snapshot performs a database snapshot and writes it to the database snapshots directory, as is done by automatic snapshots.
func (*DB) Table ¶
Table will get or create a new table with the given name and config. If a table already exists with the given name, its configuration will be updated.
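A hedged sketch of get-or-create; schemaDef stands in for a schema definition proto, which this sketch does not construct:

// schemaDef is a hypothetical schema definition proto.Message.
var schemaDef proto.Message

config := frostdb.NewTableConfig(
    schemaDef,
    frostdb.WithRowGroupSize(4096), // optional: rows per parquet row group
)
table, err := db.Table("example_table", config)
if err != nil {
    panic(err)
}
_ = table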
func (*DB) TableNames ¶
TableNames returns the names of all the db's tables.
func (*DB) TableProvider ¶
func (db *DB) TableProvider() *DBTableProvider
type DBOption ¶
func WithCompactionAfterOpen ¶
type DBTableProvider ¶
type DBTableProvider struct {
// contains filtered or unexported fields
}
func NewDBTableProvider ¶
func NewDBTableProvider(db *DB) *DBTableProvider
func (*DBTableProvider) GetTable ¶
func (p *DBTableProvider) GetTable(name string) (logicalplan.TableReader, error)
type DataSink ¶
type DataSink interface {
    fmt.Stringer
    Upload(ctx context.Context, name string, r io.Reader) error
    Delete(ctx context.Context, name string) error
}
DataSink is a remote destination for data.
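To illustrate the shape of the interface, here is a hypothetical no-op sink (discardSink is not part of the package; it needs only the context and io imports). It could be wired in with WithWriteOnlyStorage(discardSink{}):

// discardSink drops every upload; it exists only to show the required methods.
type discardSink struct{}

func (discardSink) String() string { return "discard" }

func (discardSink) Upload(ctx context.Context, name string, r io.Reader) error {
    _, err := io.Copy(io.Discard, r) // consume and drop the payload
    return err
}

func (discardSink) Delete(ctx context.Context, name string) error { return nil }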
type DataSinkSource ¶
type DataSinkSource interface {
    DataSink
    DataSource
}
DataSinkSource is a convenience interface for a data source and sink.
type DataSource ¶
type DataSource interface {
    fmt.Stringer
    Scan(ctx context.Context, prefix string, schema *dynparquet.Schema, filter logicalplan.Expr, lastBlockTimestamp uint64, callback func(context.Context, any) error) error
    Prefixes(ctx context.Context, prefix string) ([]string, error)
}
DataSource is a remote source of data that can be queried.
type DefaultObjstoreBucket ¶
DefaultObjstoreBucket is the default implementation of the DataSource and DataSink interfaces.
func NewDefaultBucket ¶
func NewDefaultBucket(b storage.Bucket, options ...DefaultObjstoreBucketOption) *DefaultObjstoreBucket
func NewDefaultObjstoreBucket ¶
func NewDefaultObjstoreBucket(b objstore.Bucket, options ...DefaultObjstoreBucketOption) *DefaultObjstoreBucket
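A sketch of wiring a bucket as read-write storage, using the in-memory bucket from github.com/thanos-io/objstore to stay self-contained (production code would pass an S3, GCS, or similar bucket):

bucket := frostdb.NewDefaultObjstoreBucket(objstore.NewInMemBucket())
store, err := frostdb.New(frostdb.WithReadWriteStorage(bucket))
if err != nil {
    panic(err)
}
defer store.Close()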
func (*DefaultObjstoreBucket) ProcessFile ¶
func (b *DefaultObjstoreBucket) ProcessFile(ctx context.Context, blockDir string, lastBlockTimestamp uint64, filter expr.TrueNegativeFilter, callback func(context.Context, any) error) error
ProcessFile processes a single block parquet file from the bucket.
func (*DefaultObjstoreBucket) Scan ¶
func (b *DefaultObjstoreBucket) Scan(ctx context.Context, prefix string, _ *dynparquet.Schema, filter logicalplan.Expr, lastBlockTimestamp uint64, callback func(context.Context, any) error) error
func (*DefaultObjstoreBucket) String ¶
func (b *DefaultObjstoreBucket) String() string
type DefaultObjstoreBucketOption ¶
type DefaultObjstoreBucketOption func(*DefaultObjstoreBucket)
func StorageWithBlockReaderLimit ¶
func StorageWithBlockReaderLimit(limit int) DefaultObjstoreBucketOption
func StorageWithLogger ¶
func StorageWithLogger(logger log.Logger) DefaultObjstoreBucketOption
func StorageWithTracer ¶
func StorageWithTracer(tracer trace.Tracer) DefaultObjstoreBucketOption
type ErrCreateSchemaWriter ¶
type ErrCreateSchemaWriter struct {
// contains filtered or unexported fields
}
func (ErrCreateSchemaWriter) Error ¶
func (e ErrCreateSchemaWriter) Error() string
type ErrReadRow ¶
type ErrReadRow struct {
// contains filtered or unexported fields
}
func (ErrReadRow) Error ¶
func (e ErrReadRow) Error() string
type ErrTableNotFound ¶
type ErrTableNotFound struct {
TableName string
}
func (ErrTableNotFound) Error ¶
func (e ErrTableNotFound) Error() string
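A hedged sketch of detecting this error, assuming DB.GetTable returns or wraps an ErrTableNotFound for unknown names:

if _, err := db.GetTable("missing_table"); err != nil {
    var notFound frostdb.ErrTableNotFound
    // errors.As can match the value type because Error has a value receiver.
    if errors.As(err, &notFound) {
        log.Printf("table %q does not exist", notFound.TableName)
    }
}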
type ErrWriteRow ¶
type ErrWriteRow struct {
// contains filtered or unexported fields
}
func (ErrWriteRow) Error ¶
func (e ErrWriteRow) Error() string
type GenericTable ¶
GenericTable is a wrapper around *Table that writes structs of type T. It consists of a generic arrow.Record builder that ingests structs of type T. The generated record is then passed to (*Table).InsertRecord.
Struct tag `frostdb` is used to pass options for the schema for T.
This API is opinionated.
- Nested columns are not supported
Tags ¶
Use the `frostdb` struct tag to define options that customize field values. You can express everything needed to construct a v1alpha1 schema.
Tags are defined as a comma-separated list. The first item is the column name. The column name is optional; when omitted, it is derived from the field name (snake_cased).
Supported Tags

    delta_binary_packed     | Delta binary packed encoding.
    brotli                  | Brotli compression.
    asc                     | Sorts in ascending order. Use asc(n) where n is an integer for the sorting order.
    gzip                    | GZIP compression.
    snappy                  | Snappy compression.
    delta_length_byte_array | Delta Length Byte Array encoding.
    delta_byte_array        | Delta Byte Array encoding.
    desc                    | Sorts in descending order. Use desc(n) where n is an integer for the sorting order.
    lz4_raw                 | LZ4_RAW compression.
    pre_hash                | Prehash the column before storing it.
    null_first              | When used with asc, nulls are smallest; with desc, nulls are largest.
    zstd                    | ZSTD compression.
    rle_dict                | Dictionary run-length encoding.
    plain                   | Plain encoding.
Example tagged Sample struct
type Sample struct {
    ExampleType string      `frostdb:"example_type,rle_dict,asc(0)"`
    Labels      []Label     `frostdb:"labels,rle_dict,null,dyn,asc(1),null_first"`
    Stacktrace  []uuid.UUID `frostdb:"stacktrace,rle_dict,asc(3),null_first"`
    Timestamp   int64       `frostdb:"timestamp,asc(2)"`
    Value       int64       `frostdb:"value"`
}
Dynamic columns ¶
Fields of type map[string]T are dynamic columns by default.
type Example struct {
    // Use supported tags to customize the column value.
    Labels map[string]string `frostdb:"labels"`
}
Repeated columns ¶
Fields of type []int64, []float64, []bool, and []string are supported. These are represented as arrow.LIST.
The generated schema for repeated columns applies all supported tags. By default, repeated fields are nullable, so you can safely pass nil slices for repeated columns. An illustrative struct is sketched below.
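A hypothetical struct combining scalar, dynamic, and repeated columns (Event and its tags are illustrative, not part of the package):

type Event struct {
    Name   string            `frostdb:"name,rle_dict,asc(0)"`
    Labels map[string]string `frostdb:"labels"` // dynamic column
    Values []int64           `frostdb:"values"` // repeated column (arrow.LIST)
    Flags  []bool            `frostdb:"flags"`  // nil slices are safe
}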
func NewGenericTable ¶
func NewGenericTable[T any](db *DB, name string, mem memory.Allocator, options ...TableOption) (*GenericTable[T], error)
func (*GenericTable[T]) Release ¶
func (t *GenericTable[T]) Release()
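A sketch of constructing a generic table for the hypothetical Event struct above; the final Write call is an assumption about the ingestion method and should be verified against the package:

mem := memory.NewGoAllocator() // arrow memory allocator
events, err := frostdb.NewGenericTable[Event](db, "events", mem)
if err != nil {
    panic(err)
}
defer events.Release() // release the underlying record builder

// Assumed ingestion call: structs of T are built into an arrow.Record
// and handed to (*Table).InsertRecord under the hood.
_, err = events.Write(ctx, Event{Name: "query", Values: []int64{1, 2, 3}})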
type Option ¶
type Option func(*ColumnStore) error
func WithActiveMemorySize ¶
func WithIndexConfig ¶
func WithIndexConfig(indexConfig []*index.LevelConfig) Option
func WithIndexDegree ¶
func WithLogger ¶
func WithManualBlockRotation ¶
func WithManualBlockRotation() Option
func WithReadOnlyStorage ¶
func WithReadOnlyStorage(ds DataSource) Option
func WithReadWriteStorage ¶
func WithReadWriteStorage(ds DataSinkSource) Option
func WithRecoveryConcurrency ¶
WithRecoveryConcurrency limits the number of databases that are recovered simultaneously when calling frostdb.New. This helps limit memory usage on recovery.
func WithRegistry ¶
func WithRegistry(reg prometheus.Registerer) Option
func WithSnapshotTriggerSize ¶
WithSnapshotTriggerSize specifies a size in bytes of uncompressed inserts that will trigger a snapshot of the whole database. This can be larger than the active memory size, since the active memory size tracks the size of *compressed* data, while snapshots are triggered based on the *uncompressed* data inserted into the database.

The reason for this choice is that if a database instance crashes, it is forced to reread all uncompressed inserts since the last snapshot from the WAL, which could potentially lead to unrecoverable OOMs on startup. Defining the snapshot trigger in terms of uncompressed bytes limits the memory usage on recovery to at most the snapshot trigger size (as long as snapshots were successful).

If 0, snapshots are disabled. Note that snapshots (if enabled) are also triggered on block rotation of any database table. Snapshots are complementary to the WAL and will also be disabled if the WAL is disabled. A configuration sketch follows.
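A sketch combining these options; the storage path and sizes are illustrative:

store, err := frostdb.New(
    frostdb.WithWAL(),                             // snapshots complement the WAL
    frostdb.WithStoragePath("/tmp/frostdb"),       // hypothetical path
    frostdb.WithActiveMemorySize(512*frostdb.MiB), // tracks *compressed* data
    frostdb.WithSnapshotTriggerSize(frostdb.GiB),  // *uncompressed* inserts since last snapshot
)
if err != nil {
    panic(err)
}
defer store.Close()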
func WithSplitSize ¶
func WithStoragePath ¶
func WithTestingOptions ¶
func WithTestingOptions(opts ...TestingOption) Option
func WithTracer ¶
func WithWriteOnlyStorage ¶
type ParquetWriter ¶
type RotateBlockOption ¶
type RotateBlockOption func(*rotateBlockOptions)
func WithRotateBlockSkipPersist ¶
func WithRotateBlockSkipPersist() RotateBlockOption
WithRotateBlockSkipPersist instructs the block rotation operation to not persist the block to object storage.
func WithRotateBlockWaitGroup ¶
func WithRotateBlockWaitGroup(wg *sync.WaitGroup) RotateBlockOption
WithRotateBlockWaitGroup provides a WaitGroup; the rotate block operation calls wg.Done once the block has been persisted. Without this option, RotateBlock persists the block asynchronously with no completion signal.
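A sketch of synchronously waiting for persistence, assuming table is a *Table and ctx a context.Context already in scope:

var wg sync.WaitGroup
wg.Add(1)
if err := table.RotateBlock(ctx, table.ActiveBlock(), frostdb.WithRotateBlockWaitGroup(&wg)); err != nil {
    panic(err)
}
wg.Wait() // unblocks once the rotated block has been persisted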
type Table ¶
type Table struct {
// contains filtered or unexported fields
}
func (*Table) ActiveBlock ¶
func (t *Table) ActiveBlock() *TableBlock
func (*Table) ActiveWriteBlock ¶
func (t *Table) ActiveWriteBlock() (*TableBlock, func(), error)
func (*Table) EnsureCompaction ¶
func (*Table) IndexConfig ¶
func (t *Table) IndexConfig() []*index.LevelConfig
IndexConfig returns the index configuration for the table. It makes a copy of the column store index config and injects its compactParts method.
func (*Table) InsertRecord ¶
func (*Table) Iterator ¶
func (t *Table) Iterator(
    ctx context.Context,
    tx uint64,
    pool memory.Allocator,
    callbacks []logicalplan.Callback,
    options ...logicalplan.Option,
) error
Iterator iterates in order over all granules in the table. It stops iterating when the iterator function returns false.
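A hedged sketch of a full-table scan. It assumes logicalplan.Callback has the form func(context.Context, arrow.Record) error and that ctx, table, and the arrow/logicalplan imports are in scope:

// Pin a transaction with View so the scan sees a consistent snapshot,
// then stream record batches to the callback.
err := table.View(ctx, func(ctx context.Context, tx uint64) error {
    return table.Iterator(ctx, tx, memory.NewGoAllocator(), []logicalplan.Callback{
        func(ctx context.Context, r arrow.Record) error {
            fmt.Println(r) // each invocation delivers one record batch
            return nil
        },
    })
})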
func (*Table) RotateBlock ¶
func (t *Table) RotateBlock(_ context.Context, block *TableBlock, opts ...RotateBlockOption) error
func (*Table) Schema ¶
func (t *Table) Schema() *dynparquet.Schema
func (*Table) SchemaIterator ¶
func (t *Table) SchemaIterator(
    ctx context.Context,
    tx uint64,
    pool memory.Allocator,
    callbacks []logicalplan.Callback,
    options ...logicalplan.Option,
) error
SchemaIterator iterates in order over all granules in the table and returns all the schemas seen across the table.
type TableBlock ¶
type TableBlock struct {
// contains filtered or unexported fields
}
func (*TableBlock) EnsureCompaction ¶
func (t *TableBlock) EnsureCompaction() error
EnsureCompaction forces a TableBlock compaction.
func (*TableBlock) Index ¶
func (t *TableBlock) Index() *index.LSM
Index provides atomic access to the table index.
func (*TableBlock) InsertRecord ¶
func (*TableBlock) Persist ¶
func (t *TableBlock) Persist() error
Persist uploads the block to the underlying bucket.
func (*TableBlock) Serialize ¶
func (t *TableBlock) Serialize(writer io.Writer) error
Serialize the table block into a single Parquet file.
func (*TableBlock) Size ¶
func (t *TableBlock) Size() int64
Size returns the cumulative size of all buffers in the table. This is roughly the size of the table in bytes.
type TableOption ¶
type TableOption func(*tablepb.TableConfig) error
func FromConfig ¶
func FromConfig(config *tablepb.TableConfig) TableOption
FromConfig sets the table configuration from the given config. NOTE: this does not override the schema, even though it is included in the passed-in config.
func WithBlockReaderLimit ¶
func WithBlockReaderLimit(n int) TableOption
WithBlockReaderLimit sets the limit of goroutines that will be used to read persisted block files. A negative number indicates no limit.
func WithRowGroupSize ¶
func WithRowGroupSize(numRows int) TableOption
WithRowGroupSize sets the size in number of rows for each row group for parquet files. A <= 0 value indicates no limit.
func WithUniquePrimaryIndex ¶
func WithUniquePrimaryIndex(unique bool) TableOption
type TestingOption ¶
type TestingOption Option
func WithTestingNoDiskSpaceReclaimOnSnapshot ¶
func WithTestingNoDiskSpaceReclaimOnSnapshot() TestingOption
func WithTestingWalOptions ¶
func WithTestingWalOptions(opts ...wal.Option) TestingOption
type TxPool ¶
type TxPool struct {
// contains filtered or unexported fields
}
func NewTxPool ¶
NewTxPool returns a new TxPool and starts the pool cleaner routine. The transaction pool is used to keep track of completed transactions. It does this by inserting completed transactions into an ordered linked list.
Ex:

    insert: 12
    [9]->[10]->[13] => [9]->[10]->[12]->[13]

Inserting a new node triggers the pool cleaner routine to run. The pool cleaner's job is to increment a high-watermark counter when it encounters contiguous transactions in the list, and then remove those elements from the pool.

Ex:

    watermark: 7 insert: 8
    [9]->[10]->[13] => [8]->[9]->[10]->[13] (cleaner notified)

    [8]->[9]->[10]->[13]
     ^ watermark++; delete 8
    [9]->[10]->[13]
     ^ watermark++; delete 9
    [10]->[13]
     ^ watermark++; delete 10
    [13] watermark: 10
TxPool is a sorted lockless linked-list described in https://timharris.uk/papers/2001-disc.pdf
type WAL ¶
type WAL interface {
    Close() error
    Log(tx uint64, record *walpb.Record) error
    LogRecord(tx uint64, table string, record arrow.Record) error
    // Replay replays WAL records from the given first index. If firstIndex is
    // 0, the first index read from the WAL is used (i.e. given a truncation,
    // using 0 is still valid). If the given firstIndex is less than the WAL's
    // first index on disk, the replay happens from the first index on disk.
    // If the handler panics, the WAL implementation will truncate the WAL up
    // to the last valid index.
    Replay(tx uint64, handler wal.ReplayHandlerFunc) error
    Truncate(tx uint64) error
    Reset(nextTx uint64) error
    FirstIndex() (uint64, error)
    LastIndex() (uint64, error)
}
Source Files ¶
Directories ¶
Path | Synopsis
---|---
cmd |
cmd/parquet-tool (Module) |
examples |
gen |
internal |
internal/builder | Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.