compute_pipes

package
v0.0.0-...-b183724 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2024 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateOutputTable

func CreateOutputTable(dbpool *pgxpool.Pool, tableName pgx.Identifier, tableSpec *TableSpec) error

Create the Staging Table

func EvalHash

func EvalHash(key interface{}, partitions uint64) *uint64

func Hash

func Hash(key []byte, partitions uint64) uint64

func OverpunchNumber

func OverpunchNumber(value string, decimalPlaces int) (string, error)

func ParseDate

func ParseDate(date string) (*time.Time, error)

func ParseDatetime

func ParseDatetime(datetime string) (*time.Time, error)

func PrepareOutoutTable

func PrepareOutoutTable(dbpool *pgxpool.Pool, tableIdentifier pgx.Identifier, tableSpec *TableSpec) error

func SplitTableName

func SplitTableName(tableName string) (pgx.Identifier, error)

func StartComputePipes

func StartComputePipes(dbpool *pgxpool.Pool, headersDKInfo *schema.HeadersAndDomainKeysInfo, done chan struct{}, errCh chan error,
	computePipesInputCh <-chan []interface{}, chResults *ChannelResults,
	cpConfig *ComputePipesConfig, envSettings map[string]interface{},
	fileKeyComponents map[string]interface{})

Function to write transformed rows to the database

func ToBool

func ToBool(b interface{}) bool

func ToDouble

func ToDouble(d interface{}) (float64, error)

Types

type AggregateTransformationPipe

type AggregateTransformationPipe struct {
	// contains filtered or unexported fields
}

type BuilderContext

type BuilderContext struct {
	// contains filtered or unexported fields
}

func (*BuilderContext) FileKey

func (ctx *BuilderContext) FileKey() string

func (*BuilderContext) JetsPartition

func (ctx *BuilderContext) JetsPartition() string

func (*BuilderContext) NewAggregateTransformationPipe

func (ctx *BuilderContext) NewAggregateTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*AggregateTransformationPipe, error)

func (*BuilderContext) NewMapRecordTransformationPipe

func (ctx *BuilderContext) NewMapRecordTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*MapRecordTransformationPipe, error)

func (*BuilderContext) NewPartitionWriterTransformationPipe

func (ctx *BuilderContext) NewPartitionWriterTransformationPipe(source *InputChannel, jetsPartitionKey interface{},
	outputCh *OutputChannel, copy2DeviceResultCh chan ComputePipesResult, spec *TransformationSpec) (*PartitionWriterTransformationPipe, error)

Create a new jets_partition writer; the partition is identified by the jetsPartition

func (*BuilderContext) ReportMetrics

func (ctx *BuilderContext) ReportMetrics()

func (*BuilderContext) SessionId

func (ctx *BuilderContext) SessionId() string

func (*BuilderContext) StartClusterMap

func (ctx *BuilderContext) StartClusterMap(spec *PipeSpec, source *InputChannel, clusterMapResultCh chan chan ComputePipesResult)

Cluster nodes sharding data using splitter key

func (*BuilderContext) StartFanOutPipe

func (ctx *BuilderContext) StartFanOutPipe(spec *PipeSpec, source *InputChannel)

func (*BuilderContext) StartSplitterPipe

func (ctx *BuilderContext) StartSplitterPipe(spec *PipeSpec, source *InputChannel, writePartitionsResultCh chan chan ComputePipesResult)

type CaseExpression

type CaseExpression struct {
	When ExpressionNode `json:"when"`
	Then ExpressionNode `json:"then"`
}

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

type ChannelRegistry

type ChannelRegistry struct {
	// contains filtered or unexported fields
}

func (*ChannelRegistry) AddDistributionChannel

func (r *ChannelRegistry) AddDistributionChannel(input string) string

func (*ChannelRegistry) CloseChannel

func (r *ChannelRegistry) CloseChannel(name string)

func (*ChannelRegistry) GetInputChannel

func (r *ChannelRegistry) GetInputChannel(name string) (*InputChannel, error)

func (*ChannelRegistry) GetOutputChannel

func (r *ChannelRegistry) GetOutputChannel(name string) (*OutputChannel, error)

type ChannelResults

type ChannelResults struct {
	LoadFromS3FilesResultCh chan LoadFromS3FilesResult
	Copy2DbResultCh         chan chan ComputePipesResult
	WritePartitionsResultCh chan chan chan ComputePipesResult
	MapOnClusterResultCh    chan chan chan ComputePipesResult
}

type ChannelSpec

type ChannelSpec struct {
	Name    string   `json:"name"`
	Columns []string `json:"columns"`
}

type ClusterSpec

type ClusterSpec struct {
	CpipesMode              string `json:"cpipes_mode"`
	ReadTimeout             int    `json:"read_timeout"`
	WriteTimeout            int    `json:"write_timeout"`
	PeerRegistrationTimeout int    `json:"peer_registration_timeout"`
	NbrNodes                int    `json:"nbr_nodes"`
	NbrSubClusters          int    `json:"nbr_sub_clusters"`
	NbrJetsPartitions       uint64 `json:"nbr_jets_partitions"`
	PeerBatchSize           int    `json:"peer_batch_size"`
	NodeId                  int    // calculated field
	SubClusterId            int    // calculated field
	NbrSubClusterNodes      int    // calculated field
	SubClusterNodeId        int    // calculated field
}

Config for peer2peer communication

type ComputePipesConfig

type ComputePipesConfig struct {
	MetricsConfig *MetricsSpec   `json:"metrics_config"`
	ClusterConfig *ClusterSpec   `json:"cluster_config"`
	OutputTables  []TableSpec    `json:"output_tables"`
	Channels      []ChannelSpec  `json:"channels"`
	Context       *[]ContextSpec `json:"context"`
	PipesConfig   []PipeSpec     `json:"pipes_config"`
}

This file contains the Compute Pipes configuration model

func UnmarshalComputePipesConfig

func UnmarshalComputePipesConfig(computePipesJson *string, nodeId, nbrNodes int) (*ComputePipesConfig, error)

type ComputePipesResult

type ComputePipesResult struct {
	// TableName can be a jets_partition name
	// PartsCount is the number of file parts in the jets_partition
	TableName    string
	CopyRowCount int64
	PartsCount   int64
	Err          error
}

type ConcatFunctionArg

type ConcatFunctionArg struct {
	Delimit         string
	ColumnPositions []int
}

func ParseConcatFunctionArgument

func ParseConcatFunctionArgument(rawArg *string, functionName string, inputColumnName2Pos map[string]int, cache map[string]interface{}) (*ConcatFunctionArg, error)

type ContextSpec

type ContextSpec struct {
	// Type range: file_key_component
	Type string `json:"type"`
	Key  string `json:"key"`
	Expr string `json:"expr"`
}

type ExpressionNode

type ExpressionNode struct {
	// Type for leaf node: select, value, eval
	Type     *string         `json:"type"`
	Expr     *string         `json:"expr"`
	EvalExpr *ExpressionNode `json:"eval_expr"`
	Arg      *ExpressionNode `json:"arg"`
	Lhs      *ExpressionNode `json:"lhs"`
	Op       *string         `json:"op"`
	Rhs      *ExpressionNode `json:"rhs"`
}

type FindReplaceFunctionArg

type FindReplaceFunctionArg struct {
	Find        string
	ReplaceWith string
}

func ParseFindReplaceFunctionArgument

func ParseFindReplaceFunctionArgument(rawArg *string, functionName string, cache map[string]interface{}) (*FindReplaceFunctionArg, error)

type HashExpression

type HashExpression struct {
	Expr              string          `json:"expr"`
	Format            *string         `json:"format"`
	NbrJetsPartitions *uint64         `json:"nbr_jets_partitions"`
	DefaultExpr       *ExpressionNode `json:"default_expr"`
}

type Input2PipeSet

type Input2PipeSet map[string]*PipeSet

type InputChannel

type InputChannel struct {
	// contains filtered or unexported fields
}

type LoadFromS3FilesResult

type LoadFromS3FilesResult struct {
	LoadRowCount int64
	BadRowCount  int64
	Err          error
}

type MapExpression

type MapExpression struct {
	CleansingFunction *string `json:"cleansing_function"`
	Argument          *string `json:"argument"`
	Default           *string `json:"default"`
	ErrMsg            *string `json:"err_msg"`
	RdfType           string  `json:"rdf_type"`
}

type MapRecordTransformationPipe

type MapRecordTransformationPipe struct {
	// contains filtered or unexported fields
}

type Metric

type Metric struct {
	// Type range: runtime
	// Name values: alloc_mb, total_alloc_mb, sys_mb, nbr_gc
	// note: suffix _mb for units in MiB
	Type string `json:"type"`
	Name string `json:"name"`
}

type MetricsSpec

type MetricsSpec struct {
	ReportInterval int      `json:"report_interval_sec"`
	RuntimeMetrics []Metric `json:"runtime_metrics"`
}

type OutputChannel

type OutputChannel struct {
	// contains filtered or unexported fields
}

type Overpunch

type Overpunch struct {
	Sign  string
	Digit string
}

OverpunchNumber: Overpunch allows representation of positive and negative amounts in a numeric field without having to expand the size of the field for a plus or minus sign. The overpunch character replaces the right-most character in a numeric field. In a six-digit field, 99.95 would be represented as 00999E. In the same field, a negative 99.95 would be represented as 00999N. Lack of an overpunch character implies a positive amount - in other words, a positive 99.95 could be sent on the file as 009995.

Number,Positive Overpunch,Negative Overpunch
0,     {,                 }
1,     A,                 J
2,     B,                 K
3,     C,                 L
4,     D,                 M
5,     E,                 N
6,     F,                 O
7,     G,                 P
8,     H,                 Q
9,     I,                 R

type PartitionWriterTransformationPipe

type PartitionWriterTransformationPipe struct {
	// contains filtered or unexported fields
}

type PathSubstitution

type PathSubstitution struct {
	Replace string `json:"replace"`
	With    string `json:"with"`
}

type Peer

type Peer struct {
	// contains filtered or unexported fields
}

type PeerRecordMessage

type PeerRecordMessage struct {
	Sender       int32
	RecordsCount int32
	Records      [][]interface{}
}

Message used to send records to a remote peer. Sender is the subClusterNodeId of the client peer.

type PeerReply

type PeerReply struct{}

type PeerServer

type PeerServer struct {
	// contains filtered or unexported fields
}

The server handling incoming requests from peer nodes

func (*PeerServer) ClientDone

func (ps *PeerServer) ClientDone(args *PeerRecordMessage, reply *PeerReply) error

func (*PeerServer) ClientReady

func (ps *PeerServer) ClientReady(args *PeerRecordMessage, reply *PeerReply) error

func (*PeerServer) PushRecords

func (ps *PeerServer) PushRecords(args *PeerRecordMessage, reply *PeerReply) error

type PipeSet

type PipeSet map[*PipeSpec]bool

type PipeSpec

type PipeSpec struct {
	// Type range: fan_out, splitter, distribute_data
	Type   string               `json:"type"`
	Input  string               `json:"input"`
	Column *string              `json:"column"` // splitter column
	Apply  []TransformationSpec `json:"apply"`
}

type PipeTransformationEvaluator

type PipeTransformationEvaluator interface {
	// contains filtered or unexported methods
}

type S3DeviceWriter

type S3DeviceWriter struct {
	// contains filtered or unexported fields
}

func (*S3DeviceWriter) WritePartition

func (ctx *S3DeviceWriter) WritePartition(s3WriterResultCh chan<- ComputePipesResult)

type SaveResultsContext

type SaveResultsContext struct {
	JetsPartition string
	NodeId        int
	SessionId     string
	// contains filtered or unexported fields
}

func NewSaveResultsContext

func NewSaveResultsContext(dbpool *pgxpool.Pool) *SaveResultsContext

func (*SaveResultsContext) Save

func (ctx *SaveResultsContext) Save(category string, result *ComputePipesResult)

type SubStringFunctionArg

type SubStringFunctionArg struct {
	Start int
	End   int
}

func ParseSubStringFunctionArgument

func ParseSubStringFunctionArgument(rawArg *string, functionName string, cache map[string]interface{}) (*SubStringFunctionArg, error)

type TableColumnSpec

type TableColumnSpec struct {
	Name    string `json:"name"`
	RdfType string `json:"rdf_type"`
}

type TableSpec

type TableSpec struct {
	Key     string            `json:"key"`
	Name    string            `json:"name"`
	Columns []TableColumnSpec `json:"columns"`
}

type TransformationColumnEvaluator

type TransformationColumnEvaluator interface {
	// contains filtered or unexported methods
}

type TransformationColumnSpec

type TransformationColumnSpec struct {
	// Type range: select, value, eval, map, hash
	// (applicable to aggregate) count, distinct_count, sum, min,
	// case, map_reduce
	Name        string                      `json:"name"`
	Type        string                      `json:"type"`
	Expr        *string                     `json:"expr"`
	MapExpr     *MapExpression              `json:"map_expr"`
	EvalExpr    *ExpressionNode             `json:"eval_expr"`
	HashExpr    *HashExpression             `json:"hash_expr"`
	Where       *ExpressionNode             `json:"where"`
	CaseExpr    []CaseExpression            `json:"case_expr"`
	ElseExpr    *ExpressionNode             `json:"else_expr"`
	MapOn       *string                     `json:"map_on"`
	ApplyMap    *[]TransformationColumnSpec `json:"apply_map"`
	ApplyReduce *[]TransformationColumnSpec `json:"apply_reduce"`
}

type TransformationSpec

type TransformationSpec struct {
	// Type range: map_record, aggregate, partition_writer
	Type                  string                     `json:"type"`
	PartitionSize         *int                       `json:"partition_size"`
	FilePathSubstitutions *[]PathSubstitution        `json:"file_path_substitutions"`
	Columns               []TransformationColumnSpec `json:"columns"`
	Output                string                     `json:"output"`
}

type WriteTableSource

type WriteTableSource struct {
	// contains filtered or unexported fields
}

func (*WriteTableSource) Err

func (wt *WriteTableSource) Err() error

func (*WriteTableSource) Next

func (wt *WriteTableSource) Next() bool

pgx.CopyFromSource interface

func (*WriteTableSource) Values

func (wt *WriteTableSource) Values() ([]interface{}, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL