Documentation
¶
Index ¶
- Variables
- func SetBenchmarkLogger(logger Logger)
- func SetDebugLogger(logger Logger)
- func SetErrorLogger(logger Logger)
- func SetFatalLogger(logger Logger)
- func WithMessageCutConfluencePrefix(m []byte) []byte
- type Cardinality
- type CustomArrow
- type CustomField
- type DuckOption
- type DuckRunner
- func (d *DuckRunner) AddQueries(queries []string, exec bool)
- func (d *DuckRunner) Err() error
- func (d *DuckRunner) GetDB() *couac.Quacker
- func (d *DuckRunner) IsDeleteDBOnDone() bool
- func (d *DuckRunner) IsExec() bool
- func (d *DuckRunner) Path() string
- func (d *DuckRunner) Queries() []string
- func (d *DuckRunner) Run(ctx context.Context) error
- func (d *DuckRunner) SetDeleteOnDone(b bool)
- func (d *DuckRunner) SetErr(err error)
- func (d *DuckRunner) SetFunc(f func(*DuckRunner) error)
- func (d *DuckRunner) SetPath(p string)
- type FieldType
- type KafkaClientConf
- type Logger
- type Metrics
- type MetricsReport
- type Opt
- type Option
- func WithCustomArrows(p []CustomArrow) Option
- func WithCustomFields(p []CustomField) Option
- func WithDuckPathsChan(s int) Option
- func WithFileRotateThresholdDurationSeconds(p int) Option
- func WithFileRotateThresholdMB(p int64) Option
- func WithNormalizer(fields, aliases []string, failOnRangeError bool) Option
- func WithoutDuck() Option
- func WithoutDuckIngestRaw() Option
- func WithoutKafka() Option
- func WithoutProcessing() Option
- type Orchestrator
- func (o *Orchestrator[T]) ArrowQueueCapacity() int
- func (o *Orchestrator[T]) BenchmarksReport() string
- func (o *Orchestrator[T]) BenchmarksUnformatedReport() string
- func (o *Orchestrator[T]) Close()
- func (o *Orchestrator[T]) ConfigureDuck(opts ...DuckOption) error
- func (o *Orchestrator[T]) ConfigureProcessor(rChanCap, rowGroupSizeMultiplier, routineCount int, ...) error
- func (o *Orchestrator[T]) CurrentDBSize() int64
- func (o *Orchestrator[T]) DuckConnCount() int
- func (o *Orchestrator[T]) DuckIngest(ctx context.Context, w *sync.WaitGroup)
- func (o *Orchestrator[T]) DuckIngestWithRotate(ctx context.Context, w *sync.WaitGroup)
- func (o *Orchestrator[T]) DuckPaths() chan string
- func (o *Orchestrator[T]) Error() error
- func (o *Orchestrator[T]) IsClosed() bool
- func (o *Orchestrator[T]) KafkaClientCount() int
- func (o *Orchestrator[T]) KafkaQueueCapacity() int
- func (o *Orchestrator[T]) MessageChan() chan []byte
- func (o *Orchestrator[T]) MessageChanClose()
- func (o *Orchestrator[T]) MessageChanSend(m []byte)
- func (o *Orchestrator[T]) MockKafka(ctx context.Context, w *sync.WaitGroup, p T)
- func (o *Orchestrator[T]) MsgProcessorsCount() int
- func (o *Orchestrator[T]) NewKafkaConfig() *KafkaClientConf[T]
- func (o *Orchestrator[T]) NewMetrics()
- func (o *Orchestrator[T]) ProcessMessages(ctx context.Context, wg *sync.WaitGroup)
- func (o *Orchestrator[T]) RecordChan() chan Record
- func (o *Orchestrator[T]) RecordChanClose()
- func (o *Orchestrator[T]) RecordChanSend(r Record)
- func (o *Orchestrator[T]) Report() string
- func (o *Orchestrator[T]) ReportJSONL() string
- func (o *Orchestrator[T]) ResetMetrics()
- func (o *Orchestrator[T]) RestartDuck() error
- func (o *Orchestrator[T]) Run(ctx context.Context, wg *sync.WaitGroup)
- func (o *Orchestrator[T]) Schema() *bufa.Schema[T]
- func (o *Orchestrator[T]) StartMetrics()
- func (o *Orchestrator[T]) UpdateMetrics()
- type Record
- type TypedMetricsReport
Constants ¶
This section is empty.
Variables ¶
var ( ErrWaitGroupIsNil = errors.New("*sync.waitgroup is nil") ErrMessageChanCapacityZero = errors.New("chan []byte must have capacity > 0") ErrRecordChanCapacityZero = errors.New("chan []arrow.record must have capacity > 0") ErrProcessingFuncIsNil = errors.New("func([]byte, *bufa.Schema[T]) error is nil") ErrProcessingRoutineCountInvalid = errors.New("deserializer routine count not > 0") ErrRowGroupSizeMultiplier = errors.New("row group size multiplier not > 0") )
var (
ErrMissingDuckDBConfig = errors.New("missing duckdb configuration")
)
Functions ¶
func SetBenchmarkLogger ΒΆ
func SetBenchmarkLogger(logger Logger)
func SetDebugLogger ΒΆ
func SetDebugLogger(logger Logger)
func SetErrorLogger ΒΆ
func SetErrorLogger(logger Logger)
func SetFatalLogger ΒΆ
func SetFatalLogger(logger Logger)
func WithMessageCutConfluencePrefix ΒΆ added in v0.5.1
WithMessageCutConfluencePrefix removes the 6 bytes that the Confluent producer adds for schema registry metadata.
Types ¶
type Cardinality ΒΆ added in v0.5.10
type Cardinality protoreflect.Cardinality
Cardinality determines whether a field is optional, required, or repeated.
const ( Optional Cardinality = 1 // appears zero or one times Required Cardinality = 2 // appears exactly one time; invalid with Proto3 Repeated Cardinality = 3 // appears zero or more times )
Constants as defined by the google.protobuf.Cardinality enumeration.
func (*Cardinality) Get ΒΆ added in v0.5.10
func (c *Cardinality) Get() bufa.Cardinality
type CustomArrow ΒΆ added in v0.4.0
type CustomField ΒΆ added in v0.5.10
type CustomField struct { Name string Type FieldType FieldCardinality Cardinality IsPacked bool }
type DuckOption ΒΆ
type DuckOption func(duckConfig)
func WithDestinationTable ΒΆ
func WithDestinationTable(p string) DuckOption
func WithDriverPath ΒΆ
func WithDriverPath(p string) DuckOption
func WithDuckConnections ΒΆ
func WithDuckConnections(p int) DuckOption
func WithDuckRunner ΒΆ added in v0.5.1
func WithDuckRunner(p *DuckRunner) DuckOption
WithDuckRunner sets a DuckRunner whose queries are run in series once writes to the current db file are completed and before rotating to the next db file. Usage: exec := new(q.DuckRunner) exec.AddQueries(queries, false) exec.SetFunc(RunAggs) exec.SetDeleteOnDone(true) err = o.ConfigureDuck(q.WithPathPrefix("reqlog"), q.WithDriverPath(driverPath), q.WithDestinationTable("req"), q.WithDuckConnections(24), q.WithDuckRunner(exec))
func WithPath ΒΆ
func WithPath(p string) DuckOption
func WithPathPrefix ΒΆ
func WithPathPrefix(p string) DuckOption
type DuckRunner ΒΆ added in v0.5.1
type DuckRunner struct {
// contains filtered or unexported fields
}
func (*DuckRunner) AddQueries ΒΆ added in v0.5.1
func (d *DuckRunner) AddQueries(queries []string, exec bool)
AddQueries adds queries to a DuckRunner and sets whether the runner should use RunExec instead of RunFunc to run the queries (RunExec should be used when no results from queries are expected).
func (*DuckRunner) Err ΒΆ added in v0.5.1
func (d *DuckRunner) Err() error
func (*DuckRunner) GetDB ΒΆ added in v0.5.1
func (d *DuckRunner) GetDB() *couac.Quacker
func (*DuckRunner) IsDeleteDBOnDone ΒΆ added in v0.5.1
func (d *DuckRunner) IsDeleteDBOnDone() bool
func (*DuckRunner) IsExec ΒΆ added in v0.5.1
func (d *DuckRunner) IsExec() bool
func (*DuckRunner) Path ΒΆ added in v0.5.1
func (d *DuckRunner) Path() string
func (*DuckRunner) Queries ΒΆ added in v0.5.1
func (d *DuckRunner) Queries() []string
func (*DuckRunner) Run ΒΆ added in v0.5.1
func (d *DuckRunner) Run(ctx context.Context) error
Run runs the defined queries. If exec is set to true, the queries are not expected to return any results; otherwise the query function is used to coordinate the queries. Setting exec to true is meant for running queries that aggregate to another table, as well as EXPORT/COPY TO statements.
func (*DuckRunner) SetDeleteOnDone ΒΆ added in v0.5.1
func (d *DuckRunner) SetDeleteOnDone(b bool)
func (*DuckRunner) SetErr ΒΆ added in v0.5.1
func (d *DuckRunner) SetErr(err error)
func (*DuckRunner) SetFunc ΒΆ added in v0.5.1
func (d *DuckRunner) SetFunc(f func(*DuckRunner) error)
func (*DuckRunner) SetPath ΒΆ added in v0.5.1
func (d *DuckRunner) SetPath(p string)
type KafkaClientConf ΒΆ
type KafkaClientConf[T proto.Message] struct { // franz-go/pkg/kgo.Opt configurations. If any are set, these will override all // of the subsequently listed client settings. ClientConf []kgo.Opt // Number of Kafka clients to open. Default value is 1. If using more than one, // use of a consumer group is recommended. ClientCount atomic.Int32 // Consumer group ConsumerGroup string // Instance prefix InstancePrefix string // Message channel capacity, must be greater than 0. Default capacity is 122880. MsgChanCap int // MsgTimeAppend sets whether to append the Kafka message timestamp as an 8 byte uint64 // at end of message bytes. It is the deserializing function's responsibility // to truncate these prior to reading the protobuf message. // Use `time.UnixMilli(int64(binary.LittleEndian.Uint64(b)))` to retrieve the timestamp. MsgTimeAppend bool // Function to munge message bytes prior to deserialization. // As an example, Confluent java client adds 6 magic bytes at // beginning of message data for use with Schema Registry which must // be removed from the message prior to deserialization. Munger func([]byte) []byte // Kafka TLS dialer TlsDialer *tls.Dialer // Kafka topic to consume Topic string // Kafka seed brokers Seeds []string // SASL Auth User User string // SASL Auth password Password string // contains filtered or unexported fields }
type MetricsReport ΒΆ
type MetricsReport struct { NumCPU int `json:"num_cpu"` RuntimeOS string `json:"runtime_os"` KafkaClientCount int `json:"kafka_clients"` KafkaQueueCap int `json:"kafka_queue_cap"` ProcessorCount int `json:"processor_routines"` ArrowQueueCap int `json:"arrow_queue_cap"` DuckDBThresholdMB int `json:"duckdb_threshold_mb"` DuckConnCount int `json:"duckdb_connections,omitzero"` CustomArrows *int `json:"custom_arrows,omitempty"` NormalizerFields *int `json:"normalizer_fields,omitempty"` StartTime string `json:"start_time"` EndTime string `json:"end_time,omitzero"` Records string `json:"records"` NormRecords string `json:"norm_records"` DataTransferred string `json:"data_transferred"` Duration string `json:"duration"` RecordsPerSec string `json:"records_per_second"` TotalRecordsPerSec string `json:"total_rows_per_second"` TransferRate string `json:"transfer_rate"` OutputFiles int64 `json:"duckdb_files"` OutputFilesMB int64 `json:"duckdb_files_MB"` AvgDurationPerFile string `json:"file_avg_duration,omitempty"` }
type Option ΒΆ
type Option func(config)
func WithCustomArrows ΒΆ added in v0.4.0
func WithCustomArrows(p []CustomArrow) Option
func WithCustomFields ΒΆ added in v0.5.10
func WithCustomFields(p []CustomField) Option
func WithDuckPathsChan ΒΆ added in v0.5.2
func WithFileRotateThresholdDurationSeconds ΒΆ added in v0.5.12
WithFileRotateThresholdDurationSeconds sets the database file rotation duration threshold in seconds. The minimum duration is 60 seconds.
func WithFileRotateThresholdMB ΒΆ
WithFileRotateThresholdMB sets the database file rotation size. Minimum rotation threshold is 100MB.
func WithNormalizer ΒΆ added in v0.5.0
WithNormalizer configures the scalars to add to a flat Arrow Record suitable for efficient aggregation. Protobuf data with nested messages converted to Arrow records is not only slower to insert into duckdb; running aggregation queries on nested data is also much slower (by orders of magnitude). Fields should be specified by their path (field names separated by a period, i.e. 'field1.field2.field3'). The Arrow field types of the selected fields will be used to build the new schema. If coalescing data between multiple fields of the same type, specify only one of the paths. List fields should have the index to retrieve specified; otherwise all elements are retrieved by default. Ranges are not yet implemented.
func WithoutDuck ΒΆ
func WithoutDuck() Option
func WithoutDuckIngestRaw ΒΆ added in v0.5.1
func WithoutDuckIngestRaw() Option
func WithoutKafka ΒΆ
func WithoutKafka() Option
func WithoutProcessing ΒΆ
func WithoutProcessing() Option
type Orchestrator ΒΆ
type Orchestrator[T proto.Message] struct { Metrics *Metrics // contains filtered or unexported fields }
func NewOrchestrator ΒΆ
func NewOrchestrator[T proto.Message](opts ...Option) (*Orchestrator[T], error)
func (*Orchestrator[T]) ArrowQueueCapacity ΒΆ added in v0.5.0
func (o *Orchestrator[T]) ArrowQueueCapacity() int
func (*Orchestrator[T]) BenchmarksReport ΒΆ
func (o *Orchestrator[T]) BenchmarksReport() string
BenchmarksReport generates an indented live benchmark report with throughput rates sampled from the last 30 seconds.
func (*Orchestrator[T]) BenchmarksUnformatedReport ΒΆ added in v0.5.7
func (o *Orchestrator[T]) BenchmarksUnformatedReport() string
BenchmarksUnformatedReport generates an unindented live benchmark report with throughput rates sampled from the last 30 seconds.
func (*Orchestrator[T]) Close ΒΆ
func (o *Orchestrator[T]) Close()
Close closes the DuckDB database, if open.
func (*Orchestrator[T]) ConfigureDuck ΒΆ
func (o *Orchestrator[T]) ConfigureDuck(opts ...DuckOption) error
ConfigureDuck initializes DuckDB connection settings.
func (*Orchestrator[T]) ConfigureProcessor ΒΆ
func (*Orchestrator[T]) CurrentDBSize ΒΆ added in v0.4.0
func (o *Orchestrator[T]) CurrentDBSize() int64
func (*Orchestrator[T]) DuckConnCount ΒΆ
func (o *Orchestrator[T]) DuckConnCount() int
func (*Orchestrator[T]) DuckIngest ΒΆ
func (o *Orchestrator[T]) DuckIngest(ctx context.Context, w *sync.WaitGroup)
DuckIngest reads records from the record channel and ingests the Records into a single DuckDB database. Ingestion ends when the Record channel is closed and empty.
func (*Orchestrator[T]) DuckIngestWithRotate ΒΆ
func (o *Orchestrator[T]) DuckIngestWithRotate(ctx context.Context, w *sync.WaitGroup)
DuckIngestWithRotate reads records from the record channel and ingests the Records into rotating DuckDB files, rotating at the file size threshold defined. Ingestion ends when the Record channel is closed and empty.
func (*Orchestrator[T]) DuckPaths ΒΆ
func (o *Orchestrator[T]) DuckPaths() chan string
func (*Orchestrator[T]) Error ΒΆ
func (o *Orchestrator[T]) Error() error
func (*Orchestrator[T]) IsClosed ΒΆ added in v0.5.8
func (o *Orchestrator[T]) IsClosed() bool
IsClosed reports whether the DuckDB database is closed (that is, not currently open).
func (*Orchestrator[T]) KafkaClientCount ΒΆ
func (o *Orchestrator[T]) KafkaClientCount() int
func (*Orchestrator[T]) KafkaQueueCapacity ΒΆ added in v0.5.0
func (o *Orchestrator[T]) KafkaQueueCapacity() int
func (*Orchestrator[T]) MessageChan ΒΆ
func (o *Orchestrator[T]) MessageChan() chan []byte
func (*Orchestrator[T]) MessageChanClose ΒΆ added in v0.5.8
func (o *Orchestrator[T]) MessageChanClose()
func (*Orchestrator[T]) MessageChanSend ΒΆ added in v0.5.8
func (o *Orchestrator[T]) MessageChanSend(m []byte)
func (*Orchestrator[T]) MockKafka ΒΆ added in v0.3.0
func (o *Orchestrator[T]) MockKafka(ctx context.Context, w *sync.WaitGroup, p T)
MockKafka produces protobuf messages with random data in each field to the message channel. An initialized message with at least one field containing data must be passed as an argument for generation to work. Usage: wg.Add(1) go o.MockKafka(ctxT, &wg, &gen.RequestEvent{Id: "1233242423243"})
func (*Orchestrator[T]) MsgProcessorsCount ΒΆ
func (o *Orchestrator[T]) MsgProcessorsCount() int
func (*Orchestrator[T]) NewKafkaConfig ΒΆ
func (o *Orchestrator[T]) NewKafkaConfig() *KafkaClientConf[T]
func (*Orchestrator[T]) NewMetrics ΒΆ
func (o *Orchestrator[T]) NewMetrics()
func (*Orchestrator[T]) ProcessMessages ΒΆ
func (o *Orchestrator[T]) ProcessMessages(ctx context.Context, wg *sync.WaitGroup)
ProcessMessages creates a pool of deserializer goroutines.
func (*Orchestrator[T]) RecordChan ΒΆ
func (o *Orchestrator[T]) RecordChan() chan Record
func (*Orchestrator[T]) RecordChanClose ΒΆ added in v0.5.8
func (o *Orchestrator[T]) RecordChanClose()
func (*Orchestrator[T]) RecordChanSend ΒΆ added in v0.5.8
func (o *Orchestrator[T]) RecordChanSend(r Record)
func (*Orchestrator[T]) Report ΒΆ
func (o *Orchestrator[T]) Report() string
Report generates a summary of the collected Metrics.
func (*Orchestrator[T]) ReportJSONL ΒΆ
func (o *Orchestrator[T]) ReportJSONL() string
ReportJSONL generates a summary of the collected Metrics in JSONL format.
func (*Orchestrator[T]) ResetMetrics ΒΆ added in v0.5.0
func (o *Orchestrator[T]) ResetMetrics()
ResetMetrics resets all metrics to zero.
func (*Orchestrator[T]) RestartDuck ΒΆ added in v0.5.8
func (o *Orchestrator[T]) RestartDuck() error
RestartDuck reopens the DuckDB database using an existing DuckDB configuration. It returns an error if ConfigureDuck has not been previously run or if the Orchestrator has not been closed.
func (*Orchestrator[T]) Run ΒΆ
func (o *Orchestrator[T]) Run(ctx context.Context, wg *sync.WaitGroup)
func (*Orchestrator[T]) Schema ΒΆ
func (o *Orchestrator[T]) Schema() *bufa.Schema[T]
func (*Orchestrator[T]) StartMetrics ΒΆ
func (o *Orchestrator[T]) StartMetrics()
func (*Orchestrator[T]) UpdateMetrics ΒΆ
func (o *Orchestrator[T]) UpdateMetrics()
UpdateMetrics calculates the total duration, throughput, and throughput in bytes.
type TypedMetricsReport ΒΆ
type TypedMetricsReport struct { NumCPU int `json:"num_cpu"` RuntimeOS string `json:"runtime_os"` KafkaClientCount int `json:"kafka_clients"` KafkaQueueCap int `json:"kafka_queue_cap"` ProcessorCount int `json:"processor_routines"` ArrowQueueCap int `json:"arrow_queue_cap"` DuckDBThresholdMB int `json:"duckdb_threshold_mb"` DuckConnCount int `json:"duckdb_connections,omitzero"` CustomArrows *int `json:"custom_arrows,omitempty"` NormalizerFields *int `json:"normalizer_fields,omitempty"` StartTime string `json:"start_time"` EndTime string `json:"end_time"` Records int64 `json:"records"` NormRecords int64 `json:"norm_records"` DataTransferred int64 `json:"bytes_transferred"` Duration int64 `json:"duration_nano"` RecordsPerSec float64 `json:"records_per_second"` TotalRecordsPerSec float64 `json:"total_rows_per_second"` TransferRate string `json:"transfer_rate"` OutputFiles int64 `json:"duckdb_files"` OutputFilesMB int64 `json:"duckdb_files_MB"` AvgDurationPerFile float64 `json:"file_avg_duration,omitzero"` }