metadata

package module
v0.0.0-...-08be509
Published: Mar 26, 2026 License: Apache-2.0 Imports: 27 Imported by: 0

README

GigAPI Metadata Engine

GigAPI Metadata provides a high-performance indexing system for managing metadata about data files (typically Parquet files) organized in time-partitioned structures. It supports efficient querying and merge operations, and provides both local JSON file storage and distributed Redis storage backends.

Features

  • Dual Storage Backends: JSON file-based storage for local deployments and Redis for distributed systems
  • Time-Partitioned Data: Optimized for date/hour partitioned data structures
  • Merge Planning: Intelligent merge planning for data consolidation across different layers
  • Async Operations: Promise-based asynchronous operations for better performance
  • Efficient Querying: Time-range and folder-based querying capabilities

Installation

go get github.com/gigapi/metadata

Core Concepts

IndexEntry

The fundamental data structure, representing metadata about a single data file. Its fields are listed under type IndexEntry in the documentation below.
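As a rough illustration of what one entry carries, the sketch below mirrors the IndexEntry fields and JSON tags from the type listing in a local struct and round-trips a value through encoding/json. The helpers encodeEntry and decodeEntry are hypothetical names introduced here, and whether the library persists entries in exactly this JSON form is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// IndexEntry is a local mirror of the documented struct (same fields and
// JSON tags), so this sketch runs without importing the library.
type IndexEntry struct {
	Layer     string         `json:"layer"`
	Database  string         `json:"database"`
	Table     string         `json:"table"`
	Path      string         `json:"path"`
	SizeBytes int64          `json:"size_bytes"`
	RowCount  int64          `json:"row_count"`
	ChunkTime int64          `json:"chunk_time"`
	Min       map[string]any `json:"min"`
	Max       map[string]any `json:"max"`
	MinTime   int64          `json:"min_time"`
	MaxTime   int64          `json:"max_time"`
	WriterID  string         `json:"writer_id"`
}

// encodeEntry (hypothetical helper) returns the JSON form of an entry.
func encodeEntry(e IndexEntry) (string, error) {
	b, err := json.Marshal(e)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// decodeEntry (hypothetical helper) parses JSON back into an entry.
func decodeEntry(s string) (IndexEntry, error) {
	var e IndexEntry
	err := json.Unmarshal([]byte(s), &e)
	return e, err
}

func main() {
	e := IndexEntry{
		Database:  "my_database",
		Table:     "my_table",
		Path:      "date=2024-01-15/hour=14/file1.parquet",
		SizeBytes: 1000000,
		MinTime:   1705327200000000000, // Unix nanoseconds
		MaxTime:   1705327800000000000,
	}
	s, err := encodeEntry(e)
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Println(s)
}
```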

Storage Backends
JSON Index

For local file-based storage, suitable for single-node deployments. Created with NewJSONIndex.

Redis Index

For distributed deployments with a Redis backend. Created with NewRedisIndex.

Configuration

Merge Configurations

Before using the library, initialize MergeConfigurations, which defines the merge behavior for each merge iteration.

Example configuration:

import "github.com/gigapi/metadata"

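// Configure merge settings: [timeout_sec, max_size_bytes, iteration_id]
// MergeConfigurations is declared as []MergeConfigurationsConf, so the
// literal must use that element type rather than a bare [3]int64.
metadata.MergeConfigurations = []metadata.MergeConfigurationsConf{
    {10, 10 * 1024 * 1024, 1}, // 10s timeout, 10MB max size, iteration 1
    {30, 50 * 1024 * 1024, 2}, // 30s timeout, 50MB max size, iteration 2
}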
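To make the triple layout concrete, here is a small sketch that mirrors MergeConfigurationsConf locally and looks up the configuration for a given iteration. The accessor bodies assume the positions follow the documented order [timeout_sec, max_size, merge_iteration_id]; configFor is a hypothetical helper, not part of the library.

```go
package main

import "fmt"

// MergeConfigurationsConf is a local mirror of the documented type.
type MergeConfigurationsConf [3]int64

// The accessors below assume the documented element order.
func (m MergeConfigurationsConf) TimeoutSec() int64       { return m[0] }
func (m MergeConfigurationsConf) MaxSize() int64          { return m[1] }
func (m MergeConfigurationsConf) MergeIterationId() int64 { return m[2] }

// configFor (hypothetical helper) returns the configuration whose
// iteration ID matches, and false when none is configured.
func configFor(confs []MergeConfigurationsConf, iteration int64) (MergeConfigurationsConf, bool) {
	for _, c := range confs {
		if c.MergeIterationId() == iteration {
			return c, true
		}
	}
	return MergeConfigurationsConf{}, false
}

func main() {
	confs := []MergeConfigurationsConf{
		{10, 10 * 1024 * 1024, 1},
		{30, 50 * 1024 * 1024, 2},
	}
	if c, ok := configFor(confs, 2); ok {
		fmt.Printf("iteration %d: timeout=%ds, max size=%d bytes\n",
			c.MergeIterationId(), c.TimeoutSec(), c.MaxSize())
	}
}
```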

Usage Examples

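Basic JSON Index Usage

// layers describes the storage layers backing the table. Its contents are
// deployment-specific, so it is left empty here as a placeholder.
var layers []metadata.Layer

// Create a JSON-based table index (NewJSONIndex also returns an error)
tableIndex, err := metadata.NewJSONIndex("/data/root", "my_database", "my_table", layers)
if err != nil {
    log.Fatal(err)
}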

// Add metadata entries
entries := []*metadata.IndexEntry{
    {
        Database:  "my_database",
        Table:     "my_table", 
        Path:      "date=2024-01-15/hour=14/file1.parquet",
        SizeBytes: 1000000,
        MinTime:   1705327200000000000, // nanoseconds
        MaxTime:   1705327800000000000,
    },
}

// Batch operation (async)
promise := tableIndex.Batch(entries, nil)
result, err := promise.Get()
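
Redis Index Usage

A Redis-backed index is created with NewRedisIndex, which takes a Redis connection URL in place of the root directory and is then used through the same TableIndex interface.

Querying Data
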
// Query with time range
options := metadata.QueryOptions{
    After:  time.Now().Add(-24 * time.Hour),
    Before: time.Now(),
}

entries, err := tableIndex.GetQuerier().Query(options)
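
A time-range query amounts to an interval-overlap test between each file's [MinTime, MaxTime] and the requested window. The sketch below shows that test on plain Unix-nanosecond values. The fileRange type and overlaps function are hypothetical, and treating a zero bound as "open" and both ends as inclusive are assumptions; the library's exact boundary semantics are not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// fileRange is a hypothetical stand-in for the time fields of an entry.
type fileRange struct {
	Path    string
	MinTime int64 // Unix nanoseconds
	MaxTime int64 // Unix nanoseconds
}

// overlaps reports whether the file's [MinTime, MaxTime] intersects the
// window [afterNs, beforeNs]. A bound of 0 is treated as open (assumption).
func overlaps(f fileRange, afterNs, beforeNs int64) bool {
	if afterNs != 0 && f.MaxTime < afterNs {
		return false // file ends before the window starts
	}
	if beforeNs != 0 && f.MinTime > beforeNs {
		return false // file starts after the window ends
	}
	return true
}

func main() {
	now := time.Now()
	f := fileRange{
		Path:    "recent.parquet",
		MinTime: now.Add(-2 * time.Hour).UnixNano(),
		MaxTime: now.Add(-1 * time.Hour).UnixNano(),
	}
	after := now.Add(-24 * time.Hour).UnixNano()
	before := now.UnixNano()
	fmt.Println(f.Path, "matches last 24h:", overlaps(f, after, before))
}
```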
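
Merge Operations

// Get merge plan; "writer-1" is a placeholder writer ID
planner := tableIndex.GetMergePlanner()
plan, err := planner.GetMergePlan("writer-1", "layer1", 1)
if err != nil {
    log.Fatal(err)
}

// MergePlan is a struct value, not a pointer. Treating an empty From list
// as "nothing to merge" is an assumption, not documented behavior.
if len(plan.From) > 0 {
    // Execute merge (external process)
    // ...

    // Mark merge as complete; EndMerge returns a Promise[int32]
    _, err = planner.EndMerge(plan).Get()
}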
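
For intuition about how a size cap such as max_size_bytes can shape a plan, the following sketch greedily packs files into groups that stay under a limit. It is an illustration only and not the library's planning algorithm, which is not described in this document; the file type and planGroups function are hypothetical.

```go
package main

import "fmt"

// file is a hypothetical stand-in for the path and size of an entry.
type file struct {
	Path      string
	SizeBytes int64
}

// planGroups packs files, in order, into groups whose combined size stays
// within maxSize. Files larger than maxSize are skipped, and groups with a
// single file are dropped because there is nothing to merge.
func planGroups(files []file, maxSize int64) [][]string {
	var groups [][]string
	var cur []string
	var curSize int64

	flush := func() {
		if len(cur) > 1 {
			groups = append(groups, cur)
		}
		cur, curSize = nil, 0
	}

	for _, f := range files {
		if f.SizeBytes > maxSize {
			continue // too large to be merged under this limit
		}
		if curSize+f.SizeBytes > maxSize {
			flush()
		}
		cur = append(cur, f.Path)
		curSize += f.SizeBytes
	}
	flush()
	return groups
}

func main() {
	files := []file{{"a.parquet", 4}, {"b.parquet", 4}, {"c.parquet", 4}, {"d.parquet", 20}}
	for i, g := range planGroups(files, 10) {
		fmt.Printf("group %d: %v\n", i, g)
	}
}
```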

Interfaces

TableIndex Interface

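The main interface for table-level operations. Its methods are listed under type TableIndex in the documentation below.

DBIndex Interface

For database-level operations. Its methods are listed under type DBIndex in the documentation below.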

Data Organization

The system expects data organized in the following structure:

/root/
  ├── database1/
  │   ├── table1/
  │   │   ├── date=2024-01-15/
  │   │   │   ├── hour=00/
  │   │   │   ├── hour=01/
  │   │   │   └── ...
  │   │   └── date=2024-01-16/
  │   └── table2/
  └── database2/
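
The date=/hour= directory for a file can be derived from a timestamp. The sketch below builds it from a Unix-nanosecond value; partitionDir is a hypothetical helper, and the use of UTC for partition boundaries is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// partitionDir (hypothetical helper) returns "date=YYYY-MM-DD/hour=HH"
// for a Unix-nanosecond timestamp, interpreted in UTC (assumption).
func partitionDir(unixNano int64) string {
	t := time.Unix(0, unixNano).UTC()
	return fmt.Sprintf("date=%s/hour=%02d", t.Format("2006-01-02"), t.Hour())
}

func main() {
	// 1705327200000000000 ns is 2024-01-15 14:00:00 UTC.
	fmt.Println(partitionDir(1705327200000000000)) // prints "date=2024-01-15/hour=14"
}
```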

Redis Configuration

For Redis backend, use standard Redis connection URLs:

  • redis://localhost:6379/0 - Standard Redis
  • rediss://user:pass@host:6380/1 - Redis with TLS

Error Handling

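Mutating operations (Batch, EndMerge, EndMove, RmFromDropQueue) return a Promise[int32]; call Get on it to obtain the result and any error. Queries and plan lookups return errors through standard Go error handling. The library uses async operations for better performance in high-throughput scenarios.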
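
The promise pattern can be sketched with a channel that is closed once the result is ready. The type below is a hypothetical minimal future, not the library's SinglePromise: it implements only Get and Done from the documented Promise interface and omits Peek, whose semantics are not described here.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// promise is a hypothetical minimal future used only for illustration.
type promise[T any] struct {
	once sync.Once
	done chan struct{}
	res  T
	err  error
}

func newPromise[T any]() *promise[T] {
	return &promise[T]{done: make(chan struct{})}
}

// Done stores the outcome and unblocks all waiters. Calls after the
// first one are ignored.
func (p *promise[T]) Done(res T, err error) {
	p.once.Do(func() {
		p.res, p.err = res, err
		close(p.done)
	})
}

// Get blocks until Done has been called, then returns the outcome.
func (p *promise[T]) Get() (T, error) {
	<-p.done
	return p.res, p.err
}

func main() {
	p := newPromise[int32]()
	// Simulate an asynchronous operation that fails.
	go p.Done(0, errors.New("write failed"))
	if _, err := p.Get(); err != nil {
		fmt.Println("operation error:", err)
	}
}
```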

Thread Safety

Both JSON and Redis implementations are thread-safe and can be used concurrently across multiple goroutines.

Testing

Run tests with a local Redis instance:

# Start Redis
docker run -d -p 6379:6379 redis:alpine

# Run tests  
go test ./...

License

This project is licensed under the Apache License 2.0.

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for new functionality
  4. Ensure all tests pass
  5. Submit a pull request

Notes

  • The library is optimized for time-series data workloads with frequent writes and time-range queries
  • Redis backend is recommended for distributed deployments and high-throughput scenarios
  • JSON backend is suitable for single-node deployments and development environments
  • Merge operations are designed to be executed by external processes, with the library managing the planning and coordination
  • All time values are stored as Unix nanoseconds for high precision temporal operations

Documentation

Index

Constants

This section is empty.

Variables

var END_MERGE_SCRIPT []byte
var GET_MERGE_PLAN_SCRIPT []byte
var MergeConfigurations []MergeConfigurationsConf
var SCRIPT_PATCH_INDEX []byte

Functions

This section is empty.

Types

type DBIndex

type DBIndex interface {
	Databases() ([]string, error)
	Tables(database string) ([]string, error)
	Paths(database string, table string) ([]string, error)
}

func NewJSONDBIndex

func NewJSONDBIndex(layers []Layer) DBIndex

func NewRedisDbIndex

func NewRedisDbIndex(URL string) (DBIndex, error)

func NewS3DBIndex

func NewS3DBIndex(layers []Layer) (DBIndex, error)

type DropPlan

type DropPlan struct {
	ID       string
	WriterID string
	Layer    string
	Database string
	Table    string
	Path     string
	TimeS    int32
}

func (DropPlan) Id

func (d DropPlan) Id() string

type HybridIndex

type HybridIndex struct {
	// contains filtered or unexported fields
}

func (*HybridIndex) Batch

func (h *HybridIndex) Batch(add []*IndexEntry, rm []*IndexEntry) Promise[int32]

func (*HybridIndex) EndMerge

func (h *HybridIndex) EndMerge(plan MergePlan) Promise[int32]

func (*HybridIndex) EndMove

func (h *HybridIndex) EndMove(plan MovePlan) Promise[int32]

func (*HybridIndex) Get

func (h *HybridIndex) Get(layer string, _path string) *IndexEntry

func (*HybridIndex) GetAll

func (h *HybridIndex) GetAll() ([]*IndexEntry, error)

func (*HybridIndex) GetDropPlanner

func (h *HybridIndex) GetDropPlanner() TableDropPlanner

func (*HybridIndex) GetDropQueue

func (h *HybridIndex) GetDropQueue(writerId string, layer string) (DropPlan, error)

func (*HybridIndex) GetMergePlan

func (h *HybridIndex) GetMergePlan(writerId string, layer string, iteration int) (MergePlan, error)

func (*HybridIndex) GetMergePlanner

func (h *HybridIndex) GetMergePlanner() TableMergePlanner

func (*HybridIndex) GetMovePlan

func (h *HybridIndex) GetMovePlan(writerId string, layer string) (MovePlan, error)

func (*HybridIndex) GetMovePlanner

func (h *HybridIndex) GetMovePlanner() TableMovePlanner

func (*HybridIndex) GetQuerier

func (h *HybridIndex) GetQuerier() TableQuerier

func (*HybridIndex) Query

func (h *HybridIndex) Query(options QueryOptions) ([]*IndexEntry, error)

func (*HybridIndex) RmFromDropQueue

func (h *HybridIndex) RmFromDropQueue(plan DropPlan) Promise[int32]

func (*HybridIndex) Run

func (h *HybridIndex) Run()

func (*HybridIndex) Stop

func (h *HybridIndex) Stop()

type Identified

type Identified interface {
	Id() string
}

type IndexEntry

type IndexEntry struct {
	Layer     string         `json:"layer"`
	Database  string         `json:"database"`
	Table     string         `json:"table"`
	Path      string         `json:"path"`
	SizeBytes int64          `json:"size_bytes"`
	RowCount  int64          `json:"row_count"`
	ChunkTime int64          `json:"chunk_time"`
	Min       map[string]any `json:"min"`
	Max       map[string]any `json:"max"`
	MinTime   int64          `json:"min_time"`
	MaxTime   int64          `json:"max_time"`
	WriterID  string         `json:"writer_id"`
}

type JSONIndex

type JSONIndex struct {
	// contains filtered or unexported fields
}

func (*JSONIndex) Batch

func (J *JSONIndex) Batch(add []*IndexEntry, rm []*IndexEntry) Promise[int32]

func (*JSONIndex) EndMerge

func (J *JSONIndex) EndMerge(plan MergePlan) Promise[int32]

func (*JSONIndex) EndMove

func (J *JSONIndex) EndMove(plan MovePlan) Promise[int32]

func (*JSONIndex) Get

func (J *JSONIndex) Get(layer string, _path string) *IndexEntry

func (*JSONIndex) GetAll

func (J *JSONIndex) GetAll() ([]*IndexEntry, error)

func (*JSONIndex) GetDropPlanner

func (J *JSONIndex) GetDropPlanner() TableDropPlanner

func (*JSONIndex) GetDropQueue

func (J *JSONIndex) GetDropQueue(writerId string, layer string) (DropPlan, error)

func (*JSONIndex) GetMergePlan

func (J *JSONIndex) GetMergePlan(writerId string, layer string, iteration int) (MergePlan, error)

func (*JSONIndex) GetMergePlanner

func (J *JSONIndex) GetMergePlanner() TableMergePlanner

func (*JSONIndex) GetMovePlan

func (J *JSONIndex) GetMovePlan(writerId string, layer string) (MovePlan, error)

func (*JSONIndex) GetMovePlanner

func (J *JSONIndex) GetMovePlanner() TableMovePlanner

func (*JSONIndex) GetQuerier

func (J *JSONIndex) GetQuerier() TableQuerier

func (*JSONIndex) Query

func (J *JSONIndex) Query(options QueryOptions) ([]*IndexEntry, error)

func (*JSONIndex) RmFromDropQueue

func (J *JSONIndex) RmFromDropQueue(plan DropPlan) Promise[int32]

func (*JSONIndex) Run

func (J *JSONIndex) Run()

func (*JSONIndex) Stop

func (J *JSONIndex) Stop()

type KVStoreIndex

type KVStoreIndex interface {
	Get(key string) ([]byte, error)
	Put(key string, value []byte) error
	Delete(key string) error
	Destroy()
}
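
As an illustration of what satisfying this interface involves, the sketch below implements it with an in-memory map. The interface is mirrored locally so the example runs on its own; memKV and errNotFound are hypothetical, and returning an error for a missing key is an assumption about behavior the documentation does not specify.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// KVStoreIndex is a local mirror of the documented interface.
type KVStoreIndex interface {
	Get(key string) ([]byte, error)
	Put(key string, value []byte) error
	Delete(key string) error
	Destroy()
}

// errNotFound is hypothetical; the library's missing-key behavior is
// not documented here.
var errNotFound = errors.New("key not found")

// memKV is a hypothetical in-memory implementation guarded by a mutex.
type memKV struct {
	mu   sync.RWMutex
	data map[string][]byte
}

func newMemKV() *memKV { return &memKV{data: make(map[string][]byte)} }

func (m *memKV) Get(key string) ([]byte, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	v, ok := m.data[key]
	if !ok {
		return nil, errNotFound
	}
	return append([]byte(nil), v...), nil // return a copy
}

func (m *memKV) Put(key string, value []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[key] = append([]byte(nil), value...) // store a copy
	return nil
}

func (m *memKV) Delete(key string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.data, key)
	return nil
}

// Destroy drops all stored keys.
func (m *memKV) Destroy() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data = make(map[string][]byte)
}

// Compile-time check that memKV satisfies the interface.
var _ KVStoreIndex = (*memKV)(nil)

func main() {
	var kv KVStoreIndex = newMemKV()
	_ = kv.Put("my_database/my_table", []byte(`{"files":1}`))
	v, err := kv.Get("my_database/my_table")
	fmt.Println(string(v), err)
}
```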

func NewJSONKVStoreIndex

func NewJSONKVStoreIndex(path string) (KVStoreIndex, error)

func NewRedisKVStore

func NewRedisKVStore(URL string) (KVStoreIndex, error)

func NewS3KVStoreIndex

func NewS3KVStoreIndex(s3URL string) (KVStoreIndex, error)

type Layer

type Layer struct {
	URL    string `json:"url"`
	Name   string `json:"name"`
	Type   string `json:"type"`
	TTLSec int32  `json:"ttl_sec"`
	Key    string `json:"key,omitempty"`
	Secret string `json:"secret,omitempty"`
}

type MergeConfigurationsConf

type MergeConfigurationsConf [3]int64

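MergeConfigurationsConf is a single merge configuration: [timeout_sec, max_size, merge_iteration_id]. MergeConfigurations is a slice of them: [[timeout_sec, max_size, merge_iteration_id], ...]. You must initialize MergeConfigurations before using the library.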

func (MergeConfigurationsConf) MaxSize

func (m MergeConfigurationsConf) MaxSize() int64

func (MergeConfigurationsConf) MergeIterationId

func (m MergeConfigurationsConf) MergeIterationId() int64

func (MergeConfigurationsConf) TimeoutSec

func (m MergeConfigurationsConf) TimeoutSec() int64

type MergePlan

type MergePlan struct {
	ID        string
	WriterID  string
	Layer     string
	Database  string
	Table     string
	From      []string
	To        string
	Iteration int
}

func (MergePlan) Id

func (m MergePlan) Id() string

type MovePlan

type MovePlan struct {
	ID        string `json:"id"`
	WriterID  string `json:"writer_id"`
	Database  string `json:"database"`
	Table     string `json:"table"`
	PathFrom  string `json:"path_from"`
	LayerFrom string `json:"layer_from"`
	PathTo    string `json:"path_to"`
	LayerTo   string `json:"layer_to"`
}

func (MovePlan) Id

func (m MovePlan) Id() string

type Promise

type Promise[T any] interface {
	Get() (T, error)
	Peek() (int32, T, error)
	Done(res T, err error)
}

func Fulfilled

func Fulfilled[T any](err error, res T) Promise[T]

func NewPromise

func NewPromise[T any]() Promise[T]

func NewWaitForAll

func NewWaitForAll[T any](promises []Promise[T]) Promise[T]

type QueryOptions

type QueryOptions struct {
	Folder    string
	After     time.Time
	Before    time.Time
	Iteration int
}

type RedisIndex

type RedisIndex struct {
	// contains filtered or unexported fields
}

func (*RedisIndex) Batch

func (r *RedisIndex) Batch(add []*IndexEntry, rm []*IndexEntry) Promise[int32]

func (*RedisIndex) EndMerge

func (r *RedisIndex) EndMerge(plan MergePlan) Promise[int32]

func (*RedisIndex) EndMove

func (r *RedisIndex) EndMove(plan MovePlan) Promise[int32]

func (*RedisIndex) Get

func (r *RedisIndex) Get(layer string, path string) *IndexEntry

func (*RedisIndex) GetAll

func (r *RedisIndex) GetAll() ([]*IndexEntry, error)

func (*RedisIndex) GetDropPlanner

func (r *RedisIndex) GetDropPlanner() TableDropPlanner

func (*RedisIndex) GetDropQueue

func (r *RedisIndex) GetDropQueue(writerId string, layer string) (DropPlan, error)

func (*RedisIndex) GetMergePlan

func (r *RedisIndex) GetMergePlan(writerId string, layer string, iteration int) (MergePlan, error)

func (*RedisIndex) GetMergePlanner

func (r *RedisIndex) GetMergePlanner() TableMergePlanner

func (*RedisIndex) GetMovePlan

func (r *RedisIndex) GetMovePlan(writerId string, layer string) (MovePlan, error)

func (*RedisIndex) GetMovePlanner

func (r *RedisIndex) GetMovePlanner() TableMovePlanner

func (*RedisIndex) GetQuerier

func (r *RedisIndex) GetQuerier() TableQuerier

func (*RedisIndex) Query

func (r *RedisIndex) Query(options QueryOptions) ([]*IndexEntry, error)

func (*RedisIndex) RmFromDropQueue

func (r *RedisIndex) RmFromDropQueue(plan DropPlan) Promise[int32]

func (*RedisIndex) Run

func (r *RedisIndex) Run()

func (*RedisIndex) Stop

func (r *RedisIndex) Stop()

type S3Index

type S3Index struct {
	// contains filtered or unexported fields
}

func (*S3Index) Batch

func (J *S3Index) Batch(add []*IndexEntry, rm []*IndexEntry) Promise[int32]

func (*S3Index) EndMerge

func (J *S3Index) EndMerge(plan MergePlan) Promise[int32]

func (*S3Index) EndMove

func (J *S3Index) EndMove(plan MovePlan) Promise[int32]

func (*S3Index) Get

func (J *S3Index) Get(layer string, _path string) *IndexEntry

func (*S3Index) GetAll

func (J *S3Index) GetAll() ([]*IndexEntry, error)

func (*S3Index) GetDropPlanner

func (J *S3Index) GetDropPlanner() TableDropPlanner

func (*S3Index) GetDropQueue

func (J *S3Index) GetDropQueue(writerId string, layer string) (DropPlan, error)

func (*S3Index) GetMergePlan

func (J *S3Index) GetMergePlan(writerId string, layer string, iteration int) (MergePlan, error)

func (*S3Index) GetMergePlanner

func (J *S3Index) GetMergePlanner() TableMergePlanner

func (*S3Index) GetMovePlan

func (J *S3Index) GetMovePlan(writerId string, layer string) (MovePlan, error)

func (*S3Index) GetMovePlanner

func (J *S3Index) GetMovePlanner() TableMovePlanner

func (*S3Index) GetQuerier

func (J *S3Index) GetQuerier() TableQuerier

func (*S3Index) Query

func (J *S3Index) Query(options QueryOptions) ([]*IndexEntry, error)

func (*S3Index) RmFromDropQueue

func (J *S3Index) RmFromDropQueue(plan DropPlan) Promise[int32]

func (*S3Index) Run

func (J *S3Index) Run()

func (*S3Index) Stop

func (J *S3Index) Stop()

type SinglePromise

type SinglePromise[T any] struct {
	// contains filtered or unexported fields
}

func (*SinglePromise[T]) Done

func (p *SinglePromise[T]) Done(res T, err error)

func (*SinglePromise[T]) Get

func (p *SinglePromise[T]) Get() (T, error)

func (*SinglePromise[T]) Peek

func (p *SinglePromise[T]) Peek() (int32, T, error)

type TableDropPlanner

type TableDropPlanner interface {
	GetDropQueue(writerId string, layer string) (DropPlan, error)
	RmFromDropQueue(plan DropPlan) Promise[int32]
}

type TableIndex

type TableIndex interface {
	Batch(add []*IndexEntry, rm []*IndexEntry) Promise[int32]
	Get(layer string, path string) *IndexEntry
	Run()
	Stop()
	GetMergePlanner() TableMergePlanner
	GetQuerier() TableQuerier
	GetMovePlanner() TableMovePlanner
	GetDropPlanner() TableDropPlanner
	GetAll() ([]*IndexEntry, error)
}

func NewIndex

func NewIndex(root string, database string, table string, layers []Layer) (TableIndex, error)

func NewJSONIndex

func NewJSONIndex(root string, database string, table string, layers []Layer) (TableIndex, error)

func NewRedisIndex

func NewRedisIndex(URL string, database string, table string, layers []Layer) (TableIndex, error)

func NewS3Index

func NewS3Index(database string, table string, layers []Layer) (TableIndex, error)

type TableMergePlanner

type TableMergePlanner interface {
	GetMergePlan(writerId string, layer string, iteration int) (MergePlan, error)
	EndMerge(plan MergePlan) Promise[int32]
}

type TableMovePlanner

type TableMovePlanner interface {
	GetMovePlan(writerId string, layer string) (MovePlan, error)
	EndMove(plan MovePlan) Promise[int32]
}

type TableQuerier

type TableQuerier interface {
	Query(options QueryOptions) ([]*IndexEntry, error)
}

type WaitForAllPromise

type WaitForAllPromise[T any] struct {
	// contains filtered or unexported fields
}

func (*WaitForAllPromise[T]) Add

func (p *WaitForAllPromise[T]) Add(promise Promise[T])

func (*WaitForAllPromise[T]) Done

func (p *WaitForAllPromise[T]) Done(res T, err error)

func (*WaitForAllPromise[T]) Get

func (p *WaitForAllPromise[T]) Get() (T, error)

func (*WaitForAllPromise[T]) Peek

func (p *WaitForAllPromise[T]) Peek() (int32, T, error)
