quacfka

package module v0.5.14

Published: Mar 14, 2025 License: Apache-2.0 Imports: 28 Imported by: 1

README

Quacfka 🍹🦆


Go library to stream Kafka protobuf messages to DuckDB. Uses generics: provide your protobuf message as a type parameter to autogenerate an Arrow schema, supply a protobuf unmarshaling func, and stream data into DuckDB with very high throughput.

Features

Arrow schema generation from a protobuf message type parameter
  • Converts a proto.Message into an Apache Arrow schema
    • Supports nested types
Configurable loggers
  • Set Debug, Error, and benchmark loggers

🚀 Install

Using Quacfka is easy. First, use go get to install the latest version of the library.

go get -u github.com/loicalleyne/quacfka@latest

💡 Usage

You can import quacfka using:

import "github.com/loicalleyne/quacfka"

Create a new Orchestrator, configure the Kafka client, processing, and DuckDB, then Run(). The Kafka client can be configured with a slice of franz-go/pkg/kgo.Opt or with SASL user/password auth.

	import q "github.com/loicalleyne/quacfka"
	/* 
	Options:
	- create with DuckDB file rotation
	- create with a CustomArrow function to munge the Arrow record in-flight and insert it to another DuckDB table
	- create with a Normalizer to create a normalized Arrow Record you can write to in your proto decode function
	- run without Kafka
	- run without protobuf message decoding
	- run without duckdb
	 */
	normFields := []string{"id", "site.id", "timestamp.seconds", "stores[0].gallery.deals.id"}
	normAliases := []string{"id", "site", "event_time", "deal"}
	o, err := q.NewOrchestrator[*your.CustomProtoMessageType](q.WithFileRotateThresholdMB(5000), q.WithCustomArrows([]q.CustomArrow{{CustomFunc: flattenNestedForAgg, DestinationTable: "test"}}), q.WithNormalizer(normFields, normAliases, false))
	if err != nil {
		panic(err)
	}
	defer o.Close()
	q.SetDebugLogger(log.Printf)
	q.SetErrorLogger(log.Printf)
	q.SetFatalLogger(log.Fatalf)
	q.SetBenchmarkLogger(log.Printf)
	k := o.NewKafkaConfig()
	k.ClientCount.Store(int32(*kafkaRoutines))
	k.MsgChanCap = 122880 * 5
	k.ConsumerGroup = os.Getenv("CONSUMER_GROUP")
	k.Seeds = append(k.Seeds, os.Getenv("KAFKA_SEED"))
	k.User = os.Getenv("KAFKA_USER")
	k.Password = os.Getenv("KAFKA_PW")
	k.Munger = messageMunger
	k.Topic = "kafka.topic01"
	// Tune record channel capacity, row group size, number of processing routines, set custom unmarshal func
	err = o.ConfigureProcessor(*duckRoutines*3, 1, *routines, customProtoUnmarshal)
	if err != nil {
		log.Println(err)
		panic(err)
	}
	var driverPath string
	switch runtime.GOOS {
	case "darwin":
		driverPath = "/usr/local/lib/libduckdb.so.dylib"
	case "linux":
		driverPath = "/usr/local/lib/libduckdb.so"
	case "windows":
		h, _ := os.UserHomeDir()
		driverPath = h + "\\Downloads\\libduckdb-windows-amd64\\duckdb.dll"
	default:
	}
	err = o.ConfigureDuck(q.WithPathPrefix("duck"), q.WithDriverPath(driverPath), q.WithDestinationTable("mytable"), q.WithDuckConnections(*duckRoutines))
	if err != nil {
		panic(err)
	}
	// Use MockKafka to generate random data for your custom proto to simulate consuming the protobuf from Kafka
	// wg.Add(1)
	// go o.MockKafka(ctxT, &wg, &rr.BidRequestEvent{Id: "1233242423243"})
	wg.Add(1)
	go o.Run(ctxT, &wg)
	// Get chan string of closed, rotated DuckDB files
	duckFiles := o.DuckPaths()
	...
	// Query duckdb files to aggregate, activate alerts, etc...
	...
	wg.Wait()
	// Check for processing errors
	if o.Error() != nil {
		log.Println(o.Error())
	}
	// Print pipeline metrics
	log.Printf("%v\n", o.Report())
...
func customProtoUnmarshal(m []byte, s any) error {
	newMessage := rr.BidRequestEventFromVTPool()
	err := newMessage.UnmarshalVTUnsafe(m)
	if err != nil {
		return err
	}
	// Assert s to `*bufarrow.Schema[*your.CustomProtoMessageType]`
	// Populate the Normalizer Arrow Record with flattened data
	rb := s.(*bufarrow.Schema[*rr.BidRequestEvent]).NormalizerBuilder()
	if rb != nil {
		b := rb.Fields()
		if b != nil {
			id := newMessage.GetId()
			site := newMessage.GetSite().GetId()
			timestampSeconds := newMessage.GetTimestamp().GetSeconds()
			if len(newMessage.GetStores()[0].GetGallery().GetDeals()) == 0 {
				b[0].(*array.StringBuilder).Append(id)
				b[1].(*array.StringBuilder).Append(site)
				b[2].(*array.Int64Builder).Append(timestampSeconds)
				b[3].(*array.StringBuilder).AppendNull()
			}
			for i := 0; i < len(newMessage.GetImp()[0].GetPmp().GetDeals()); i++ {
				b[0].(*array.StringBuilder).Append(id)
				b[1].(*array.StringBuilder).Append(site)
				b[2].(*array.Int64Builder).Append(timestampSeconds)
				b[3].(*array.StringBuilder).Append(newMessage.GetImp()[0].GetPmp().GetDeals()[i].GetId())
			}
		}

	// Assert s to `*bufarrow.Schema[*your.CustomProtoMessageType]`
	s.(*bufarrow.Schema[*your.CustomProtoMessageType]).Append(newMessage)
	newMessage.ReturnToVTPool()
	return nil
}

// Custom protobuf wire format bytes munger
// Confluent Java client adds magic bytes at beginning of message which will cause
// protobuf decoding to fail if not removed
func messageMunger(m []byte) []byte {
	return m[6:]
}

// Custom Arrow function to build a new Arrow Record from the main processing output Record 
func flattenNestedForAgg(ctx context.Context, dest string, record arrow.Record) arrow.Record {
	...
	return mungedRecord
}
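
// Example o.Report() output: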
// {
//   "num_cpu": 60,
//   "runtime_os": "linux",
//   "kafka_clients": 5,
//   "kafka_queue_cap": 983040,
//   "processor_routines": 32,
//   "arrow_queue_cap": 4,
//   "duckdb_threshold_mb": 4200,
//   "duckdb_connections": 24,
//   "normalizer_fields": 10,
//   "start_time": "2025-02-24T21:06:23Z",
//   "end_time": "2025-02-24T21:11:23Z",
//   "records": "123_686_901.00",
//   "norm_records": "122_212_452.00",
//   "data_transferred": "146.53 GB",
//   "duration": "4m59.585s",
//   "records_per_second": "398_271.90",
//   "total_rows_per_second": "806_210.41",
//   "transfer_rate": "500.86 MB/second",
//   "duckdb_files": 9,
//   "duckdb_files_MB": 38429,
//   "file_avg_duration": "33.579s"
// }

Generate random data to emulate the Kafka topic

	wg.Add(1)
	// Instantiate a sample proto.Message to provide a description,
	// random data will be generated for all fields.
	go o.MockKafka(ctxT, &wg, &your.CustomProtoMessageType{Id: "1233242423243"})
	wg.Add(1)
	// WithFileRotateThresholdMB specifies a file rotation threshold target in MB (not very accurate yet)
	go o.Run(ctxT, &wg, q.WithoutKafka(), q.WithFileRotateThresholdMB(250))
	wg.Wait()

💫 Show your support

Give a ⭐️ if this project helped you! Feedback and PRs welcome.

Licence

Quacfka is released under the Apache 2.0 license. See LICENCE

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrWaitGroupIsNil                = errors.New("*sync.waitgroup is nil")
	ErrMessageChanCapacityZero       = errors.New("chan []byte must have capacity > 0")
	ErrRecordChanCapacityZero        = errors.New("chan []arrow.record must have capacity > 0")
	ErrProcessingFuncIsNil           = errors.New("func([]byte, *bufa.Schema[T]) error is nil")
	ErrProcessingRoutineCountInvalid = errors.New("deserializer routine count not > 0")
	ErrRowGroupSizeMultiplier        = errors.New("row group size multiplier not > 0")
)

var (
	ErrMissingDuckDBConfig = errors.New("missing duckdb configuration")
)

Functions

func SetBenchmarkLogger

func SetBenchmarkLogger(logger Logger)

func SetDebugLogger

func SetDebugLogger(logger Logger)

func SetErrorLogger

func SetErrorLogger(logger Logger)

func SetFatalLogger

func SetFatalLogger(logger Logger)

func WithMessageCutConfluencePrefix added in v0.5.1

func WithMessageCutConfluencePrefix(m []byte) []byte

WithMessageCutConfluencePrefix removes the 6 bytes that the Confluent producer adds for schema registry metadata.
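Since it matches the Munger field's func([]byte) []byte signature, it can be assigned directly to a Kafka client configuration:

	k := o.NewKafkaConfig()
	k.Munger = q.WithMessageCutConfluencePrefix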

Types

type Cardinality added in v0.5.10

type Cardinality protoreflect.Cardinality

Cardinality determines whether a field is optional, required, or repeated.

const (
	Optional Cardinality = 1 // appears zero or one times
	Required Cardinality = 2 // appears exactly one time; invalid with Proto3
	Repeated Cardinality = 3 // appears zero or more times
)

Constants as defined by the google.protobuf.Cardinality enumeration.

func (*Cardinality) Get added in v0.5.10

func (c *Cardinality) Get() bufa.Cardinality

type CustomArrow added in v0.4.0

type CustomArrow struct {
	CustomFunc       func(context.Context, string, arrow.Record) arrow.Record
	DestinationTable string
}

type CustomField added in v0.5.10

type CustomField struct {
	Name             string
	Type             FieldType
	FieldCardinality Cardinality
	IsPacked         bool
}
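
A minimal sketch of declaring custom fields and passing them at orchestrator construction via WithCustomFields; the field names and their intended use here are hypothetical, not from the library:

	// Hypothetical extra fields; names are illustrative only.
	customFields := []q.CustomField{
		{Name: "ingest_source", Type: q.STRING, FieldCardinality: q.Optional},
		{Name: "ingest_ts", Type: q.INT64, FieldCardinality: q.Optional},
	}
	o, err := q.NewOrchestrator[*your.CustomProtoMessageType](q.WithCustomFields(customFields))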

type DuckOption

type DuckOption func(duckConfig)

func WithDestinationTable

func WithDestinationTable(p string) DuckOption

func WithDriverPath

func WithDriverPath(p string) DuckOption

func WithDuckConnections

func WithDuckConnections(p int) DuckOption

func WithDuckRunner added in v0.5.1

func WithDuckRunner(p *DuckRunner) DuckOption

Run queries in series after writes to the db file are completed, before rotating to the next db file.

Usage:

	exec := new(q.DuckRunner)
	exec.AddQueries(queries, false)
	exec.SetFunc(RunAggs)
	exec.SetDeleteOnDone(true)
	err = o.ConfigureDuck(q.WithPathPrefix("reqlog"), q.WithDriverPath(driverPath), q.WithDestinationTable("req"), q.WithDuckConnections(24), q.WithDuckRunner(exec))

func WithPath

func WithPath(p string) DuckOption

func WithPathPrefix

func WithPathPrefix(p string) DuckOption

type DuckRunner added in v0.5.1

type DuckRunner struct {
	// contains filtered or unexported fields
}

func (*DuckRunner) AddQueries added in v0.5.1

func (d *DuckRunner) AddQueries(queries []string, exec bool)

AddQueries adds queries to a DuckRunner and sets whether the runner should use RunExec instead of RunFunc to run the queries (RunExec should be used when no results from queries are expected).

func (*DuckRunner) Err added in v0.5.1

func (d *DuckRunner) Err() error

func (*DuckRunner) GetDB added in v0.5.1

func (d *DuckRunner) GetDB() *couac.Quacker

func (*DuckRunner) IsDeleteDBOnDone added in v0.5.1

func (d *DuckRunner) IsDeleteDBOnDone() bool

func (*DuckRunner) IsExec added in v0.5.1

func (d *DuckRunner) IsExec() bool

func (*DuckRunner) Path added in v0.5.1

func (d *DuckRunner) Path() string

func (*DuckRunner) Queries added in v0.5.1

func (d *DuckRunner) Queries() []string

func (*DuckRunner) Run added in v0.5.1

func (d *DuckRunner) Run(ctx context.Context) error

Run runs the defined queries. If exec is set to true, the queries are not expected to return any results; otherwise, the query function is used to coordinate the queries. Setting exec to true is meant for queries that aggregate into another table, as well as EXPORT/COPY TO statements.

func (*DuckRunner) SetDeleteOnDone added in v0.5.1

func (d *DuckRunner) SetDeleteOnDone(b bool)

func (*DuckRunner) SetErr added in v0.5.1

func (d *DuckRunner) SetErr(err error)

func (*DuckRunner) SetFunc added in v0.5.1

func (d *DuckRunner) SetFunc(f func(*DuckRunner) error)

func (*DuckRunner) SetPath added in v0.5.1

func (d *DuckRunner) SetPath(p string)

type FieldType added in v0.5.10

type FieldType fieldType

const (
	BOOL    FieldType = "bool"
	BYTES   FieldType = "[]byte"
	STRING  FieldType = "string"
	INT64   FieldType = "int64"
	FLOAT64 FieldType = "float64"
)

func (*FieldType) Get added in v0.5.10

func (t *FieldType) Get() bufa.FieldType

type KafkaClientConf

type KafkaClientConf[T proto.Message] struct {

	// franz-go/pkg/kgo.Opt configurations. If any are set, these will override all
	//  of the subsequently listed client settings.
	ClientConf []kgo.Opt
	// Number of Kafka clients to open. Default value is 1. If using more than one
	// use of a consumer group is recommended.
	ClientCount atomic.Int32
	// Consumer group
	ConsumerGroup string
	// Instance prefix
	InstancePrefix string
	// Message channel capacity, must be greater than 0. Default capacity is 122880.
	MsgChanCap int
	// MsgTimeAppend sets whether to append the Kafka message timestamp as an 8 byte uint64
	// at the end of the message bytes. It is the deserializing function's responsibility
	// to truncate these prior to reading the protobuf message.
	// Use `time.UnixMilli(int64(binary.LittleEndian.Uint64(b)))` to retrieve the timestamp.
	MsgTimeAppend bool
	// Function to munge message bytes prior to deserialization.
	// As an example, Confluent java client adds 6 magic bytes at
	// beginning of message data for use with Schema Registry which must
	// be removed from the message prior to deserialization.
	Munger func([]byte) []byte
	// Kafka TLS dialer
	TlsDialer *tls.Dialer
	// Kafka topic to consume
	Topic string
	// Kafka seed brokers
	Seeds []string
	// SASL Auth User
	User string
	// SASL Auth password
	Password string
	// contains filtered or unexported fields
}
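
A sketch of configuring the client with kgo options directly; the broker, topic, and group names below are placeholders:

	import "github.com/twmb/franz-go/pkg/kgo"

	k := o.NewKafkaConfig()
	// When ClientConf is set, it overrides the ConsumerGroup, Topic, Seeds,
	// User, and Password fields.
	k.ClientConf = []kgo.Opt{
		kgo.SeedBrokers("broker1:9092"),
		kgo.ConsumeTopics("kafka.topic01"),
		kgo.ConsumerGroup("my-group"),
	}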

type Logger

type Logger func(string, ...any)
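
Logger matches the signature of log.Printf, which can be passed in directly (as in the README example); adapting a structured logger is also straightforward, e.g. with log/slog and fmt:

	q.SetErrorLogger(func(format string, args ...any) {
		slog.Error(fmt.Sprintf(format, args...))
	})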

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

type MetricsReport

type MetricsReport struct {
	NumCPU             int    `json:"num_cpu"`
	RuntimeOS          string `json:"runtime_os"`
	KafkaClientCount   int    `json:"kafka_clients"`
	KafkaQueueCap      int    `json:"kafka_queue_cap"`
	ProcessorCount     int    `json:"processor_routines"`
	ArrowQueueCap      int    `json:"arrow_queue_cap"`
	DuckDBThresholdMB  int    `json:"duckdb_threshold_mb"`
	DuckConnCount      int    `json:"duckdb_connections,omitzero"`
	CustomArrows       *int   `json:"custom_arrows,omitempty"`
	NormalizerFields   *int   `json:"normalizer_fields,omitempty"`
	StartTime          string `json:"start_time"`
	EndTime            string `json:"end_time,omitzero"`
	Records            string `json:"records"`
	NormRecords        string `json:"norm_records"`
	DataTransferred    string `json:"data_transferred"`
	Duration           string `json:"duration"`
	RecordsPerSec      string `json:"records_per_second"`
	TotalRecordsPerSec string `json:"total_rows_per_second"`
	TransferRate       string `json:"transfer_rate"`
	OutputFiles        int64  `json:"duckdb_files"`
	OutputFilesMB      int64  `json:"duckdb_files_MB"`
	AvgDurationPerFile string `json:"file_avg_duration,omitempty"`
}
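
Assuming Report() returns the JSON document shown in the README example, it can be decoded into a MetricsReport for programmatic inspection; a minimal sketch:

	var mr q.MetricsReport
	if err := json.Unmarshal([]byte(o.Report()), &mr); err != nil {
		log.Println(err)
	}
	log.Printf("%s records/s, %s transferred", mr.RecordsPerSec, mr.DataTransferred)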

type Opt

type Opt struct {
	// contains filtered or unexported fields
}

type Option

type Option func(config)

func WithCustomArrows added in v0.4.0

func WithCustomArrows(p []CustomArrow) Option

func WithCustomFields added in v0.5.10

func WithCustomFields(p []CustomField) Option

func WithDuckPathsChan added in v0.5.2

func WithDuckPathsChan(s int) Option

func WithFileRotateThresholdDurationSeconds added in v0.5.12

func WithFileRotateThresholdDurationSeconds(p int) Option

WithFileRotateThresholdDurationSeconds sets the database file rotation duration threshold in seconds. The minimum duration is 60 seconds.

func WithFileRotateThresholdMB

func WithFileRotateThresholdMB(p int64) Option

WithFileRotateThresholdMB sets the database file rotation size. Minimum rotation threshold is 100MB.

func WithNormalizer added in v0.5.0

func WithNormalizer(fields, aliases []string, failOnRangeError bool) Option

WithNormalizer configures the scalar fields to add to a flat Arrow Record suitable for efficient aggregation. Nested protobuf messages converted to Arrow records are not only slower to insert into DuckDB; running aggregation queries on nested data is also much slower (by orders of magnitude). Fields should be specified by their path (field names separated by a period, i.e. 'field1.field2.field3'). The Arrow field types of the selected fields will be used to build the new schema. If coalescing data between multiple fields of the same type, specify only one of the paths. List fields should have an index to retrieve specified; otherwise, all elements are retrieved. Ranges are not yet implemented.

func WithoutDuck

func WithoutDuck() Option

func WithoutDuckIngestRaw added in v0.5.1

func WithoutDuckIngestRaw() Option

func WithoutKafka

func WithoutKafka() Option

func WithoutProcessing

func WithoutProcessing() Option

type Orchestrator

type Orchestrator[T proto.Message] struct {
	Metrics *Metrics
	// contains filtered or unexported fields
}

func NewOrchestrator

func NewOrchestrator[T proto.Message](opts ...Option) (*Orchestrator[T], error)

func (*Orchestrator[T]) ArrowQueueCapacity added in v0.5.0

func (o *Orchestrator[T]) ArrowQueueCapacity() int

func (*Orchestrator[T]) BenchmarksReport

func (o *Orchestrator[T]) BenchmarksReport() string

BenchmarksReport generates an indented live benchmark report with throughput rates sampled from the last 30 seconds.

func (*Orchestrator[T]) BenchmarksUnformatedReport added in v0.5.7

func (o *Orchestrator[T]) BenchmarksUnformatedReport() string

BenchmarksUnformatedReport generates an unindented live benchmark report with throughput rates sampled from the last 30 seconds.
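
Since the rates are sampled over the last 30 seconds, one pattern is to log a live report on a matching ticker while the pipeline runs; a sketch:

	go func() {
		t := time.NewTicker(30 * time.Second)
		defer t.Stop()
		for range t.C {
			log.Println(o.BenchmarksUnformatedReport())
		}
	}()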

func (*Orchestrator[T]) Close

func (o *Orchestrator[T]) Close()

Close closes the DuckDB database, if open.

func (*Orchestrator[T]) ConfigureDuck

func (o *Orchestrator[T]) ConfigureDuck(opts ...DuckOption) error

ConfigureDuck initializes DuckDB connection settings.

func (*Orchestrator[T]) ConfigureProcessor

func (o *Orchestrator[T]) ConfigureProcessor(rChanCap, rowGroupSizeMultiplier, routineCount int, unmarshalFunc func([]byte, any) error) error

func (*Orchestrator[T]) CurrentDBSize added in v0.4.0

func (o *Orchestrator[T]) CurrentDBSize() int64

func (*Orchestrator[T]) DuckConnCount

func (o *Orchestrator[T]) DuckConnCount() int

func (*Orchestrator[T]) DuckIngest

func (o *Orchestrator[T]) DuckIngest(ctx context.Context, w *sync.WaitGroup)

DuckIngest reads records from the record channel and ingests the Records into a single DuckDB database. Ingestion ends when the Record channel is closed and empty.

func (*Orchestrator[T]) DuckIngestWithRotate

func (o *Orchestrator[T]) DuckIngestWithRotate(ctx context.Context, w *sync.WaitGroup)

DuckIngestWithRotate reads records from the record channel and ingests the Records into rotating DuckDB files, rotating at the file size threshold defined. Ingestion ends when the Record channel is closed and empty.

func (*Orchestrator[T]) DuckPaths

func (o *Orchestrator[T]) DuckPaths() chan string
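
A sketch of draining the channel of closed, rotated database file paths concurrently (what to do with each file, e.g. running aggregation queries, is left to the caller):

	go func() {
		for path := range o.DuckPaths() {
			// Each received path is a closed, rotated DuckDB file ready to query.
			log.Println("rotated DuckDB file ready:", path)
		}
	}()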

func (*Orchestrator[T]) Error

func (o *Orchestrator[T]) Error() error

func (*Orchestrator[T]) IsClosed added in v0.5.8

func (o *Orchestrator[T]) IsClosed() bool

IsClosed reports whether the DuckDB database is closed.

func (*Orchestrator[T]) KafkaClientCount

func (o *Orchestrator[T]) KafkaClientCount() int

func (*Orchestrator[T]) KafkaQueueCapacity added in v0.5.0

func (o *Orchestrator[T]) KafkaQueueCapacity() int

func (*Orchestrator[T]) MessageChan

func (o *Orchestrator[T]) MessageChan() chan []byte

func (*Orchestrator[T]) MessageChanClose added in v0.5.8

func (o *Orchestrator[T]) MessageChanClose()

func (*Orchestrator[T]) MessageChanSend added in v0.5.8

func (o *Orchestrator[T]) MessageChanSend(m []byte)

func (*Orchestrator[T]) MockKafka added in v0.3.0

func (o *Orchestrator[T]) MockKafka(ctx context.Context, w *sync.WaitGroup, p T)

MockKafka produces protobuf messages with random data in each field to the message channel. An initialized message with at least one field containing data must be passed as an argument for generation to work.

Usage:

	wg.Add(1)
	go o.MockKafka(ctxT, &wg, &gen.RequestEvent{Id: "1233242423243"})

func (*Orchestrator[T]) MsgProcessorsCount

func (o *Orchestrator[T]) MsgProcessorsCount() int

func (*Orchestrator[T]) NewKafkaConfig

func (o *Orchestrator[T]) NewKafkaConfig() *KafkaClientConf[T]

func (*Orchestrator[T]) NewMetrics

func (o *Orchestrator[T]) NewMetrics()

func (*Orchestrator[T]) ProcessMessages

func (o *Orchestrator[T]) ProcessMessages(ctx context.Context, wg *sync.WaitGroup)

ProcessMessages creates a pool of deserializer goroutines.

func (*Orchestrator[T]) RecordChan

func (o *Orchestrator[T]) RecordChan() chan Record

func (*Orchestrator[T]) RecordChanClose added in v0.5.8

func (o *Orchestrator[T]) RecordChanClose()

func (*Orchestrator[T]) RecordChanSend added in v0.5.8

func (o *Orchestrator[T]) RecordChanSend(r Record)

func (*Orchestrator[T]) Report

func (o *Orchestrator[T]) Report() string

Report generates a summary of the collected Metrics.

func (*Orchestrator[T]) ReportJSONL

func (o *Orchestrator[T]) ReportJSONL() string

ReportJSONL generates a summary of the collected Metrics in JSONL format.

func (*Orchestrator[T]) ResetMetrics added in v0.5.0

func (o *Orchestrator[T]) ResetMetrics()

ResetMetrics resets all metrics to zero.

func (*Orchestrator[T]) RestartDuck added in v0.5.8

func (o *Orchestrator[T]) RestartDuck() error

RestartDuck reopens the DuckDB database using the existing DuckDB configuration. It returns an error if ConfigureDuck has not previously been run or if the Orchestrator has not been closed.

func (*Orchestrator[T]) Run

func (o *Orchestrator[T]) Run(ctx context.Context, wg *sync.WaitGroup)

func (*Orchestrator[T]) Schema

func (o *Orchestrator[T]) Schema() *bufa.Schema[T]

func (*Orchestrator[T]) StartMetrics

func (o *Orchestrator[T]) StartMetrics()

func (*Orchestrator[T]) UpdateMetrics

func (o *Orchestrator[T]) UpdateMetrics()

UpdateMetrics calculates the total duration, throughput, and throughput in bytes.

type Record added in v0.5.0

type Record struct {
	Raw  arrow.Record
	Norm arrow.Record
}
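
Record carries the raw Arrow record and, when a Normalizer is configured, its flattened counterpart. When running with q.WithoutDuck(), records can presumably be consumed from RecordChan directly; a minimal sketch, assuming the consumer is responsible for releasing the Arrow records per Arrow conventions:

	for rec := range o.RecordChan() {
		if rec.Raw != nil {
			// process the raw record, then release it
			rec.Raw.Release()
		}
		if rec.Norm != nil {
			// process the normalized record, then release it
			rec.Norm.Release()
		}
	}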

type TypedMetricsReport

type TypedMetricsReport struct {
	NumCPU             int     `json:"num_cpu"`
	RuntimeOS          string  `json:"runtime_os"`
	KafkaClientCount   int     `json:"kafka_clients"`
	KafkaQueueCap      int     `json:"kafka_queue_cap"`
	ProcessorCount     int     `json:"processor_routines"`
	ArrowQueueCap      int     `json:"arrow_queue_cap"`
	DuckDBThresholdMB  int     `json:"duckdb_threshold_mb"`
	DuckConnCount      int     `json:"duckdb_connections,omitzero"`
	CustomArrows       *int    `json:"custom_arrows,omitempty"`
	NormalizerFields   *int    `json:"normalizer_fields,omitempty"`
	StartTime          string  `json:"start_time"`
	EndTime            string  `json:"end_time"`
	Records            int64   `json:"records"`
	NormRecords        int64   `json:"norm_records"`
	DataTransferred    int64   `json:"bytes_transferred"`
	Duration           int64   `json:"duration_nano"`
	RecordsPerSec      float64 `json:"records_per_second"`
	TotalRecordsPerSec float64 `json:"total_rows_per_second"`
	TransferRate       string  `json:"transfer_rate"`
	OutputFiles        int64   `json:"duckdb_files"`
	OutputFilesMB      int64   `json:"duckdb_files_MB"`
	AvgDurationPerFile float64 `json:"file_avg_duration,omitzero"`
}
