Package db

v0.0.0-...-86e9f11
Published: Jan 7, 2024 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Overview

Package db implements the policy layout of databases, tables, and indices as a virtual filesystem tree.

Index

Constants

const (
	// DefaultMinimumAge is the default minimum
	// age of packed-* files to be deleted.
	DefaultMinimumAge = 15 * time.Minute
	// DefaultInputMinimumAge is the default
	// minimum age of inputs-* files to be deleted.
	DefaultInputMinimumAge = 30 * time.Second
)
const (
	// StatusOK indicates that a QueueItem
	// was processed completely.
	StatusOK = iota
	// StatusTryAgain indicates that a QueueItem
	// was not processed, and it should be re-tried
	// shortly.
	StatusTryAgain
	// StatusWriteError indicates that a QueueItem
	// was not processed, and it should be re-tried
	// after a delay.
	StatusWriteError
)
const DefaultAlgo = "zion+iguana_v0/specialized"

DefaultAlgo is the default compression algorithm for compressing data blocks.

const (
	// DefaultBatchSize is the default queue
	// batching size that is used when none is set.
	DefaultBatchSize = 100 * mega
)
const DefaultMaxInlineBytes = 100 * giga

DefaultMaxInlineBytes is the default number of decompressed bytes that we reference in blockfmt.Index.Inline before flushing references to blockfmt.Index.Indirect.

const DefaultMinMerge = 50 * 1024 * 1024

DefaultMinMerge is the default minimum merge size.

Rationale: it looks like S3 server-side copy will run at about 250MB/s, so 50MB done as a synchronous server-side copy will introduce 200ms of ingest latency over and above the baseline. That seems like a reasonable maximum overhead. (Keep in mind this is 50MB compressed, so potentially a few hundred MB decompressed.)

const DefaultRangeMultiple = 100

DefaultRangeMultiple is the default multiple of the chunk alignment at which we write out metadata.

const DefaultTargetMergeSize = 1 * giga

DefaultTargetMergeSize is the default target size for compacted packfiles.

const MaxIndexSize = 15 * 1024 * 1024

MaxIndexSize is the maximum size of an index object. (The purpose of an index size cap is to prevent us from reading arbitrarily-sized index objects before we have authenticated the objects.)

Variables

var ErrBuildAgain = errors.New("partial db update")

ErrBuildAgain is returned by db.Config.Sync when only some of the input objects were successfully ingested.

Functions

func DefinitionPath

func DefinitionPath(db, table string) string

DefinitionPath returns the path at which the definition file for the given db and table would live relative to the root of the FS.

func IndexPath

func IndexPath(db, table string) string

IndexPath returns the path at which the index for the given db and table would live relative to the root of the FS.

func List

func List(s fs.FS) ([]string, error)

List returns the list of databases within a shared filesystem.

func ListComponent

func ListComponent(s fs.FS, pattern string, part int) ([]string, error)

ListComponent performs a glob match on s for the given pattern and then yields a deduplicated list of path components corresponding to the given 0-indexed part number.

For example, part 1 of "/foo/*/baz" would yield all of the components that matched "*".
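
A minimal sketch of that example (s is any fs.FS; the pattern is illustrative):

func matchedParts(s fs.FS) ([]string, error) {
	// yields the deduplicated components that matched "*"
	return db.ListComponent(s, "/foo/*/baz", 1)
}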

func ListTables

func ListTables(s fs.FS, db string) ([]string, error)

ListTables lists the names of all tables in the given database. The database name must not be empty.

The presence of a table in the returned list does not guarantee that it still exists; for example, it may have been deleted between the call to ListTables and a subsequent call to OpenIndex.
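
A short sketch that enumerates every database and table in a shared filesystem:

func walkTables(s fs.FS) error {
	dbs, err := db.List(s)
	if err != nil {
		return err
	}
	for _, dbname := range dbs {
		tables, err := db.ListTables(s, dbname)
		if err != nil {
			return err
		}
		for _, tbl := range tables {
			fmt.Printf("%s/%s\n", dbname, tbl)
		}
	}
	return nil
}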

func OpenIndex

func OpenIndex(s fs.FS, db, table string, key *blockfmt.Key) (*blockfmt.Index, error)

OpenIndex opens an index for the specific table and database. The key must correspond to the key that was used to sign the index when it was written.

func OpenPartialIndex

func OpenPartialIndex(s fs.FS, db, table string, key *blockfmt.Key) (*blockfmt.Index, error)

OpenPartialIndex is equivalent to OpenIndex, but skips decoding Index.Inputs. The returned index is suitable for queries, but not for synchronizing tables.
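
A sketch of query-only access (the database and table names are illustrative; who.Key() supplies the signing key per the Tenant interface below):

func queryIndex(s fs.FS, who db.Tenant) (*blockfmt.Index, error) {
	// skip decoding Index.Inputs; fine for queries,
	// but not for synchronizing tables
	return db.OpenPartialIndex(s, "mydb", "mytable", who.Key())
}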

func TablePrefix

func TablePrefix(db, table string) string

TablePrefix returns the prefix at which the table files live relative to the root of the FS.

func Tables

func Tables(s fs.FS, db string) ([]string, error)

Tables returns the list of tables within a database in a shared filesystem.

func WriteDefinition

func WriteDefinition(dst OutputFS, db, table string, s *Definition) error

WriteDefinition writes a definition to the given database.

Types

type ClientFS

type ClientFS struct {
	*blockfmt.DirFS
	// URL is the URL returned by DirFS.Start.
	URL string
}

ClientFS is a client for a DirFS. This is meant to be used for testing purposes only.

func DecodeClientFS

func DecodeClientFS(d ion.Datum) (*ClientFS, error)

DecodeClientFS produces a ClientFS from the datum encoded by DirFS.Encode.

func (*ClientFS) Encode

func (c *ClientFS) Encode(dst *ion.Buffer, st *ion.Symtab) error

func (*ClientFS) Open

func (c *ClientFS) Open(name string) (fs.File, error)

func (*ClientFS) OpenRange

func (c *ClientFS) OpenRange(name, etag string, off, width int64) (io.ReadCloser, error)

OpenRange implements fsutil.OpenRangeFS.OpenRange

type Config

type Config struct {
	// Algo is the compression algorithm
	// used for producing output data blocks.
	// (See [blockfmt.CompressorByName].)
	// If Algo is the empty string, Config
	// uses DefaultAlgo instead.
	Algo string
	// Align is the alignment of new
	// blocks to be produced in objects
	// inserted into the index.
	Align int
	// RangeMultiple is the multiple of Align
	// at which we write out metadata.
	// If RangeMultiple is zero, it defaults to
	// DefaultRangeMultiple.
	RangeMultiple int
	// MinMergeSize is the base merge
	// size of objects. If MinMergeSize is zero,
	// then DefaultMinMerge is used.
	MinMergeSize int
	// TargetMergeSize is the target size of
	// files that are compacted into larger packfiles.
	// If TargetMergeSize is zero, then DefaultTargetMergeSize is used.
	TargetMergeSize int
	// MinInputBytesPerCPU, if non-zero, determines the minimum
	// number of input bytes necessary to cause the conversion
	// process to decide to use an additional CPU core.
	// For example, if MinInputBytesPerCPU is 512kB, then 3MB of input
	// data would use 6 CPU cores (provided GOMAXPROCS is at least this high).
	// See blockfmt.MinInputBytesPerCPU
	MinInputBytesPerCPU int64
	// Force forces a full index rebuild
	// even when the input appears to be up-to-date.
	Force bool
	// Fallback determines the format for
	// objects when the object format is not
	// obvious from the file extension.
	Fallback func(name string) blockfmt.RowFormat
	// MaxScanObjects is the maximum number
	// of objects to be committed in a single Scan operation.
	// If MaxScanObjects is less than or equal to zero,
	// it is ignored and no limit is applied.
	MaxScanObjects int
	// MaxScanBytes is the maximum number
	// of bytes to ingest in a single Scan or Sync operation
	// (not including merging).
	// If MaxScanBytes is less than or equal to zero,
	// it is ignored and no limit is applied.
	MaxScanBytes int64
	// MaxScanTime is the maximum amount of time
	// to spend listing objects before deciding
	// to bail out of a scan.
	MaxScanTime time.Duration

	// NewIndexScan, if true, enables scanning
	// for newly-created index objects.
	NewIndexScan bool

	// MaxInlineBytes is the maximum number
	// of (decompressed) data bytes for which
	// we should store references directly in
	// blockfmt.Index.Inline.
	// If this value is zero, then DefaultMaxInlineBytes
	// is used instead.
	MaxInlineBytes int64
	// TargetRefSize is the target size for stored
	// indirect references. If this value is zero,
	// a reasonable default is used.
	TargetRefSize int64

	// GCMaxDelay is the longest amount of time that
	// a gc cycle will spend blocking a batch insert operation.
	GCMaxDelay time.Duration
	// GCMinimumAge is the minimum time that
	// a packed file should be left around after
	// it has been dereferenced.
	// See blockfmt.Index.ToDelete.Expiry for
	// how this value is used.
	GCMinimumAge time.Duration

	// InputMinimumAge is the minimum time
	// that an input file leaf should be left
	// around after it is no longer referenced.
	// See blockfmt.Index.ToDelete.Expiry
	InputMinimumAge time.Duration

	// Logf, if non-nil, will be where
	// the builder will log build actions
	// as it is executing. Logf must be
	// safe to call from multiple goroutines
	// simultaneously.
	Logf func(f string, args ...interface{})

	Verbose bool
}

Config is a set of configuration items for synchronizing an Index to match a specification from a Definition.

func (*Config) Format

func (c *Config) Format(chosen, name string, hints []byte) (blockfmt.RowFormat, error)

Format picks the row format for an object based on an explicit format hint and the object name. The following are tried, in order:

  1. If 'chosen' is the name of a known format, then that format is returned.
  2. If 'name' has a suffix that indicates a known format, then that format is returned.
  3. If c.Fallback is non-nil, then Fallback(name) is returned.

Otherwise, Format returns nil.
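
A minimal sketch that leans on steps 2 and 3 by passing an empty hint (nil hints; an explicit format name such as "json" would take precedence per step 1):

func pickFormat(c *db.Config, name string) (blockfmt.RowFormat, error) {
	// with chosen == "", the suffix of name is tried first,
	// then c.Fallback(name) if the suffix is unknown
	return c.Format("", name, nil)
}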

func (*Config) Scan

func (c *Config) Scan(who Tenant, db, table string) (int, error)

Scan performs an incremental append operation on a table by listing input objects and adding them to the index. Scan returns the number of objects added to the table or an error. If Scan returns (0, nil), then scanning has already completed and no further calls to Scan are necessary to build the table.

Semantically, Scan performs a list operation and a call to c.Append on the listed items, taking care to list incrementally from the last call to Append.
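
Because Scan is incremental, callers typically loop until no objects remain; a minimal sketch (database and table names are illustrative):

func scanAll(c *db.Config, who db.Tenant) error {
	for {
		n, err := c.Scan(who, "mydb", "mytable")
		if err != nil {
			return err
		}
		if n == 0 {
			return nil // table is fully built
		}
	}
}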

func (*Config) SetFeatures

func (c *Config) SetFeatures(lst []string)

SetFeatures updates c to take into account a list of feature strings. Unknown feature strings are silently ignored.

See also Definition.Features.

func (*Config) Sync

func (c *Config) Sync(who Tenant, db, tblpat string) error

Sync reads each Definition in db whose table name matches tblpat, converts the list of input objects into the right set of output objects, and writes the associated index signed with the tenant's key.
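
Because Sync returns ErrBuildAgain when ingestion is only partially complete, callers typically retry; a minimal sketch (assuming "*" is a table pattern that matches every table):

func syncAll(c *db.Config, who db.Tenant) error {
	for {
		err := c.Sync(who, "mydb", "*")
		if err == nil {
			return nil
		}
		if !errors.Is(err, db.ErrBuildAgain) {
			return err
		}
		// partial update: run Sync again to ingest the rest
	}
}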

type ContextFS

type ContextFS interface {
	fs.FS
	// WithContext returns a copy of the file
	// system configured with the given context.
	WithContext(ctx context.Context) fs.FS
}

ContextFS may be implemented by a file system that can be configured with a context to be applied to all file system operations.

type Definition

type Definition struct {
	// Inputs is the list of inputs that comprise the table.
	Inputs []Input `json:"input,omitempty"`
	// Partitions specifies synthetic fields that
	// are generated from components of the input
	// URI and used to partition table data.
	Partitions []Partition `json:"partitions,omitempty"`
	// Retention is the expiration policy for data.
	// Data older than the expiration window will
	// be periodically purged from the backing
	// store during table updates.
	Retention *RetentionPolicy `json:"retention_policy,omitempty"`
	// Features is a list of feature flags that
	// can be used to turn on features for beta-testing.
	Features []string `json:"beta_features,omitempty"`
	// SkipBackfill, if true, will cause this table
	// to skip scanning the source bucket(s) for matching
	// objects when the first objects are inserted into the table.
	SkipBackfill bool `json:"skip_backfill,omitempty"`
}

Definition describes the set of input files that belong to a table.

func DecodeDefinition

func DecodeDefinition(src io.Reader) (*Definition, error)

DecodeDefinition decodes a table definition from src.

See also: OpenDefinition
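
A sketch decoding a JSON definition (the field names follow the Definition JSON tags shown below; the pattern is illustrative):

func decodeSpec() (*db.Definition, error) {
	const spec = `{
		"input": [
			{"pattern": "s3://my-bucket/data/*.json", "format": "json"}
		]
	}`
	return db.DecodeDefinition(strings.NewReader(spec))
}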

func OpenDefinition

func OpenDefinition(s fs.FS, db, table string) (*Definition, error)

OpenDefinition opens a definition for the given database and table.

OpenDefinition calls DecodeDefinition on definition.json in the appropriate path for the given db and table.

func (*Definition) Equals

func (d *Definition) Equals(x *Definition) bool

Equals returns whether or not the table definitions are equivalent.

func (*Definition) Hash

func (d *Definition) Hash() []byte

Hash returns a hash of the table definition that can be used to detect changes.

type DirFS

type DirFS struct {
	*blockfmt.DirFS
	// contains filtered or unexported fields
}

DirFS is an InputFS implementation that can be used for local testing. It includes a local HTTP server bound to the loopback interface that will serve the directory contents.

func NewDirFS

func NewDirFS(dir string) *DirFS

NewDirFS constructs a new DirFS.

func (*DirFS) Close

func (d *DirFS) Close() error

Close closes the HTTP server associated with the DirFS.

func (*DirFS) Encode

func (d *DirFS) Encode(dst *ion.Buffer, st *ion.Symtab) error

Encode writes the URL for the server to dst. This can be used by DecodeClientFS to access the DirFS remotely.

func (*DirFS) Prefix

func (d *DirFS) Prefix() string

Prefix is "file://"

func (*DirFS) ServeHTTP

func (d *DirFS) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP is a basic implementation of a file server which serves from this DirFS.

func (*DirFS) Start

func (d *DirFS) Start() (*ClientFS, error)

Start begins serving files and returns a ClientFS.
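
A testing-only sketch that serves a directory over loopback HTTP and reads a table definition back through the client (database and table names are illustrative):

func withLocalFS(dir string) error {
	dfs := db.NewDirFS(dir)
	defer dfs.Close() // shut down the loopback HTTP server

	client, err := dfs.Start()
	if err != nil {
		return err
	}
	f, err := client.Open(db.DefinitionPath("mydb", "mytable"))
	if err != nil {
		return err
	}
	return f.Close()
}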

type GCConfig

type GCConfig struct {
	// MinimumAge, if non-zero, specifies
	// the minimum age for any objects removed
	// during a garbage-collection pass.
	// Note that objects are only candidates
	// for garbage collection if they are older
	// than the current index *and* not pointed to
	// by the current index, so the MinimumAge requirement
	// is only necessary if it is possible for GC and ingest
	// to run simultaneously. In that case, MinimumAge should be
	// set to some duration longer than any possible ingest cycle.
	MinimumAge      time.Duration
	InputMinimumAge time.Duration

	// MaxDelay is the maximum amount of time
	// that a GC will spend blocking batch inserts.
	// If MaxDelay is less than or equal to zero,
	// then the amount of time spent GC'ing is unlimited.
	MaxDelay time.Duration

	// Logf, if non-nil, is a callback used for logging
	// detailed information regarding GC decisions.
	Logf func(f string, args ...interface{})

	// Precise determines if GC is performed
	// by only deleting objects that have been
	// explicitly marked for deletion.
	Precise bool
}

GCConfig is a configuration for garbage collection.

func (*GCConfig) Run

func (c *GCConfig) Run(rfs RemoveFS, dbname string, idx *blockfmt.Index) error

Run calls rfs.Remove(path) for each path within the provided database name and table that a) has a filename pattern indicating it was packed by Sync, and b) is not pointed to by idx.
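
A sketch of a single GC pass (rfs is a RemoveFS over the table's storage; idx should be the current index, e.g. from OpenIndex; the database name is illustrative):

func collectGarbage(rfs db.RemoveFS, idx *blockfmt.Index) error {
	gc := db.GCConfig{
		MinimumAge: db.DefaultMinimumAge, // avoid racing a concurrent ingest
		Logf:       log.Printf,
	}
	return gc.Run(rfs, "mydb", idx)
}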

type Input

type Input struct {
	// Pattern is the glob pattern that
	// specifies which files are fed into
	// the table. Patterns should be URIs
	// where the URI scheme (i.e. s3://, file://, etc.)
	// indicates where the data ought to come from.
	Pattern string `json:"pattern"`
	// Format is the format of the files in pattern.
	// If Format is the empty string, then the format
	// will be inferred from the file extension.
	Format string `json:"format,omitempty"`
	// Hints, if non-nil, is the hints associated
	// with the input data. The hints may perform
	// type-based coercion of certain paths, and may additionally
	// eliminate some of the data as it is parsed.
	// Hints data is format-specific.
	Hints json.RawMessage `json:"hints,omitempty"`
}

Input is one input pattern belonging to a Definition.

type InputFS

type InputFS interface {
	blockfmt.InputFS
}

type OutputFS

type OutputFS interface {
	blockfmt.UploadFS
}

type Partition

type Partition struct {
	// Field is the name of the partition field. If
	// this field conflicts with a field in the
	// input data, the partition field will
	// override it.
	Field string `json:"field"`
	// Type is the type of the partition field.
	// If this is "", this defaults to "string".
	Type string `json:"type,omitempty"`
	// Value is a template string that is used to
	// produce the value for the partition field.
	// The template may reference parts of the
	// input URI specified in the input pattern.
	// If this is "", the field name is used to
	// determine the input URI part that will be
	// used to determine the value.
	Value string `json:"value,omitempty"`
}

A Partition defines a synthetic field that is generated from parts of an input URI and used to partition table data.
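
As an illustration, a definition sketch that derives a "region" field from the input URI (the {region} capture in the pattern is an assumed template form; the rest follows the fields documented above):

var def = &db.Definition{
	Inputs: []db.Input{
		{Pattern: "s3://my-bucket/{region}/*.json", Format: "json"},
	},
	Partitions: []db.Partition{
		// Value is empty, so the field name selects the
		// matching part of the input URI
		{Field: "region"},
	},
}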

type Queue

type Queue interface {
	// Close should be called when the
	// runner has finished processing items
	// from the queue (usually due to receiving
	// an external signal to stop processing events).
	// Close should only be called after all events
	// returned by Next have been processed via calls
	// to Finalize.
	io.Closer
	// Next should return the next item
	// in the queue. If the provided pause
	// duration is non-negative, then Next
	// should block for up to the provided duration
	// to produce a new value. If pause is negative,
	// then Next should block until it can return
	// a non-nil QueueItem value or an EOF error.
	// Next should return (nil, io.EOF) if the queue
	// has been closed and no further processing should
	// be performed.
	//
	// As an optimization, the returned QueueItem
	// can implement fs.File, which will obviate
	// the need for the caller to perform additional
	// I/O to produce a file handle associated with
	// the QueueItem.
	Next(pause time.Duration) (QueueItem, error)
	// Finalize is called to return the final status
	// of a QueueItem that was previously returned by
	// Next. If status is anything other than
	// StatusOK, then the Queue should arrange for the
	// item to be returned by a future call to Next.
	//
	// Finalize may panic if Queue.Close has been called.
	Finalize(item QueueItem, status QueueStatus)
}

type QueueItem

type QueueItem interface {
	// Path should return the full path
	// of the file, including the fs prefix.
	Path() string
	// ETag should return the ETag of
	// the file.
	ETag() string
	// Size should return the size of
	// the file at Path in bytes.
	Size() int64
	// EventTime should return the time
	// at which the queue item was inserted
	// into the queue. EventTime is used to
	// compute statistics about total queue delays.
	EventTime() time.Time
}

QueueItem represents an item in a notification queue.

type QueueRunner

type QueueRunner struct {
	Owner Tenant
	// Conf is the configuration
	// used for building tables.
	Conf Config

	// Logf is used to log errors encountered
	// while processing entries from a queue.
	// Logf may be nil.
	Logf func(f string, args ...interface{})

	// Open is a hook that can be used to override
	// how queue items are opened.
	// The default behavior is to use ifs.Open(item.Path())
	Open func(ifs InputFS, item QueueItem) (fs.File, error)

	// TableRefresh is the interval at which
	// table definitions are refreshed.
	// If TableRefresh is less than or equal
	// to zero, then tables are refreshed every minute.
	TableRefresh time.Duration

	// BatchSize is the maximum number of bytes
	// that the QueueRunner will attempt to read
	// from a Queue in Run before committing the
	// returned items. Batches may be smaller than
	// BatchSize due to the expiration of BatchInterval
	// or due to receiving an error from the queue
	// after batching a non-zero number of items.
	// (The size of a batch is computed by summing
	// the QueueItem.Size values from each QueueItem
	// returned from Next.)
	//
	// If BatchSize is less than or equal to zero,
	// then DefaultBatchSize is used instead.
	//
	// See also: BatchInterval
	BatchSize int64

	// BatchInterval is the maximum amount of
	// time the queue should wait for successive
	// calls to Queue.Next to accumulate BatchSize items.
	//
	// See also: BatchSize
	BatchInterval time.Duration

	// IOErrDelay determines how long queue processing
	// will pause if it encounters an I/O error from
	// the backing filesystem.
	IOErrDelay time.Duration
}

QueueRunner encapsulates the state required to process a single queue.

func (*QueueRunner) Run

func (q *QueueRunner) Run(in Queue) error

Run processes entries from in until in.Next returns io.EOF, at which point it will call in.Close.
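
A sketch that drives an existing Queue implementation (q may be any Queue; the zero-value Conf falls back to the package defaults described above, and the batch settings are illustrative):

func runQueue(owner db.Tenant, q db.Queue) error {
	qr := &db.QueueRunner{
		Owner:         owner,
		BatchInterval: time.Second,
		Logf:          log.Printf,
	}
	// blocks until q.Next returns io.EOF, then closes q
	return qr.Run(q)
}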

type QueueStatus

type QueueStatus int32

QueueStatus indicates the processing status of a QueueItem.

func (QueueStatus) Merge

func (s QueueStatus) Merge(other QueueStatus) QueueStatus

Merge returns the more severe status of either s or other.

type RemoveFS

type RemoveFS interface {
	fs.FS
	Remove(name string) error
}

RemoveFS is an fs.FS with a Remove operation.

type RetentionPolicy

type RetentionPolicy struct {
	// Field is the path expression for the field
	// used to determine the age of a record for
	// the purpose of the data retention policy.
	//
	// Currently only timestamp fields are
	// supported.
	Field string `json:"field,omitempty"`
	// ValidFor is the validity window relative to
	// now.
	//
	// This is a string with a format like
	// "<n>y<n>m<n>d" where "<n>" is a number and
	// any component can be omitted.
	//
	// For example: "6m", "1000d", "1y6m15d"
	ValidFor date.Duration `json:"valid_for"`
}

RetentionPolicy describes a policy for retaining data.

For a given field and validity window, the retention policy only retains data that satisfies the relation

field >= (now - valid_for)

type S3FS

type S3FS struct {
	blockfmt.S3FS
}

S3FS is an FS implementation that is backed by an S3 bucket.

func DecodeS3FS

func DecodeS3FS(d ion.Datum) (*S3FS, error)

DecodeS3FS decodes the output of (*S3FS).Encode.

func (*S3FS) Encode

func (s *S3FS) Encode(dst *ion.Buffer, st *ion.Symtab) error

Encode implements plan.UploadFS

func (*S3FS) URL

func (s *S3FS) URL(name, etag string) (string, error)

URL implements db.URL

type S3Resolver

type S3Resolver struct {
	// DeriveKey is the callback used to
	// derive a key for a particular bucket.
	DeriveKey func(bucket string) (*aws.SigningKey, error)
	// Client, if non-nil, sets the default
	// client used by returned s3.BucketFS objects.
	Client *http.Client
	Ctx    context.Context
}

S3Resolver is a resolver that expects only s3:// schemes.

func (*S3Resolver) Split

func (s *S3Resolver) Split(pattern string) (InputFS, string, error)

Split implements Resolver.Split

type Tenant

type Tenant interface {
	// ID should return the unique ID of the tenant.
	ID() string

	// Key should return the key used
	// for verifying the integrity of database objects.
	Key() *blockfmt.Key

	// Root should return the root of the
	// storage for database objects.
	// The returned FS should implement
	// UploadFS if it supports writing.
	Root() (InputFS, error)

	// Split should trim the prefix off of pattern
	// that specifies the source filesystem and return
	// the result as an InputFS and the trailing glob
	// pattern that can be applied to the input to yield
	// the results.
	Split(pattern string) (InputFS, string, error)
}

Tenant is the set of information necessary to perform queries on behalf of a tenant.

func NewLocalTenant

func NewLocalTenant(fs InputFS) Tenant

NewLocalTenant creates a tenant that uses the given FS as its backend.

func NewLocalTenantFromPath

func NewLocalTenantFromPath(path string) Tenant

NewLocalTenantFromPath creates a tenant that uses a DirFS rooted at the given path.
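
A minimal sketch for local testing (the directory path is illustrative):

// a tenant backed by a local directory
var tenant db.Tenant = db.NewLocalTenantFromPath("/tmp/sneller-test")

// equivalent, with an explicit DirFS backend:
var tenant2 db.Tenant = db.NewLocalTenant(db.NewDirFS("/tmp/sneller-test"))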

type TenantConfig

type TenantConfig struct {
	// MaxScanBytes is the maximum number of bytes
	// allowed to be scanned for each query. If
	// this is 0, there is no limit.
	MaxScanBytes uint64
}

TenantConfig holds configuration for each tenant.

type TenantConfigurable

type TenantConfigurable interface {
	Tenant

	// Config returns the configuration options
	// for this tenant. This may return nil to
	// indicate all defaults should be used.
	Config() *TenantConfig
}

TenantConfigurable is a tenant that may provide preferred configuration.
