metadata

package module v0.0.4
Published: Jun 10, 2025 License: Apache-2.0 Imports: 22 Imported by: 5

README

GigAPI Metadata Engine


GigAPI Metadata provides a high-performance indexing system for managing metadata about data files (typically Parquet files) organized in time-partitioned structures. It supports efficient querying and merging operations, and provides both a local JSON file storage backend and a distributed Redis storage backend.

Features

  • Dual Storage Backends: JSON file-based storage for local deployments and Redis for distributed systems
  • Time-Partitioned Data: Optimized for date/hour partitioned data structures
  • Merge Planning: Intelligent merge planning for data consolidation across different layers
  • Async Operations: Promise-based asynchronous operations for better performance
  • Efficient Querying: Time-range and folder-based querying capabilities

Installation

go get github.com/gigapi/metadata

Core Concepts

IndexEntry

The fundamental data structure representing metadata about a single data file.
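As declared in the package (see the full reference in the Types section below), with all time values in Unix nanoseconds:

```go
type IndexEntry struct {
	Layer     string         `json:"layer"`
	Database  string         `json:"database"`
	Table     string         `json:"table"`
	Path      string         `json:"path"`
	SizeBytes int64          `json:"size_bytes"`
	RowCount  int64          `json:"row_count"`
	ChunkTime int64          `json:"chunk_time"`
	Min       map[string]any `json:"min"`
	Max       map[string]any `json:"max"`
	MinTime   int64          `json:"min_time"` // Unix nanoseconds
	MaxTime   int64          `json:"max_time"` // Unix nanoseconds
	WriterID  string         `json:"writer_id"`
}
```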

Storage Backends
JSON Index

For local file-based storage, suitable for single-node deployments.
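A minimal creation sketch. NewJSONIndex takes the data root, database, table, and a slice of layers and returns a TableIndex plus an error (see the API reference below); the layer name, type, and URL here are illustrative placeholders:

```go
import "github.com/gigapi/metadata"

// Layer values below are placeholders; adjust to your deployment.
layers := []metadata.Layer{
	{Name: "local", Type: "fs", URL: "file:///data/root", TTLSec: 0},
}

tableIndex, err := metadata.NewJSONIndex("/data/root", "my_database", "my_table", layers)
if err != nil {
	// handle error
}

tableIndex.Run() // start background processing
defer tableIndex.Stop()
```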

Redis Index

For distributed deployments with a Redis backend.
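Creation mirrors the JSON backend, except the first argument is a Redis connection URL (see Redis Configuration below); the URL and layer values here are illustrative:

```go
import "github.com/gigapi/metadata"

// Placeholder layer configuration.
layers := []metadata.Layer{
	{Name: "hot", Type: "fs", URL: "file:///data/root"},
}

tableIndex, err := metadata.NewRedisIndex("redis://localhost:6379/0", "my_database", "my_table", layers)
if err != nil {
	// handle error
}

tableIndex.Run()
defer tableIndex.Stop()
```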

Configuration

Merge Configurations

Before using the library, initialize merge configurations, which define merge behavior across different iterations.

Example configuration:

import "github.com/gigapi/metadata"

// Configure merge settings: [timeout_sec, max_size_bytes, iteration_id]
metadata.MergeConfigurations = []metadata.MergeConfigurationsConf{
    {10, 10 * 1024 * 1024, 1}, // 10s timeout, 10MB max size, iteration 1
    {30, 50 * 1024 * 1024, 2}, // 30s timeout, 50MB max size, iteration 2
}

Usage Examples

Basic JSON Index Usage
// Create a JSON-based table index (layer values are placeholders)
tableIndex, err := metadata.NewJSONIndex("/data/root", "my_database", "my_table", []metadata.Layer{
    {Name: "local", Type: "fs", URL: "file:///data/root"},
})
if err != nil {
    // handle error
}

// Add metadata entries
entries := []*metadata.IndexEntry{
    {
        Database:  "my_database",
        Table:     "my_table",
        Path:      "date=2024-01-15/hour=14/file1.parquet",
        SizeBytes: 1000000,
        MinTime:   1705327200000000000, // nanoseconds
        MaxTime:   1705327800000000000,
    },
}

// Batch operation (async)
promise := tableIndex.Batch(entries, nil)
result, err := promise.Get()
Redis Index Usage
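The Redis backend implements the same TableIndex interface, so usage mirrors the JSON example; only the constructor differs. A sketch, with an illustrative connection URL and reusing the entries slice from the JSON example above:

```go
tableIndex, err := metadata.NewRedisIndex("redis://localhost:6379/0", "my_database", "my_table", layers)
if err != nil {
    // handle error
}

// Same async batch API as the JSON backend
promise := tableIndex.Batch(entries, nil)
result, err := promise.Get()
```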
Querying Data
// Query with time range
options := metadata.QueryOptions{
    After:  time.Now().Add(-24 * time.Hour),
    Before: time.Now(),
}

entries, err := tableIndex.GetQuerier().Query(options)
Merge Operations
// Get merge plan; arguments are (writerId, layer, iteration).
// "writer1" is a placeholder writer ID.
planner := tableIndex.GetMergePlanner()
plan, err := planner.GetMergePlan("writer1", "layer1", 1)

if err == nil && len(plan.From) > 0 {
    // Execute merge (external process)
    // ...

    // Mark merge as complete (EndMerge returns a Promise)
    _, err = planner.EndMerge(plan).Get()
}

Interfaces

TableIndex Interface

The main interface for table-level operations.
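As declared in the package:

```go
type TableIndex interface {
	Batch(add []*IndexEntry, rm []*IndexEntry) Promise[int32]
	Get(layer string, path string) *IndexEntry
	Run()
	Stop()
	GetMergePlanner() TableMergePlanner
	GetQuerier() TableQuerier
	GetMovePlanner() TableMovePlanner
	GetDropPlanner() TableDropPlanner
}
```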

DBIndex Interface

For database-level operations.
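As declared in the package:

```go
type DBIndex interface {
	Databases() ([]string, error)
	Tables(database string) ([]string, error)
	Paths(database string, table string) ([]string, error)
}
```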

Data Organization

The system expects data organized in the following structure:

/root/
  ├── database1/
  │   ├── table1/
  │   │   ├── date=2024-01-15/
  │   │   │   ├── hour=00/
  │   │   │   ├── hour=01/
  │   │   │   └── ...
  │   │   └── date=2024-01-16/
  │   └── table2/
  └── database2/

Redis Configuration

For Redis backend, use standard Redis connection URLs:

  • redis://localhost:6379/0 - Standard Redis
  • rediss://user:pass@host:6380/1 - Redis with TLS

Error Handling

All operations return errors through the Promise interface or standard Go error handling. The library uses async operations for better performance in high-throughput scenarios.
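Several pending operations can be awaited together with the package's NewWaitForAll helper; a sketch, assuming batch1 and batch2 are previously built entry slices:

```go
// Aggregate several async operations and wait for all of them.
promises := []metadata.Promise[int32]{
    tableIndex.Batch(batch1, nil),
    tableIndex.Batch(batch2, nil),
}
if _, err := metadata.NewWaitForAll(promises).Get(); err != nil {
    // handle the first error reported by any of the operations
}
```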

Thread Safety

Both JSON and Redis implementations are thread-safe and can be used concurrently across multiple goroutines.

Testing

Run tests with a local Redis instance:

# Start Redis
docker run -d -p 6379:6379 redis:alpine

# Run tests  
go test ./...

License

This project is licensed under the Apache License 2.0.


Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for new functionality
  4. Ensure all tests pass
  5. Submit a pull request

Notes

  • The library is optimized for time-series data workloads with frequent writes and time-range queries
  • Redis backend is recommended for distributed deployments and high-throughput scenarios
  • JSON backend is suitable for single-node deployments and development environments
  • Merge operations are designed to be executed by external processes, with the library managing the planning and coordination
  • All time values are stored as Unix nanoseconds for high precision temporal operations

Documentation

Index

Constants

This section is empty.

Variables

var END_MERGE_SCRIPT []byte
var GET_MERGE_PLAN_SCRIPT []byte
var MergeConfigurations []MergeConfigurationsConf
var SCRIPT_PATCH_INDEX []byte

Functions

This section is empty.

Types

type DBIndex

type DBIndex interface {
	Databases() ([]string, error)
	Tables(database string) ([]string, error)
	Paths(database string, table string) ([]string, error)
}

func NewJSONDBIndex added in v0.0.4

func NewJSONDBIndex(layers []Layer) DBIndex

func NewRedisDbIndex

func NewRedisDbIndex(URL string) (DBIndex, error)

type DropPlan added in v0.0.4

type DropPlan struct {
	ID       string
	WriterID string
	Layer    string
	Database string
	Table    string
	Path     string
	TimeS    int32
}

func (DropPlan) Id added in v0.0.4

func (d DropPlan) Id() string

type Identified added in v0.0.4

type Identified interface {
	Id() string
}

type IndexEntry

type IndexEntry struct {
	Layer     string         `json:"layer"`
	Database  string         `json:"database"`
	Table     string         `json:"table"`
	Path      string         `json:"path"`
	SizeBytes int64          `json:"size_bytes"`
	RowCount  int64          `json:"row_count"`
	ChunkTime int64          `json:"chunk_time"`
	Min       map[string]any `json:"min"`
	Max       map[string]any `json:"max"`
	MinTime   int64          `json:"min_time"`
	MaxTime   int64          `json:"max_time"`
	WriterID  string         `json:"writer_id"`
}

type JSONIndex

type JSONIndex struct {
	// contains filtered or unexported fields
}

func (*JSONIndex) Batch

func (J *JSONIndex) Batch(add []*IndexEntry, rm []*IndexEntry) Promise[int32]

func (*JSONIndex) EndMerge

func (J *JSONIndex) EndMerge(plan MergePlan) Promise[int32]

func (*JSONIndex) EndMove added in v0.0.4

func (J *JSONIndex) EndMove(plan MovePlan) Promise[int32]

func (*JSONIndex) Get

func (J *JSONIndex) Get(layer string, _path string) *IndexEntry

func (*JSONIndex) GetDropPlanner added in v0.0.4

func (J *JSONIndex) GetDropPlanner() TableDropPlanner

func (*JSONIndex) GetDropQueue

func (J *JSONIndex) GetDropQueue(writerId string, layer string) (DropPlan, error)

func (*JSONIndex) GetMergePlan

func (J *JSONIndex) GetMergePlan(writerId string, layer string, iteration int) (MergePlan, error)

func (*JSONIndex) GetMergePlanner

func (J *JSONIndex) GetMergePlanner() TableMergePlanner

func (*JSONIndex) GetMovePlan added in v0.0.4

func (J *JSONIndex) GetMovePlan(writerId string, layer string) (MovePlan, error)

func (*JSONIndex) GetMovePlanner added in v0.0.4

func (J *JSONIndex) GetMovePlanner() TableMovePlanner

func (*JSONIndex) GetQuerier

func (J *JSONIndex) GetQuerier() TableQuerier

func (*JSONIndex) Query

func (J *JSONIndex) Query(options QueryOptions) ([]*IndexEntry, error)

func (*JSONIndex) RmFromDropQueue

func (J *JSONIndex) RmFromDropQueue(plan DropPlan) Promise[int32]

func (*JSONIndex) Run

func (J *JSONIndex) Run()

func (*JSONIndex) Stop

func (J *JSONIndex) Stop()

type Layer added in v0.0.4

type Layer struct {
	URL    string `json:"url"`
	Name   string `json:"name"`
	Type   string `json:"type"`
	TTLSec int32  `json:"ttl_sec"`
}

type MergeConfigurationsConf

type MergeConfigurationsConf [3]int64

MergeConfigurations is a slice of [timeout_sec, max_size, merge_iteration_id] triples: [[timeout_sec, max_size, merge_iteration_id], ...]. It must be initialized before the library is used.

func (MergeConfigurationsConf) MaxSize added in v0.0.4

func (m MergeConfigurationsConf) MaxSize() int64

func (MergeConfigurationsConf) MergeIterationId added in v0.0.4

func (m MergeConfigurationsConf) MergeIterationId() int64

func (MergeConfigurationsConf) TimeoutSec added in v0.0.4

func (m MergeConfigurationsConf) TimeoutSec() int64

type MergePlan

type MergePlan struct {
	ID        string
	WriterID  string
	Layer     string
	Database  string
	Table     string
	From      []string
	To        string
	Iteration int
}

func (MergePlan) Id added in v0.0.4

func (m MergePlan) Id() string

type MovePlan added in v0.0.4

type MovePlan struct {
	ID        string `json:"id"`
	WriterID  string `json:"writer_id"`
	Database  string `json:"database"`
	Table     string `json:"table"`
	PathFrom  string `json:"path_from"`
	LayerFrom string `json:"layer_from"`
	PathTo    string `json:"path_to"`
	LayerTo   string `json:"layer_to"`
}

func (MovePlan) Id added in v0.0.4

func (m MovePlan) Id() string

type Promise

type Promise[T any] interface {
	Get() (T, error)
	Peek() (int32, T, error)
	Done(res T, err error)
}

func Fulfilled

func Fulfilled[T any](err error, res T) Promise[T]

func NewPromise

func NewPromise[T any]() Promise[T]

func NewWaitForAll

func NewWaitForAll[T any](promises []Promise[T]) Promise[T]

type QueryOptions

type QueryOptions struct {
	Folder    string
	After     time.Time
	Before    time.Time
	Iteration int
}

type RedisIndex

type RedisIndex struct {
	// contains filtered or unexported fields
}

func (*RedisIndex) Batch

func (r *RedisIndex) Batch(add []*IndexEntry, rm []*IndexEntry) Promise[int32]

func (*RedisIndex) EndMerge

func (r *RedisIndex) EndMerge(plan MergePlan) Promise[int32]

func (*RedisIndex) EndMove added in v0.0.4

func (r *RedisIndex) EndMove(plan MovePlan) Promise[int32]

func (*RedisIndex) Get

func (r *RedisIndex) Get(layer string, path string) *IndexEntry

func (*RedisIndex) GetDropPlanner added in v0.0.4

func (r *RedisIndex) GetDropPlanner() TableDropPlanner

func (*RedisIndex) GetDropQueue

func (r *RedisIndex) GetDropQueue(writerId string, layer string) (DropPlan, error)

func (*RedisIndex) GetMergePlan

func (r *RedisIndex) GetMergePlan(writerId string, layer string, iteration int) (MergePlan, error)

func (*RedisIndex) GetMergePlanner

func (r *RedisIndex) GetMergePlanner() TableMergePlanner

func (*RedisIndex) GetMovePlan added in v0.0.4

func (r *RedisIndex) GetMovePlan(writerId string, layer string) (MovePlan, error)

func (*RedisIndex) GetMovePlanner added in v0.0.4

func (r *RedisIndex) GetMovePlanner() TableMovePlanner

func (*RedisIndex) GetQuerier

func (r *RedisIndex) GetQuerier() TableQuerier

func (*RedisIndex) Query

func (r *RedisIndex) Query(options QueryOptions) ([]*IndexEntry, error)

func (*RedisIndex) RmFromDropQueue

func (r *RedisIndex) RmFromDropQueue(plan DropPlan) Promise[int32]

func (*RedisIndex) Run

func (r *RedisIndex) Run()

func (*RedisIndex) Stop

func (r *RedisIndex) Stop()

type SinglePromise

type SinglePromise[T any] struct {
	// contains filtered or unexported fields
}

func (*SinglePromise[T]) Done

func (p *SinglePromise[T]) Done(res T, err error)

func (*SinglePromise[T]) Get

func (p *SinglePromise[T]) Get() (T, error)

func (*SinglePromise[T]) Peek

func (p *SinglePromise[T]) Peek() (int32, T, error)

type TableDropPlanner added in v0.0.4

type TableDropPlanner interface {
	GetDropQueue(writerId string, layer string) (DropPlan, error)
	RmFromDropQueue(plan DropPlan) Promise[int32]
}

type TableIndex

type TableIndex interface {
	Batch(add []*IndexEntry, rm []*IndexEntry) Promise[int32]
	Get(layer string, path string) *IndexEntry
	Run()
	Stop()
	GetMergePlanner() TableMergePlanner
	GetQuerier() TableQuerier
	GetMovePlanner() TableMovePlanner
	GetDropPlanner() TableDropPlanner
}

func NewJSONIndex

func NewJSONIndex(root string, database string, table string, layers []Layer) (TableIndex, error)

func NewRedisIndex

func NewRedisIndex(URL string, database string, table string, layers []Layer) (TableIndex, error)

type TableMergePlanner added in v0.0.2

type TableMergePlanner interface {
	GetMergePlan(writerId string, layer string, iteration int) (MergePlan, error)
	EndMerge(plan MergePlan) Promise[int32]
}

type TableMovePlanner added in v0.0.4

type TableMovePlanner interface {
	GetMovePlan(writerId string, layer string) (MovePlan, error)
	EndMove(plan MovePlan) Promise[int32]
}

type TableQuerier added in v0.0.2

type TableQuerier interface {
	Query(options QueryOptions) ([]*IndexEntry, error)
}

type WaitForAllPromise

type WaitForAllPromise[T any] struct {
	// contains filtered or unexported fields
}

func (*WaitForAllPromise[T]) Add

func (p *WaitForAllPromise[T]) Add(promise Promise[T])

func (*WaitForAllPromise[T]) Done

func (p *WaitForAllPromise[T]) Done(res T, err error)

func (*WaitForAllPromise[T]) Get

func (p *WaitForAllPromise[T]) Get() (T, error)

func (*WaitForAllPromise[T]) Peek

func (p *WaitForAllPromise[T]) Peek() (int32, T, error)
