Documentation ¶
Index ¶
- Constants
- func AppendIntervalTicks(buf []byte, t time.Time, index, intervalsPerDay int64) (outBuf []byte)
- func FinishAndWait()
- func NewIOPlan(fl SortedFileList, pr *planner.ParseResult) (iop *ioplan, err error)
- func NewInstanceSetup(relRootDir string, options ...bool)
- func NewReader(pr *planner.ParseResult) (r *reader, err error)
- func StartupCacheAndWAL(rootDir string) (tgc *TransactionPipe, wf *WALFileType, err error)
- func WriteBufferToFile(fp stdio.WriterAt, buffer offsetIndexBuffer) error
- func WriteBufferToFileIndirect(fp *os.File, buffer offsetIndexBuffer) (err error)
- func WriteCSM(csm io.ColumnSeriesMap, isVariableLength bool) (err error)
- type CacheEntryAlreadyOpenError
- type CacheImmutableError
- type CachedFP
- type DestEnum
- type FileStatusEnum
- type IndirectRecordInfo
- type InstanceMetadata
- type MIDEnum
- type NotOpenError
- type RecordLengthNotConsistent
- type ReplayStateEnum
- type ShortReadError
- type SingleTargetRequiredForWriter
- type SortedFileList
- type TGIDlist
- type TransactionPipe
- type TxnStatusEnum
- type WALCreateError
- type WALFileType
- func (wf *WALFileType) CanDeleteSafely() bool
- func (wf *WALFileType) CanWrite(msg string) bool
- func (wf *WALFileType) Close(ReplayStatus ReplayStateEnum)
- func (wf *WALFileType) Delete() (err error)
- func (wf *WALFileType) FullPathToWALKey(fullPath string) (keyPath string)
- func (wf *WALFileType) IsOpen() bool
- func (wf *WALFileType) NeedsReplay() bool
- func (wf *WALFileType) Open() error
- func (wf *WALFileType) ReadStatus() (fileStatus FileStatusEnum, replayStatus ReplayStateEnum, ...)
- func (wf *WALFileType) Replay(writeData bool) error
- func (wf *WALFileType) RequestFlush()
- func (wf *WALFileType) SyncWAL(WALRefresh, PrimaryRefresh time.Duration, walRotateInterval int)
- func (wf *WALFileType) WALKeyToFullPath(keyPath string) (fullPath string)
- func (wf *WALFileType) WriteStatus(FileStatus FileStatusEnum, ReplayState ReplayStateEnum)
- func (wf *WALFileType) WriteTransactionInfo(tid int64, did DestEnum, txnStatus TxnStatusEnum)
- type WALTakeOverError
- type WALWriteError
- type WriteCommand
- type Writer
- type WrongSizeError
Constants ¶
const RecordsPerRead = 2000
const WriteChannelCommandDepth = 1000000
NOTE: Access to the TransactionPipe structures is single threaded with the exception of the CommandChannel. Modification of the cache contents is performed by de-queueing commands from the CommandChannel in the single Cache thread.
Variables ¶
This section is empty.
Functions ¶
func AppendIntervalTicks ¶
func FinishAndWait ¶
func FinishAndWait()
FinishAndWait closes the writtenIndexes channel, and waits for the remaining triggers to fire, returning
func NewIOPlan ¶
func NewIOPlan(fl SortedFileList, pr *planner.ParseResult) (iop *ioplan, err error)
func NewInstanceSetup ¶
func NewReader ¶
func NewReader(pr *planner.ParseResult) (r *reader, err error)
func StartupCacheAndWAL ¶
func StartupCacheAndWAL(rootDir string) (tgc *TransactionPipe, wf *WALFileType, err error)
func WriteBufferToFile ¶
func WriteCSM ¶
func WriteCSM(csm io.ColumnSeriesMap, isVariableLength bool) (err error)
WriteCSM writs ColumnSeriesMap csm to each destination file, and flush it to the disk, isVariableLength is set to true if the record content is variable-length type. WriteCSM also verifies the DataShapeVector of the incoming ColumnSeriesMap matches the on-disk DataShapeVector defined by the file header. WriteCSM will create any files if they do not already exist for the given ColumnSeriesMap based on its TimeBucketKey.
Types ¶
type CacheEntryAlreadyOpenError ¶
type CacheEntryAlreadyOpenError string
WAL Messages
func (CacheEntryAlreadyOpenError) Error ¶
func (msg CacheEntryAlreadyOpenError) Error() string
type CacheImmutableError ¶
type CacheImmutableError string
func (CacheImmutableError) Error ¶
func (msg CacheImmutableError) Error() string
type CachedFP ¶
type CachedFP struct {
// contains filtered or unexported fields
}
func NewCachedFP ¶
func NewCachedFP() *CachedFP
type IndirectRecordInfo ¶
type IndirectRecordInfo struct {
Index, Offset, Len int64
}
type InstanceMetadata ¶
type InstanceMetadata struct { InstanceID int64 RootDir string CatalogDir *catalog.Directory TXNPipe *TransactionPipe WALFile *WALFileType WALWg sync.WaitGroup ShutdownPending bool WALBypass bool TriggerMatchers []*trigger.TriggerMatcher }
var ThisInstance *InstanceMetadata
type NotOpenError ¶
type NotOpenError string
func (NotOpenError) Error ¶
func (msg NotOpenError) Error() string
type RecordLengthNotConsistent ¶
type RecordLengthNotConsistent string
func (RecordLengthNotConsistent) Error ¶
func (msg RecordLengthNotConsistent) Error() string
type ReplayStateEnum ¶
type ReplayStateEnum int8
const ( Invalid2 ReplayStateEnum = iota NOTREPLAYED REPLAYED REPLAYINPROCESS )
type ShortReadError ¶
type ShortReadError string
func (ShortReadError) Error ¶
func (msg ShortReadError) Error() string
type SingleTargetRequiredForWriter ¶
type SingleTargetRequiredForWriter string
func (SingleTargetRequiredForWriter) Error ¶
func (msg SingleTargetRequiredForWriter) Error() string
type SortedFileList ¶
type SortedFileList []planner.QualifiedFile
func (SortedFileList) Len ¶
func (fl SortedFileList) Len() int
func (SortedFileList) Less ¶
func (fl SortedFileList) Less(i, j int) bool
func (SortedFileList) Swap ¶
func (fl SortedFileList) Swap(i, j int)
type TransactionPipe ¶
type TransactionPipe struct {
// contains filtered or unexported fields
}
TransactionPipe stores the contents of the current pending Transaction Group and writes it to WAL when flush() is called
func NewTransactionPipe ¶
func NewTransactionPipe() *TransactionPipe
NewTransactionPipe creates a new transaction pipe that channels all of the write transactions to the WAL and primary writers
func (*TransactionPipe) NewTGID ¶
func (tgc *TransactionPipe) NewTGID() int64
NewTGID monotonically increases the transaction group ID using the current unix epoch nanosecond timestamp
func (*TransactionPipe) TGID ¶
func (tgc *TransactionPipe) TGID() int64
TGID returns the latest transaction group ID
type TxnStatusEnum ¶
type TxnStatusEnum int8
--- Status
const ( PREPARING TxnStatusEnum = iota COMMITINTENDED COMMITCOMPLETE )
type WALCreateError ¶
type WALCreateError string
func (WALCreateError) Error ¶
func (msg WALCreateError) Error() string
type WALFileType ¶
type WALFileType struct { // These three fields plus the MID form the WAL Header, written at the beginning of the WAL File FileStatus FileStatusEnum ReplayState ReplayStateEnum OwningInstanceID int64 // End of WAL Header RootPath string // Path to the root directory, base of FileName FilePath string // WAL file full path FilePtr *os.File // Active file pointer to FileName // contains filtered or unexported fields }
func NewWALFile ¶
func NewWALFile(rootDir string, existingFilePath string) (wf *WALFileType, err error)
func (*WALFileType) CanDeleteSafely ¶
func (wf *WALFileType) CanDeleteSafely() bool
func (*WALFileType) CanWrite ¶
func (wf *WALFileType) CanWrite(msg string) bool
func (*WALFileType) Close ¶
func (wf *WALFileType) Close(ReplayStatus ReplayStateEnum)
func (*WALFileType) Delete ¶
func (wf *WALFileType) Delete() (err error)
func (*WALFileType) FullPathToWALKey ¶
func (wf *WALFileType) FullPathToWALKey(fullPath string) (keyPath string)
func (*WALFileType) IsOpen ¶
func (wf *WALFileType) IsOpen() bool
func (*WALFileType) NeedsReplay ¶
func (wf *WALFileType) NeedsReplay() bool
func (*WALFileType) Open ¶
func (wf *WALFileType) Open() error
func (*WALFileType) ReadStatus ¶
func (wf *WALFileType) ReadStatus() (fileStatus FileStatusEnum, replayStatus ReplayStateEnum, OwningInstanceID int64, err error)
func (*WALFileType) Replay ¶
func (wf *WALFileType) Replay(writeData bool) error
func (*WALFileType) RequestFlush ¶
func (wf *WALFileType) RequestFlush()
RequestFlush requests WAL Flush to the WAL writer goroutine if it exists, or just does the work in the same goroutine otherwise. The function blocks if there are no current queued flushes, and returns if there is already one queued which will handle the data present in the write channel, as it will flush as soon as possible.
func (*WALFileType) SyncWAL ¶
func (wf *WALFileType) SyncWAL(WALRefresh, PrimaryRefresh time.Duration, walRotateInterval int)
func (*WALFileType) WALKeyToFullPath ¶
func (wf *WALFileType) WALKeyToFullPath(keyPath string) (fullPath string)
func (*WALFileType) WriteStatus ¶
func (wf *WALFileType) WriteStatus(FileStatus FileStatusEnum, ReplayState ReplayStateEnum)
func (*WALFileType) WriteTransactionInfo ¶
func (wf *WALFileType) WriteTransactionInfo(tid int64, did DestEnum, txnStatus TxnStatusEnum)
type WALTakeOverError ¶
type WALTakeOverError string
func (WALTakeOverError) Error ¶
func (msg WALTakeOverError) Error() string
type WALWriteError ¶
type WALWriteError string
func (WALWriteError) Error ¶
func (msg WALWriteError) Error() string
type WriteCommand ¶
type WriteCommand struct { RecordType io.EnumRecordType WALKeyPath string Offset, Index int64 Data []byte }
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
func NewWriter ¶
func NewWriter(tbi *io.TimeBucketInfo, tgc *TransactionPipe, rootCatDir *catalog.Directory) (*Writer, error)
func (*Writer) AddNewYearFile ¶
func (*Writer) WriteRecords ¶
WriteRecords creates a WriteCommand from the supplied timestamp and data buffer, and sends it over the write channel to be flushed to disk in the WAL sync subroutine. The caller should assume that by calling WriteRecords directly, the data will be written to the file regardless if it satisfies the on-disk data shape, possible corrupting the data files. It is recommended to call WriteCSM() for any writes as it is safer.
type WrongSizeError ¶
type WrongSizeError string
func (WrongSizeError) Error ¶
func (msg WrongSizeError) Error() string
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
package buffile helps batch write by writes to the tempobary in-memory buffer under the assumption that many writes come in to the part of single file frequently.
|
package buffile helps batch write by writes to the tempobary in-memory buffer under the assumption that many writes come in to the part of single file frequently. |