metastore

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 27, 2018 License: Apache-2.0 Imports: 18 Imported by: 9

Documentation

Index

Constants

View Source
// Meaningful defaults for table configurations.
const (
	DefaultBatchSize                      = 1 << 21 // 2097152
	DefaultArchivingDelayMinutes          = 24 * 60 // 1 day
	DefaultArchivingIntervalMinutes       = 3 * 60  // 3 hours
	DefaultBackfillIntervalMinutes        = 60      // 1 hour
	DefaultBackfillMaxBufferSize    int64 = 1 << 32 // 4294967296
	DefaultBackfillThresholdInBytes int64 = 1 << 21 // 2097152
	DefaultBackfillStoreBatchSize         = 20000
	DefaultRecordRetentionInDays          = 90
	DefaultSnapshotIntervalMinutes        = 6 * 60               // 6 hours
	DefaultSnapshotThreshold              = 3 * DefaultBatchSize // 3 batches
	DefaultRedologRotationInterval        = 3 * 60 * 60          // 3 hours, in seconds
	DefaultMaxRedoLogSize                 = 1 << 30              // 1 GB
)

Meaningful defaults for table configurations.

Variables

View Source
var (
	// ErrTableDoesNotExist indicates the table does not exist.
	ErrTableDoesNotExist = errors.New("Table does not exist")
	// ErrTableAlreadyExist indicates the table already exists.
	ErrTableAlreadyExist = errors.New("Table already exists")
	// ErrColumnDoesNotExist indicates the column does not exist.
	ErrColumnDoesNotExist = errors.New("Column does not exist")
	// ErrColumnAlreadyExist indicates the column already exists.
	ErrColumnAlreadyExist = errors.New("Column already exists")
	// ErrColumnAlreadyDeleted indicates the column has already been deleted.
	ErrColumnAlreadyDeleted = errors.New("Column already deleted")
	// ErrNotEnumColumn indicates the column is not of enum type.
	ErrNotEnumColumn = errors.New("Column is not enum type")
	// ErrShardDoesNotExist indicates the shard does not exist.
	ErrShardDoesNotExist = errors.New("Shard does not exist")
	// ErrNotFactTable indicates the table is not a fact table.
	ErrNotFactTable = errors.New("Table is not fact table")
	// ErrNotDimensionTable indicates the table is not a dimension table.
	ErrNotDimensionTable = errors.New("Table is not dimension table")
	// ErrWatcherAlreadyExist indicates a watcher has already been registered.
	ErrWatcherAlreadyExist = errors.New("Watcher already registered")
	// ErrDeleteTimeColumn indicates the column is the time column and cannot be deleted.
	ErrDeleteTimeColumn = errors.New("Time column cannot be deleted")
	// ErrDeletePrimaryKeyColumn indicates the column belongs to the primary key and cannot be deleted.
	ErrDeletePrimaryKeyColumn = errors.New("Primary key column cannot be deleted")
	// ErrChangePrimaryKeyColumn indicates primary key columns cannot be changed.
	ErrChangePrimaryKeyColumn = errors.New("Primary key column cannot be changed")
	// ErrAllColumnsInvalid indicates all columns are invalid.
	ErrAllColumnsInvalid = errors.New("All columns are invalid")
	// ErrMissingPrimaryKey indicates a schema does not specify primary key columns.
	ErrMissingPrimaryKey = errors.New("Primary key columns not specified")
	// ErrColumnNonExist indicates a column that is used does not exist.
	// It is a distinct sentinel from ErrColumnDoesNotExist even though the two
	// messages are identical, so match it with errors.Is rather than by message.
	ErrColumnNonExist = errors.New("Column does not exist")
	// ErrColumnDeleted indicates a column that is used was deleted.
	// It shares its message with ErrColumnAlreadyDeleted but is a distinct value.
	ErrColumnDeleted = errors.New("Column already deleted")
	// ErrInvalidDataType indicates an invalid data type.
	ErrInvalidDataType = errors.New("Invalid data type")
	// ErrIllegalSchemaVersion indicates the new schema version is not greater than the old one.
	ErrIllegalSchemaVersion = errors.New("New schema version not greater than old")
	// ErrSchemaUpdateNotAllowed indicates a change was attempted on an immutable field.
	ErrSchemaUpdateNotAllowed = errors.New("Illegal schema update on immutable field")
	// ErrInsufficientColumnCount indicates a schema has no columns.
	ErrInsufficientColumnCount = errors.New("Insufficient column count")
	// ErrReusingColumnIDNotAllowed indicates an attempt to reuse the ID of a deleted column.
	ErrReusingColumnIDNotAllowed = errors.New("Reusing column id not allowed")
	// ErrNewColumnWithDeletion indicates an attempt to add a new column with the deleted flag on.
	ErrNewColumnWithDeletion = errors.New("Can not add column with deleted flag on")
	// ErrIllegalChangeSortColumn indicates illegal changes on sort columns.
	ErrIllegalChangeSortColumn = errors.New("Illegal changes on sort columns")
	// ErrDuplicatedColumn indicates a column is used more than once in sort or primary key columns.
	ErrDuplicatedColumn = errors.New("Illegal duplicated use of column")
	// ErrDuplicatedColumnName indicates a duplicated column name within the same table.
	ErrDuplicatedColumnName = errors.New("Duplicated column name found")
	// ErrTimeColumnDoesNotAllowDefault indicates a default value was given for the time column,
	// which does not allow one.
	ErrTimeColumnDoesNotAllowDefault = errors.New("Time column does not allow default value")
	// ErrDisallowMissingEventTime reports that missing event time cannot be disallowed.
	// NOTE(review): the exact triggering condition is not visible here — confirm at call sites.
	ErrDisallowMissingEventTime = errors.New("Can not disallow missing event time")
)

Functions

func ValidateDefaultValue

func ValidateDefaultValue(valueStr, dataTypeStr string) (err error)

ValidateDefaultValue validates the default value valueStr against the data type named by dataTypeStr.

Types

type MetaStore

// MetaStore defines interfaces of the external metastore, which can be
// implemented using file system, SQLite, Zookeeper etc.
type MetaStore interface {
	// GetEnumDict returns the enum dictionary of the specified enum column
	// as a list of case strings.
	GetEnumDict(table, column string) ([]string, error)

	// Sets the watcher for the specified enum column.
	// Should only be called once for each enum column.
	// Returns an events channel that emits enum cases starting from startCase,
	// and a done channel for consumer to ack once the event is processed.
	WatchEnumDictEvents(table, column string, startCase int) (events <-chan string, done chan<- struct{}, err error)

	// Returns the latest archiving/live cutoff for the specified shard.
	GetArchivingCutoff(table string, shard int) (uint32, error)

	// PurgeArchiveBatches deletes the metadata related to the archive batches
	// of the specified shard with IDs from batchIDStart to batchIDEnd.
	// NOTE(review): whether batchIDEnd is inclusive is not visible here — confirm.
	PurgeArchiveBatches(table string, shard, batchIDStart, batchIDEnd int) error
	// Returns the version to use for the specified archive batch and size of the batch with the
	// specified archiving/live cutoff.
	// NOTE(review): three non-error values are returned; by analogy with the
	// (version, seqNum, batchSize) parameters of AddArchiveBatchVersion they are
	// presumably version, seqNum and batch size — confirm.
	GetArchiveBatchVersion(table string, shard, batchID int, cutoff uint32) (uint32, uint32, int, error)
	// Returns the latest snapshot version for the specified shard: the redo log
	// file, the offset within it, and the last read batch ID and batch offset.
	GetSnapshotProgress(table string, shard int) (redoLogFile int64, offset uint32, lastReadBatchID int32, lastReadBatchOffset uint32, err error)

	// GetOwnedShards returns the IDs of the owned shards of the specified table
	// (shard ownership).
	GetOwnedShards(table string) ([]int, error)

	// Set the watcher for table shard ownership change events.
	// Should only be called once.
	// Returns an event channel that emits desired ownership states,
	// and a done channel for consumer to ack once the event is processed.
	WatchShardOwnershipEvents() (events <-chan common.ShardOwnership, done chan<- struct{}, err error)

	// Extends the enum dictionary of the specified enum column with enumCases.
	// Returns the assigned case IDs for each case string.
	ExtendEnumDict(table, column string, enumCases []string) ([]int, error)

	// Adds a version and size for the specified archive batch.
	AddArchiveBatchVersion(table string, shard, batchID int, version uint32, seqNum uint32, batchSize int) error

	// Updates the archiving/live cutoff time for the specified shard. This is used
	// by the archiving job after each successful run.
	UpdateArchivingCutoff(table string, shard int, cutoff uint32) error

	// Updates the latest snapshot version for the specified shard.
	UpdateSnapshotProgress(table string, shard int, redoLogFile int64, upsertBatchOffset uint32, lastReadBatchID int32, lastReadBatchOffset uint32) error

	// Updates the latest redolog/offset that have been backfilled for the specified shard.
	UpdateBackfillProgress(table string, shard int, redoLogFile int64, offset uint32) error

	// Retrieve the latest redolog/offset that have been backfilled for the specified shard.
	GetBackfillProgressInfo(table string, shard int) (redoLogFile int64, offset uint32, err error)

	TableSchemaWatchable
	TableSchemaMutator
}

MetaStore defines interfaces of the external metastore, which can be implemented using file system, SQLite, Zookeeper etc.

func NewDiskMetaStore

func NewDiskMetaStore(basePath string) (MetaStore, error)

NewDiskMetaStore creates a new disk-based metastore.

type SchemaFetchJob

// SchemaFetchJob is a job that periodically pings ares-controller and
// updates table schemas if applicable. Construct it with NewSchemaFetchJob
// and control it with Run and Stop.
type SchemaFetchJob struct {
	// contains filtered or unexported fields
}

SchemaFetchJob is a job that periodically pings ares-controller and updates table schemas if applicable

func NewSchemaFetchJob

func NewSchemaFetchJob(intervalInSeconds int, schemaMutator TableSchemaMutator, schemaValidator TableSchemaValidator, controllerClient clients.ControllerClient, clusterName, initialHash string) *SchemaFetchJob

NewSchemaFetchJob creates a new SchemaFetchJob

func (*SchemaFetchJob) Run

func (j *SchemaFetchJob) Run()

Run starts the scheduling

func (*SchemaFetchJob) Stop

func (j *SchemaFetchJob) Stop()

Stop stops the scheduling

type TableSchemaMutator

// TableSchemaMutator mutates table metadata. It embeds TableSchemaReader,
// so every mutator can also list and fetch tables.
type TableSchemaMutator interface {
	TableSchemaReader

	// Table-level operations.

	// CreateTable creates the given table.
	CreateTable(table *common.Table) error
	// UpdateTable replaces the stored definition of the given table.
	UpdateTable(table common.Table) error
	// UpdateTableConfig updates the config of the named table.
	UpdateTableConfig(table string, config common.TableConfig) error
	// DeleteTable deletes the named table.
	DeleteTable(name string) error

	// Column-level operations.

	// AddColumn adds column to the named table. When appendToArchivingSortOrder
	// is true, the newly added column is appended to the end of the table's
	// ArchivingSortColumns.
	AddColumn(table string, column common.Column, appendToArchivingSortOrder bool) error
	// UpdateColumn updates the config of the named column.
	UpdateColumn(table string, column string, config common.ColumnConfig) error
	// DeleteColumn deletes the named column from the named table.
	DeleteColumn(table string, column string) error
}

TableSchemaMutator mutates table metadata

type TableSchemaReader

// TableSchemaReader reads table schema.
type TableSchemaReader interface {
	// GetTable returns the schema of the table with the given name.
	GetTable(name string) (*common.Table, error)
	// ListTables returns the table list (presumably table names — confirm).
	ListTables() ([]string, error)
}

TableSchemaReader reads table schema

type TableSchemaValidator

// TableSchemaValidator validates whether a new table schema is valid,
// given the existing schema.
type TableSchemaValidator interface {
	// SetNewTable sets the proposed schema to be validated.
	SetNewTable(table common.Table)
	// SetOldTable sets the existing schema the new one is checked against.
	SetOldTable(table common.Table)
	// Validate checks the new schema against the old one and returns a
	// non-nil error when the new schema is not valid.
	Validate() error
}

TableSchemaValidator validates whether a new table schema is valid, given the existing schema.

func NewTableSchameValidator

func NewTableSchameValidator() TableSchemaValidator

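NewTableSchameValidator returns a new TableSchemaValidator. The constructor takes no arguments; supply the existing and proposed schemas afterwards via SetOldTable and SetNewTable. The older note "Pass nil for oldTable if none exists" no longer matches this signature, because SetOldTable takes a common.Table value, not a pointer.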

type TableSchemaWatchable

// TableSchemaWatchable watches table schema update events.
type TableSchemaWatchable interface {
	// WatchTableSchemaEvents sets the watcher for table modification/addition
	// events and should only be called once. It returns an events channel that
	// emits the table schema on each change event for a given table, plus a
	// done channel the consumer uses to ack each processed event.
	WatchTableSchemaEvents() (events <-chan *common.Table, done chan<- struct{}, err error)

	// WatchTableListEvents sets the watcher for table list change (table
	// deletion) events and should only be called once. It returns an events
	// channel that emits the entire table list on each table deletion event,
	// plus a done channel the consumer uses to ack each processed event.
	WatchTableListEvents() (events <-chan []string, done chan<- struct{}, err error)
}

TableSchemaWatchable watches table schema update events

Directories

Path Synopsis
Ten subdirectory entries, each with the synopsis "Code generated by mockery v1.0.0" (their path names were lost in extraction).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL