executor

package
v4.1.23 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 7, 2022 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const WriteChannelCommandDepth = 1000000

NOTE: Access to the TransactionPipe structures is single threaded with the exception of the CommandChannel. Modification of the cache contents is performed by de-queueing commands from the CommandChannel in the single Cache thread.

Variables

This section is empty.

Functions

func FullPathToWALKey added in v4.0.1

func FullPathToWALKey(rootPath, fullPath string) (keyPath string)

func GetTimeFromTicks

func GetTimeFromTicks(intervalStart uint64, intervalsPerDay, intervalTicks uint32) (sec uint64, nanosec uint32)

GetTimeFromTicks Takes two time components, the start of the interval and the number of interval ticks to the timestamp and returns an epoch time (seconds) and the number of nanoseconds of fractional time within the last second as a remainder.

func NewByIntervalTicks

func NewByIntervalTicks(buffer []byte, numWords, recordLength int) sort.Interface

func ParseTGData added in v4.1.0

func ParseTGData(tgSerialized []byte, rootPath string) (tgID int64, wtSets []wal.WTSet)

func RewriteBuffer

func RewriteBuffer(buffer []byte, varRecLen, numVarRecords, intervalsPerDay uint32, intervalStartEpoch uint64) []byte

RewriteBuffer converts variable_length records to the result buffer.

variable records in a file: [Actual Data (VarRecLen-4 byte) , Interval Ticks(4 byte) ] rewriteBuffer converts the binary data to [EpochSecond(8 byte), Actual Data(VarRecLen-4 byte), Nanoseconds(4 byte) ] format.

buffer +-----------------------VarRecLen [byte]---+-----------------------+ + Actual Data(Ask,Bid, etc.) | IntevalTicks(4byte) | +------------------------------------------+------------------------+

↓ rewriteBuffer

rbTemp (= temporary result buffer) +--------------------+--VarRecLen + 8 [byte]-----+-------------------+ + EpochSecond(8byte) | Actual Data(Ask,Bid, etc) | Nanosecond(4byte) | +--------------------+----------------------------+------------------+.

func TimeOfVariableRecord

func TimeOfVariableRecord(buf []byte, cursor, rowLength int) time.Time

func WriteBufferToFile

func WriteBufferToFile(fp stdio.WriterAt, buffer wal.OffsetIndexBuffer) error

func WriteBufferToFileIndirect

func WriteBufferToFileIndirect(fp stdio.ReadWriteSeeker, buffer wal.OffsetIndexBuffer, varRecLen int,
) (err error)

func WriteCSM

func WriteCSM(csm io.ColumnSeriesMap, isVariableLength bool) (err error)

WriteCSM writes ColumnSeriesMap (csm) to each destination file, and flush it to the disk, isVariableLength is set to true if the record content is variable-length type. WriteCSM also verifies the DataShapeVector of the incoming ColumnSeriesMap matches the on-disk DataShapeVector defined by the file header. WriteCSM will create any files if they do not already exist for the given ColumnSeriesMap based on its TimeBucketKey.

Types

type ByIntervalTicks

type ByIntervalTicks struct {
	// contains filtered or unexported fields
}

ByIntervalTicks implements a custom sort for Variable length record. Sort by the last 4 byte (= intervalTicks) of each record.

func (*ByIntervalTicks) Len

func (ei *ByIntervalTicks) Len() int

func (*ByIntervalTicks) Less

func (ei *ByIntervalTicks) Less(i, j int) bool

Less reports whether the element with index i should sort before the element with index j.

func (*ByIntervalTicks) Swap

func (ei *ByIntervalTicks) Swap(i, j int)

Swap swaps the elements with indexes i and j.

type CacheEntryAlreadyOpenError

type CacheEntryAlreadyOpenError string

WAL Messages.

func (CacheEntryAlreadyOpenError) Error

func (msg CacheEntryAlreadyOpenError) Error() string

type CacheImmutableError

type CacheImmutableError string

func (CacheImmutableError) Error

func (msg CacheImmutableError) Error() string

type CachedFP

type CachedFP struct {
	// contains filtered or unexported fields
}

func NewCachedFP

func NewCachedFP() *CachedFP

func (*CachedFP) Close

func (cfp *CachedFP) Close() error

func (*CachedFP) GetFP

func (cfp *CachedFP) GetFP(fileName string) (fp *os.File, err error)

func (*CachedFP) String added in v4.1.7

func (cfp *CachedFP) String() string

type Deleter added in v4.1.14

type Deleter struct {
	IOPMap map[utilsio.TimeBucketKey]*IOPlan
	// contains filtered or unexported fields
}

func NewDeleter

func NewDeleter(pr *planner.ParseResult) (de *Deleter, err error)

func (*Deleter) Delete added in v4.1.14

func (de *Deleter) Delete() (err error)

type DestEnum

type DestEnum int8

--- Destination ID.

const (
	WAL DestEnum = iota
	CHECKPOINT
)

type ErrorWriter added in v4.1.2

type ErrorWriter struct{}

func (*ErrorWriter) WriteCSM added in v4.1.2

func (w *ErrorWriter) WriteCSM(csm io.ColumnSeriesMap, isVariableLength bool) error

type IOPlan added in v4.1.13

type IOPlan struct {
	FilePlan          []*ioFilePlan
	RecordLen         int32
	RecordType        utilsio.EnumRecordType
	VariableRecordLen int
	Limit             *planner.RowLimit
	Range             *planner.DateRange
	TimeQuals         []planner.TimeQualFunc
}

func NewIOPlan

func NewIOPlan(fl SortedFileList, limit *planner.RowLimit, range2 *planner.DateRange, timeQuals []planner.TimeQualFunc,
) (iop *IOPlan, err error)

type IndirectRecordInfo

type IndirectRecordInfo struct {
	Index, Offset, Len int64
}

type InstanceMetadata

type InstanceMetadata struct {
	CatalogDir *catalog.Directory
	WALFile    *WALFileType
}
var ThisInstance *InstanceMetadata

func NewInstanceSetup

func NewInstanceSetup(catalogDir *catalog.Directory, walfile *WALFileType) *InstanceMetadata

type MIDEnum

type MIDEnum int8

Message Types for WAL Messages --- Message ID.

const (
	TGDATA MIDEnum = iota
	TXNINFO
	STATUS
)

type NopReplicationSender added in v4.1.18

type NopReplicationSender struct{}

func (*NopReplicationSender) Run added in v4.1.18

func (nrs *NopReplicationSender) Run(_ context.Context)

func (*NopReplicationSender) Send added in v4.1.18

func (nrs *NopReplicationSender) Send(_ []byte)

type NotOpenError

type NotOpenError string

func (NotOpenError) Error

func (msg NotOpenError) Error() string

type Reader added in v4.0.1

type Reader struct {
	IOPMap map[utilsio.TimeBucketKey]*IOPlan
	// contains filtered or unexported fields
}

func NewReader

func NewReader(pr *planner.ParseResult) (r *Reader, err error)

func (*Reader) Read added in v4.0.1

func (r *Reader) Read() (csm utilsio.ColumnSeriesMap, err error)

type RecordLengthNotConsistent

type RecordLengthNotConsistent string

func (RecordLengthNotConsistent) Error

func (msg RecordLengthNotConsistent) Error() string

type ReplicationSender added in v4.1.0

type ReplicationSender interface {
	Run(ctx context.Context)
	Send(transactionGroup []byte)
}

type SingleTargetRequiredForWriter

type SingleTargetRequiredForWriter string

func (SingleTargetRequiredForWriter) Error

type SortedFileList

type SortedFileList []planner.QualifiedFile

func (SortedFileList) Len

func (fl SortedFileList) Len() int

func (SortedFileList) Less

func (fl SortedFileList) Less(i, j int) bool

func (SortedFileList) Swap

func (fl SortedFileList) Swap(i, j int)

type TGIDlist

type TGIDlist []int64

func (TGIDlist) Len

func (tgl TGIDlist) Len() int

func (TGIDlist) Less

func (tgl TGIDlist) Less(i, j int) bool

func (TGIDlist) Swap

func (tgl TGIDlist) Swap(i, j int)

type TransactionGroup added in v4.1.0

type TransactionGroup struct {
	// A "locally unique" transaction group identifier, can be a clock value
	ID int64
	// The contents of the WTSets
	WTGroup []wal.WTSet
	// MD5 checksum of the TG contents prior to the checksum
	Checksum [16]byte
}

type TransactionPipe

type TransactionPipe struct {
	// contains filtered or unexported fields
}

TransactionPipe stores the contents of the current pending Transaction Group and writes it to WAL when flush() is called.

func NewTransactionPipe

func NewTransactionPipe() *TransactionPipe

NewTransactionPipe creates a new transaction pipe that channels all of the write transactions to the WAL and primary writers.

func (*TransactionPipe) IncrementTGID added in v4.0.2

func (tgc *TransactionPipe) IncrementTGID() int64

IncrementTGID increments the transaction group ID and returns the new value.

func (*TransactionPipe) TGID

func (tgc *TransactionPipe) TGID() int64

TGID returns the latest transaction group ID.

type TriggerPluginDispatcher added in v4.1.2

type TriggerPluginDispatcher struct {
	// contains filtered or unexported fields
}

func StartNewTriggerPluginDispatcher added in v4.1.18

func StartNewTriggerPluginDispatcher(triggerMatchers []*trigger.Matcher) *TriggerPluginDispatcher

func (*TriggerPluginDispatcher) AppendRecord added in v4.1.2

func (tpd *TriggerPluginDispatcher) AppendRecord(keyPath string, record []byte)

AppendRecord collects the record from the serialized buffer.

func (*TriggerPluginDispatcher) DispatchRecords added in v4.1.2

func (tpd *TriggerPluginDispatcher) DispatchRecords()

DispatchRecords iterates over the registered triggers and fire the event if the file path matches the condition. This is meant to be run in a separate goroutine and recovers from panics in the triggers.

type TxnStatusEnum

type TxnStatusEnum int8

--- Status.

const (
	PREPARING TxnStatusEnum = iota
	COMMITINTENDED
	COMMITCOMPLETE
)

type WALCleaner added in v4.1.7

type WALCleaner struct {
	// contains filtered or unexported fields
}

func NewWALCleaner added in v4.1.7

func NewWALCleaner(ignoreFile string, myInstanceID int64) *WALCleaner

func (*WALCleaner) CleanupOldWALFiles added in v4.1.7

func (c *WALCleaner) CleanupOldWALFiles(walfileAbsPaths []string) error

type WALCreateError

type WALCreateError string

func (WALCreateError) Error

func (msg WALCreateError) Error() string

type WALFileType

type WALFileType struct {
	// These three fields plus the MID form the WAL Header, written at the beginning of the WAL File
	FileStatus       wal.FileStatusEnum
	ReplayState      wal.ReplayStateEnum
	OwningInstanceID int64

	FilePtr *os.File // Active file pointer to FileName

	ReplicationSender ReplicationSender // send messages to replica servers
	WALBypass         bool              // TODO: refactor: unexport this param
	BackgroundSync    bool              // TODO: refactor: unexport this param
	// contains filtered or unexported fields
}

func NewWALFile

func NewWALFile(rootDir string, owningInstanceID int64, rs ReplicationSender,
	walBypass bool, walWaitGroup *sync.WaitGroup, tpd *TriggerPluginDispatcher,
	txnPipe *TransactionPipe,
) (wf *WALFileType, err error)

func TakeOverWALFile added in v4.0.1

func TakeOverWALFile(filePath string) (wf *WALFileType, err error)

TakeOverWALFile opens an existing wal file and returns WALFileType for it.

func (*WALFileType) CanWrite

func (wf *WALFileType) CanWrite(callersInstanceID int64) (bool, error)

func (*WALFileType) CreateCheckpoint added in v4.0.1

func (wf *WALFileType) CreateCheckpoint() error

CreateCheckpoint flushes all primary dirty pages to disk, and so closes out the previous WAL state to end. Note, this is not goroutine-safe with FlushToWAL and caller should make sure it is streamlined.

func (*WALFileType) Delete

func (wf *WALFileType) Delete(callersInstanceID int64) (err error)

func (*WALFileType) FlushCommandsToWAL added in v4.1.0

func (wf *WALFileType) FlushCommandsToWAL(writeCommands []*wal.WriteCommand) (err error)

func (*WALFileType) FlushToWAL added in v4.0.1

func (wf *WALFileType) FlushToWAL() (err error)

FlushToWAL A.k.a. Commit transaction.

Here we flush the contents of the write cache to:
- Primary storage via the OS write cache - data is visible to readers
- WAL file with synchronization to physical storage - in case we need to recover from a crash

func (*WALFileType) IncrementWaitGroup added in v4.1.18

func (wf *WALFileType) IncrementWaitGroup()

func (*WALFileType) IsOpen

func (wf *WALFileType) IsOpen() bool

func (*WALFileType) NeedsReplay

func (wf *WALFileType) NeedsReplay() (bool, error)

func (*WALFileType) QueueWriteCommand added in v4.1.2

func (wf *WALFileType) QueueWriteCommand(wc *wal.WriteCommand)

func (*WALFileType) Replay

func (wf *WALFileType) Replay(dryRun bool) error

Replay loads this WAL File's unwritten transactions to primary store and mark it completely processed. We will do this in two passes, in the first pass we will collect the Transaction Group IDs that are not yet durably written to the primary store. In the second pass, we write the data into the Primary Store directly and then flush the results. Finally we close the WAL File and mark it completely written.

1) First WAL Pass: Locate unwritten TGIDs 2) Second WAL Pass: Load the open TG data into the Primary Data files 3) Flush the TG Cache to primary and mark this WAL File completely processed

Note that the TG Data for any given TGID should appear in the WAL only once. We verify it in the first pass.

func (*WALFileType) RequestFlush

func (wf *WALFileType) RequestFlush()

RequestFlush requests WAL Flush to the WAL writer goroutine if it exists, or just does the work in the same goroutine otherwise. The function blocks if there are no current queued flushes, and returns if there is already one queued which will handle the data present in the write channel, as it will flush as soon as possible.

func (*WALFileType) Shutdown added in v4.1.18

func (wf *WALFileType) Shutdown()

func (*WALFileType) SyncWAL

func (wf *WALFileType) SyncWAL(walRefresh, primaryRefresh time.Duration, walRotateInterval int)

func (*WALFileType) WriteCommand added in v4.1.2

func (wf *WALFileType) WriteCommand(rt io.EnumRecordType, tbiAbsPath string, varRecLen int, offset, index int64,
	data []byte, ds []io.DataShape,
) *wal.WriteCommand

func (*WALFileType) WriteStatus

func (wf *WALFileType) WriteStatus(fileStatus wal.FileStatusEnum, replayState wal.ReplayStateEnum) error

func (*WALFileType) WriteTransactionInfo

func (wf *WALFileType) WriteTransactionInfo(tid int64, did DestEnum, txnStatus TxnStatusEnum) error

type WALTakeOverError

type WALTakeOverError string

func (WALTakeOverError) Error

func (msg WALTakeOverError) Error() string

type WALWriteError

type WALWriteError string

func (WALWriteError) Error

func (msg WALWriteError) Error() string

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer is produced that complies with the parsed query results, including a possible date range restriction. If there is a date range restriction, the write() routine should produce an error when an out-of-bounds write is tried.

func NewWriter

func NewWriter(rootCatDir *catalog.Directory, walFile *WALFileType) (*Writer, error)

func (*Writer) WriteCSM added in v4.1.2

func (w *Writer) WriteCSM(csm io.ColumnSeriesMap, isVariableLength bool) error

WriteCSM has the same logic as the executor.WriteCSM function. In order to improve testability, use this function instead of the static WriteCSM function.

func (*Writer) WriteRecords

func (w *Writer) WriteRecords(ts []time.Time, data []byte, dsWithEpoch []io.DataShape, tbi *io.TimeBucketInfo) error

WriteRecords creates a WriteCommand from the supplied timestamp and data buffer, and sends it over the write channel to be flushed to disk in the WAL sync subroutine. The caller should assume that by calling WriteRecords directly, the data will be written to the file regardless if it satisfies the on-disk data shape, possible corrupting the data files. It is recommended to call WriteCSM() for any writes as it is safer.

type WrongSizeError

type WrongSizeError string

func (WrongSizeError) Error

func (msg WrongSizeError) Error() string

Directories

Path Synopsis
Package buffile helps batch write by writes to the temporary in-memory buffer under the assumption that many writes come in to the part of single file frequently.
Package buffile helps batch write by writes to the temporary in-memory buffer under the assumption that many writes come in to the part of single file frequently.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL