index

package
v0.0.1 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 23, 2024 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation


Constants

const (
	IndexFileExtension     = ".idx"
	ParquetCompactionTXKey = "compaction_tx"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Compaction

type Compaction func(w io.Writer, compact []parts.Part, options ...parquet.WriterOption) (int64, error)

type CompactionType

type CompactionType int
const (
	CompactionTypeUnknown CompactionType = iota

	// CompactionTypeParquetDisk is a compaction type that will compact the parts into a Parquet file on disk.
	CompactionTypeParquetDisk

	// CompactionTypeParquetMemory is a compaction type that will compact the parts into a Parquet file in memory.
	CompactionTypeParquetMemory
)

type FileCompaction

type FileCompaction struct {
	// contains filtered or unexported fields
}

func NewFileCompaction

func NewFileCompaction(dir string, maxSize int64, compact Compaction, logger log.Logger) (*FileCompaction, error)

func (*FileCompaction) Compact

func (f *FileCompaction) Compact(compact []parts.Part, options ...parts.Option) ([]parts.Part, int64, int64, error)

Compact compacts the given parts into Parquet data and writes it to the next level's file.

func (*FileCompaction) MaxSize

func (f *FileCompaction) MaxSize() int64

func (*FileCompaction) Reset

func (f *FileCompaction) Reset()

Reset is called when the level no longer has active parts in it at the end of a compaction.

func (*FileCompaction) Snapshot

func (f *FileCompaction) Snapshot(_ []parts.Part, _ func(parts.Part) error, dir string) error

Snapshot takes a snapshot of the current level. It ignores the parts and simply hard links the level's files into the snapshot directory. If the active file contains data, it is rotated first, which renders all snapshotted files immutable.
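The hard-link approach can be illustrated with a minimal sketch that uses only the standard library. It does not reproduce FileCompaction itself: snapshotDir, demo, the directory names, and the file name are hypothetical, and rotation of the active file is left out.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// snapshotDir hard links every regular file in src into dst and returns the
// number of files linked. The name and signature are hypothetical.
func snapshotDir(src, dst string) (int, error) {
	entries, err := os.ReadDir(src)
	if err != nil {
		return 0, err
	}
	if err := os.MkdirAll(dst, 0o755); err != nil {
		return 0, err
	}
	linked := 0
	for _, e := range entries {
		if !e.Type().IsRegular() {
			continue // skip directories, symlinks, and other non-regular entries
		}
		if err := os.Link(filepath.Join(src, e.Name()), filepath.Join(dst, e.Name())); err != nil {
			return linked, err
		}
		linked++
	}
	return linked, nil
}

// demo builds a throwaway level directory holding one file, snapshots it, and
// reports how many files were linked and whether both paths name the same file.
func demo() (int, bool, error) {
	root, err := os.MkdirTemp("", "snapshot-sketch")
	if err != nil {
		return 0, false, err
	}
	defer os.RemoveAll(root)

	src := filepath.Join(root, "L1")
	dst := filepath.Join(root, "snapshot")
	name := "00000001.parquet" // hypothetical file name
	if err := os.Mkdir(src, 0o755); err != nil {
		return 0, false, err
	}
	if err := os.WriteFile(filepath.Join(src, name), []byte("data"), 0o644); err != nil {
		return 0, false, err
	}
	n, err := snapshotDir(src, dst)
	if err != nil {
		return n, false, err
	}
	a, err := os.Stat(filepath.Join(src, name))
	if err != nil {
		return n, false, err
	}
	b, err := os.Stat(filepath.Join(dst, name))
	if err != nil {
		return n, false, err
	}
	return n, os.SameFile(a, b), nil
}

func main() {
	n, same, err := demo()
	if err != nil {
		fmt.Println("snapshot failed:", err)
		return
	}
	fmt.Println("linked:", n, "same file:", same)
}
```

A hard link copies no data, so a snapshot of this kind is cheap, but it is only consistent if the linked files are never modified afterwards. That is why the description above rotates the active file before linking.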

func (*FileCompaction) Sync

func (f *FileCompaction) Sync() error

Sync calls Sync on the underlying file.

type LSM

type LSM struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

LSM is a log-structured merge-tree-like index. It is implemented as a singly linked list of parts.

Arrow records are always added to the L0 list. When a list reaches its configured max size, it is compacted by calling the level's Compact function, and the result is then added as a new part to the next level.

L0->[record]->[record]->[L1]->[record/parquet]->[record/parquet] etc.
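The layout above, a single list in which sentinel nodes mark where each level begins, can be sketched as follows. This is an illustration under assumptions, not the package's Node type: node, levelSize, and buildList are hypothetical, parts are reduced to a byte size, and the atomic operations the real list relies on are omitted.

```go
package main

import "fmt"

// node is a hypothetical stand-in for the package's Node. It is either a
// sentinel marking the start of a level or a part carrying a size in bytes.
type node struct {
	sentinel bool
	level    int   // meaningful only when sentinel is true
	size     int64 // meaningful only when sentinel is false
	next     *node
}

// levelSize sums the sizes of the parts that sit between the sentinel for
// level and the next sentinel (or the end of the list).
func levelSize(head *node, level int) int64 {
	var total int64
	inLevel := false
	for n := head; n != nil; n = n.next {
		if n.sentinel {
			if inLevel {
				break // reached the sentinel of the following level
			}
			inLevel = n.level == level
			continue
		}
		if inLevel {
			total += n.size
		}
	}
	return total
}

// buildList returns the list L0 -> [10] -> [20] -> L1 -> [300].
func buildList() *node {
	l1Part := &node{size: 300}
	l1 := &node{sentinel: true, level: 1, next: l1Part}
	p2 := &node{size: 20, next: l1}
	p1 := &node{size: 10, next: p2}
	return &node{sentinel: true, level: 0, next: p1}
}

func main() {
	list := buildList()
	fmt.Println("L0:", levelSize(list, 0), "L1:", levelSize(list, 1))
}
```

Keeping all levels in one list means a reader can walk every part with a single traversal, while the sentinels still let per-level work (sizing, compaction) find its own range.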

func NewLSM

func NewLSM(dir string, schema *dynparquet.Schema, levels []*LevelConfig, wait func(uint64), options ...LSMOption) (*LSM, error)

NewLSM returns an LSM-like index with len(levels) levels. wait is a function that blocks until the given transaction has been committed; it is used only during compaction, to ensure that every transaction in the level up to the compaction tx has been committed before compacting.
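One way to build a function with the shape of wait is a commit watermark guarded by a condition variable. The sketch below is an assumption about how a caller might supply it, not how the surrounding module does; watermark, newWatermark, commit, and wait are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// watermark tracks the highest committed transaction. It is a hypothetical
// helper; only the signature of wait, func(uint64), comes from NewLSM.
type watermark struct {
	mu        sync.Mutex
	cond      *sync.Cond
	committed uint64
}

func newWatermark() *watermark {
	w := &watermark{}
	w.cond = sync.NewCond(&w.mu)
	return w
}

// commit records tx as committed and wakes every waiter so it can re-check.
func (w *watermark) commit(tx uint64) {
	w.mu.Lock()
	if tx > w.committed {
		w.committed = tx
	}
	w.mu.Unlock()
	w.cond.Broadcast()
}

// wait blocks until a transaction >= tx has been committed.
func (w *watermark) wait(tx uint64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	for w.committed < tx {
		w.cond.Wait()
	}
}

func main() {
	w := newWatermark()
	done := make(chan struct{})
	go func() {
		w.wait(3) // a compaction would block here until tx 3 is committed
		close(done)
	}()
	for tx := uint64(1); tx <= 3; tx++ {
		w.commit(tx)
	}
	<-done
	fmt.Println("tx 3 committed; compaction may proceed")
}
```

Because w.wait is a method value of type func(uint64), it could be passed wherever a wait argument of that type is expected. The sketch assumes transactions commit in increasing order; out-of-order commits would need a different bookkeeping scheme.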

func (*LSM) Add

func (l *LSM) Add(tx uint64, record arrow.Record)

func (*LSM) Close

func (l *LSM) Close() error

func (*LSM) EnsureCompaction

func (l *LSM) EnsureCompaction() error

EnsureCompaction forces a compaction of all levels, regardless of whether the levels are below the target size.

func (*LSM) InsertPart

func (l *LSM) InsertPart(part parts.Part)

InsertPart inserts a part into the LSM tree at the correct level. It does not check whether the insert should trigger a compaction, so it should only be used during snapshot recovery. The insert is silently dropped if the part is older than a part in the next level of the LSM, since that indicates the part is already accounted for in the next level via compaction.

func (*LSM) Iterate

func (l *LSM) Iterate(iter func(node *Node) bool)

func (*LSM) LevelSize

func (l *LSM) LevelSize(t SentinelType) int64

LevelSize returns the size of a specific level in bytes.

func (*LSM) MaxLevel

func (l *LSM) MaxLevel() SentinelType

func (*LSM) Prefixes

func (l *LSM) Prefixes(_ context.Context, _ string) ([]string, error)

func (*LSM) Rotate

func (l *LSM) Rotate(externalWriter func([]parts.Part) (parts.Part, int64, int64, error)) error

Rotate will write all parts in the LSM into the external writer. No changes are made to the LSM.

func (*LSM) Scan

func (l *LSM) Scan(ctx context.Context, _ string, _ *dynparquet.Schema, filter logicalplan.Expr, tx uint64, callback func(context.Context, any) error) error

func (*LSM) Size

func (l *LSM) Size() int64

Size returns the total size of the index in bytes.

func (*LSM) Snapshot

func (l *LSM) Snapshot(writer func(parts.Part) error, dir string) error

Snapshot creates a snapshot of the index at the given transaction. It will call the writer function with the parts in the index that are in-memory.

func (*LSM) String

func (l *LSM) String() string

func (*LSM) WaitForPendingCompactions

func (l *LSM) WaitForPendingCompactions()

type LSMMetrics

type LSMMetrics struct {
	Compactions        *prometheus.CounterVec
	LevelSize          *prometheus.GaugeVec
	CompactionDuration prometheus.Histogram
}

LSMMetrics are the metrics for an LSM index.

func NewLSMMetrics

func NewLSMMetrics(reg prometheus.Registerer) *LSMMetrics

type LSMOption

type LSMOption func(*LSM)

func LSMWithLogger

func LSMWithLogger(logger log.Logger) LSMOption

func LSMWithMetrics

func LSMWithMetrics(metrics *LSMMetrics) LSMOption
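LSMOption has the shape of a functional option: a function that mutates the value being constructed. The following self-contained sketch shows the pattern with hypothetical names (index, option, withLogPrefix, withMetrics, newIndex); it does not show how NewLSM applies its options internally.

```go
package main

import "fmt"

// index is a hypothetical stand-in for the struct being configured.
type index struct {
	logPrefix      string
	metricsEnabled bool
}

// option has the same shape as LSMOption: it receives the value under
// construction and adjusts it.
type option func(*index)

func withLogPrefix(prefix string) option {
	return func(i *index) { i.logPrefix = prefix }
}

func withMetrics() option {
	return func(i *index) { i.metricsEnabled = true }
}

// newIndex sets defaults first, then applies the options in order, so a later
// option overrides an earlier one.
func newIndex(opts ...option) *index {
	i := &index{logPrefix: "lsm"}
	for _, opt := range opts {
		opt(i)
	}
	return i
}

func main() {
	fmt.Printf("%+v\n", *newIndex())
	fmt.Printf("%+v\n", *newIndex(withLogPrefix("table-a"), withMetrics()))
}
```

The pattern keeps the constructor signature stable as new settings are added, which is presumably why NewLSM takes its logger and metrics through a variadic options parameter.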

type Level

type Level interface {
	Compact(parts []parts.Part, options ...parts.Option) ([]parts.Part, int64, int64, error)
	Snapshot(parts []parts.Part, writer func(parts.Part) error, dir string) error
	MaxSize() int64
	Reset()
}

type LevelConfig

type LevelConfig struct {
	Level   SentinelType
	MaxSize int64
	Type    CompactionType
	Compact Compaction
}

LevelConfig is the configuration for a level in the LSM. The MaxSize is the maximum size of the level in bytes before it triggers a compaction into the next level.

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node is a Part that is an element of a linked list.

func NewList

func NewList(sentinel SentinelType) *Node

NewList creates a new part list using atomic constructs.

func (*Node) Insert

func (n *Node) Insert(part parts.Part)

Insert a Node into the list, in order by Tx.
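An ordered insert into a singly linked list can be sketched as below. The ordering direction is an assumption (newest transaction first), node, insert, and txs are hypothetical, and the sketch is not safe for concurrent use, whereas NewList is documented as using atomic constructs.

```go
package main

import "fmt"

// node is a hypothetical list element; the head node acts as a sentinel and
// its tx field is unused.
type node struct {
	tx   uint64
	next *node
}

// insert places tx after the head so that the list stays sorted by
// descending tx (newest first).
func insert(head *node, tx uint64) {
	prev := head
	for prev.next != nil && prev.next.tx > tx {
		prev = prev.next
	}
	prev.next = &node{tx: tx, next: prev.next}
}

// txs collects the transaction ids in list order, skipping the head.
func txs(head *node) []uint64 {
	var out []uint64
	for n := head.next; n != nil; n = n.next {
		out = append(out, n.tx)
	}
	return out
}

func main() {
	head := &node{}
	for _, tx := range []uint64{3, 1, 2} {
		insert(head, tx)
	}
	fmt.Println(txs(head))
}
```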

func (*Node) Iterate

func (n *Node) Iterate(iterate func(*Node) bool)

Iterate accesses every node in the list.

func (*Node) Part

func (n *Node) Part() parts.Part

func (*Node) Prepend

func (n *Node) Prepend(part parts.Part) *Node

Prepend a node onto the front of the list.

func (*Node) Sentinel

func (n *Node) Sentinel(s SentinelType) *Node

Sentinel adds a new sentinel node to the list, and returns the sub list starting from that sentinel.

func (*Node) String

func (n *Node) String() string

type ReleaseableRowGroup

type ReleaseableRowGroup interface {
	dynparquet.DynamicRowGroup
	Release()
}

type SentinelType

type SentinelType int
const (
	L0 SentinelType = iota
	L1
	L2
)

func (SentinelType) String

func (s SentinelType) String() string
