proc

package
v1.1.18 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 3, 2024 License: MIT Imports: 28 Imported by: 0

Documentation

Overview

Package proc contains types and functions that process script nodes and write results to tables or files.

Index

Constants

View Source
const DefaultFileInserterBatchCapacity int = 1000
View Source
const DefaultInserterBatchSize int = 5000
View Source
const HarvestForDeleteRowsetSize = 1000 // Intentionally not user-configurable: the value may be too sensitive to tweak

Variables

View Source
var ErrDuplicateKey = errors.New("duplicate key")
View Source
var ErrDuplicateRowid = errors.New("duplicate rowid")

Functions

func CreateDataTableCql

func CreateDataTableCql(keyspace string, runId int16, tableCreator *sc.TableCreatorDef) string

func CreateIdxTableCql

func CreateIdxTableCql(keyspace string, runId int16, idxName string, idxDef *sc.IdxDef) string

func DeleteDataAndUniqueIndexesByBatchIdx

func DeleteDataAndUniqueIndexesByBatchIdx(logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext) error

To test it, see the comments at the end of RunCreateTableRelForBatch.

Types

type BatchStats added in v1.1.1

type BatchStats struct {
	Src         string
	Dst         string
	RowsRead    int
	RowsWritten int
	Elapsed     time.Duration
}

func RunCreateDistinctTableForBatch added in v1.1.18

func RunCreateDistinctTableForBatch(envConfig *env.EnvConfig,
	logger *l.CapiLogger,
	pCtx *ctx.MessageProcessingContext,
	readerNodeRunId int16,
	startLeftToken int64,
	endLeftToken int64) (BatchStats, error)

func RunCreateFile

func RunCreateFile(envConfig *env.EnvConfig,
	logger *l.CapiLogger,
	pCtx *ctx.MessageProcessingContext,
	readerNodeRunId int16,
	startToken int64,
	endToken int64) (BatchStats, error)

func RunCreateTableForBatch

func RunCreateTableForBatch(envConfig *env.EnvConfig,
	logger *l.CapiLogger,
	pCtx *ctx.MessageProcessingContext,
	readerNodeRunId int16,
	startLeftToken int64,
	endLeftToken int64) (BatchStats, error)

func RunCreateTableForCustomProcessorForBatch

func RunCreateTableForCustomProcessorForBatch(envConfig *env.EnvConfig,
	logger *l.CapiLogger,
	pCtx *ctx.MessageProcessingContext,
	readerNodeRunId int16,
	startLeftToken int64,
	endLeftToken int64) (BatchStats, error)

func RunCreateTableRelForBatch

func RunCreateTableRelForBatch(envConfig *env.EnvConfig,
	logger *l.CapiLogger,
	pCtx *ctx.MessageProcessingContext,
	readerNodeRunId int16,
	lookupNodeRunId int16,
	startLeftToken int64,
	endLeftToken int64) (BatchStats, error)

func RunReadFileForBatch

func RunReadFileForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext, srcFileIdx int) (BatchStats, error)

func (*BatchStats) ToString added in v1.1.1

func (bs *BatchStats) ToString() string

type CustomProcessorRunner

type CustomProcessorRunner interface {
	Run(logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext, rsIn *Rowset, flushVarsArray func(varsArray []*eval.VarValuesMap, varsArrayCount int) error) error
}

type DataIdxSeqModeType added in v1.1.18

type DataIdxSeqModeType int
const (
	DataIdxSeqModeDataFirst DataIdxSeqModeType = iota
	DataIdxSeqModeDistinctIdxFirst
)

type FileInserter

type FileInserter struct {
	PCtx                  *ctx.MessageProcessingContext
	FileCreator           *sc.FileCreatorDef
	CurrentBatch          *WriteFileBatch
	BatchCapacity         int
	BatchesIn             chan *WriteFileBatch
	RecordWrittenStatuses chan error
	BatchesSent           int
	FinalFileUrl          string
	TempFilePath          string
}

type FileRecordHeap

type FileRecordHeap []*FileRecordHeapItem

func (FileRecordHeap) Len

func (h FileRecordHeap) Len() int

func (FileRecordHeap) Less

func (h FileRecordHeap) Less(i, j int) bool

func (*FileRecordHeap) Pop

func (h *FileRecordHeap) Pop() any

func (*FileRecordHeap) Push

func (h *FileRecordHeap) Push(x any)

func (FileRecordHeap) Swap

func (h FileRecordHeap) Swap(i, j int)

type FileRecordHeapItem

type FileRecordHeapItem struct {
	FileRecord *[]any
	Key        string
}

type PreparedQuery added in v1.1.15

type PreparedQuery struct {
	Qb    *cql.QueryBuilder
	Query string
}

type Rowset

type Rowset struct {
	Fields                []sc.FieldRef
	FieldsByFullAliasName map[string]int
	FieldsByFieldName     map[string]int
	Rows                  []*[]any
	RowCount              int
}

func NewRowsetFromFieldRefs

func NewRowsetFromFieldRefs(fieldRefsList ...sc.FieldRefs) *Rowset

func (*Rowset) AppendFieldRefs

func (rs *Rowset) AppendFieldRefs(fieldRefs *sc.FieldRefs)

func (*Rowset) AppendFieldRefsWithFilter

func (rs *Rowset) AppendFieldRefsWithFilter(fieldRefs *sc.FieldRefs, tableFilter string)

func (*Rowset) ArrangeByRowid

func (rs *Rowset) ArrangeByRowid(rowids []int64) error

func (*Rowset) ExportToVars

func (rs *Rowset) ExportToVars(rowIdx int, vars *eval.VarValuesMap) error

func (*Rowset) ExportToVarsWithAlias

func (rs *Rowset) ExportToVarsWithAlias(rowIdx int, vars *eval.VarValuesMap, useTableAlias string) error

func (*Rowset) GetFieldNames

func (rs *Rowset) GetFieldNames() *[]string

func (*Rowset) GetTableRecord

func (rs *Rowset) GetTableRecord(rowIdx int) (map[string]any, error)

func (*Rowset) InitRows

func (rs *Rowset) InitRows(capacity int) error

func (*Rowset) ToString

func (rs *Rowset) ToString() string

type TableInserter

type TableInserter struct {
	PCtx                       *ctx.MessageProcessingContext
	TableCreator               *sc.TableCreatorDef
	RecordsIn                  chan WriteChannelItem // Channel that passes records from the main function (e.g. RunCreateTableForBatch, using add()) to TableInserter
	RecordWrittenStatuses      chan error
	RecordWrittenStatusesMutex sync.Mutex
	MachineHash                int64
	NumWorkers                 int
	MinInserterRate            int
	WorkerWaitGroup            sync.WaitGroup
	RecordsSent                int // Records sent to RecordsIn
	RecordsProcessed           int // Number of items received in RecordWrittenStatuses
	DoesNotExistPause          float32
	OperationTimedOutPause     float32
	DataIdxSeqMode             DataIdxSeqModeType
}

type TableRecord

type TableRecord map[string]any

type TableRecordBatch

type TableRecordBatch []TableRecordPtr

type TableRecordPtr

type TableRecordPtr *map[string]any

type WriteChannelItem

type WriteChannelItem struct {
	TableRecord *TableRecord
	IndexKeyMap map[string]string
}

type WriteFileBatch

type WriteFileBatch struct {
	Rows     [][]any
	RowCount int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL