ingest

package
v0.0.0-...-49cb14a
Published: Mar 25, 2024 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GenerateSQLAndApply

func GenerateSQLAndApply(schema map[string]interface{}, table string, isAlter bool) error

GenerateSQLAndApply generates the SQL query for either creating or altering the ClickHouse schema for a given table and applies the resulting changes to the database.
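
A minimal usage sketch; the import path, the schema value format, and the table name below are illustrative assumptions, not taken from this package:

package main

import (
	"log"

	"example.com/yourmodule/ingest" // hypothetical import path
)

func main() {
	// Assumed shape: column name -> ClickHouse type name.
	schema := map[string]interface{}{
		"timestamp": "DateTime",
		"_channel":  "String",
		"message":   "String",
	}
	// isAlter == false: create the table rather than alter it.
	if err := ingest.GenerateSQLAndApply(schema, "logs", false); err != nil {
		log.Fatal(err)
	}
}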

func GetDataByChannel

func GetDataByChannel(data []map[string]interface{}) map[string][]map[string]interface{}

GetDataByChannel takes a slice of messages and returns a map in which they are grouped by their <_channel> field.
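
For illustration, assuming each message carries a <_channel> key as the doc comment suggests (the data below is hypothetical):

data := []map[string]interface{}{
	{"_channel": "orders", "event": "created"},
	{"_channel": "users", "event": "signup"},
	{"_channel": "orders", "event": "paid"},
}
grouped := ingest.GetDataByChannel(data)
// grouped["orders"] now holds two messages, grouped["users"] one.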

func GetStorableData

func GetStorableData(raw []map[string]interface{}) []map[string]interface{}

GetStorableData takes a list of transformed messages and returns a further transformed list that keeps only the fields that will be stored in ClickHouse.
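
A sketch of where this sits in the flow; the variable name is hypothetical, with transformed assumed to come from the Transform step:

// Keep only the ClickHouse-bound fields of each transformed message.
storable := ingest.GetStorableData(transformed)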

func SortMap

func SortMap(m map[string]interface{}) (map[string]interface{}, []string, []interface{}, error)

SortMap takes a map and returns a version of it sorted by key, along with the ordered keys and their corresponding values.
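
The exact sort order is not documented here; assuming lexicographic ordering by key, a usage sketch:

m := map[string]interface{}{"b": 2, "a": 1, "c": 3}
sorted, keys, values, err := ingest.SortMap(m)
if err != nil {
	log.Fatal(err)
}
fmt.Println(keys)   // assumed: [a b c]
fmt.Println(values) // values aligned with keys: [1 2 3]
_ = sorted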

Types

type Batcher

type Batcher interface {
	Sink(data []map[string]interface{}, endC chan struct{}) (count int, err error)
}
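
A minimal sketch of a custom implementation, e.g. a no-op batcher for tests (entirely hypothetical):

type nopBatcher struct{}

func (nopBatcher) Sink(data []map[string]interface{}, endC chan struct{}) (int, error) {
	// Report everything as written; a real implementation would honor
	// endC to abort early.
	return len(data), nil
}

var _ ingest.Batcher = nopBatcher{}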

type BatcherWorker

type BatcherWorker struct {
	sync.RWMutex
	Conn clickhouse.Conn
	// IterCount keeps track of the number of iterations of sinking of the
	// current worker
	IterCount int64
}

func (*BatcherWorker) Sink

func (b *BatcherWorker) Sink(data []map[string]interface{}) (count int, err error)

Sink receives a slice of data ready to be added to ClickHouse and dumps that data efficiently, given its shape.
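
A usage sketch, assuming conn is an already-opened clickhouse.Conn and rows holds the storable data:

func sinkRows(conn clickhouse.Conn, rows []map[string]interface{}) {
	b := &ingest.BatcherWorker{Conn: conn}
	count, err := b.Sink(rows)
	if err != nil {
		log.Printf("sink failed: %v", err)
		return
	}
	log.Printf("sank %d rows", count)
}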

type Ingester

type Ingester interface {
	Start()
	Stop() error
	Consume()
}

type IngesterWorker

type IngesterWorker struct {
	sync.RWMutex
	*BatcherWorker
	*kafkaservice.KafkaWorker
	MongoDatabase *mongo.Database
	IsRunning     bool
	IsIngesting   bool
	Messages      *Messages
	// ConsumeInterval is the periodic interval to consume messages
	// from kafka.
	ConsumeInterval time.Duration
	// MinBatchableSize is the minimum number of messages that should
	// be stored to allow committing or batching.
	MinBatchableSize int
	// MaxBatchableSize is the maximum number of messages that we can
	// buffer to Messages before committing or batching.
	MaxBatchableSize int
	// MaxBatchableWait is the threshold for committing and batching,
	// given that MinBatchableSize is met.
	MaxBatchableWait time.Duration
}

IngesterWorker is responsible for handling the end-to-end dumping of data to ClickHouse.

func NewClickHouseIngester

func NewClickHouseIngester(cfg *config.Config) *IngesterWorker

TODO: make configs
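
A start/stop sketch; the config fields are project-specific, and whether Start blocks until a signal arrives is an assumption:

cfg := &config.Config{ /* project-specific settings */ }
worker := ingest.NewClickHouseIngester(cfg)

stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
worker.Start(stop) // assumed to run until a signal arrives

if err := worker.Stop(); err != nil {
	log.Printf("shutdown error: %v", err)
}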

func (*IngesterWorker) Consume

func (i *IngesterWorker) Consume()

Consume reads up to i.MaxBatchableSize messages from Kafka and orchestrates their further processing by invoking the subsequent methods.

func (*IngesterWorker) ExtractSchemas

func (i *IngesterWorker) ExtractSchemas() error

ExtractSchemas takes a batch of data and extracts an SQL-compatible schema from it.

func (*IngesterWorker) Start

func (i *IngesterWorker) Start(stop chan os.Signal)

Start spins up the overall consuming process. It runs as long as i.IsRunning is true and tracks new incoming logs.

func (*IngesterWorker) Stop

func (i *IngesterWorker) Stop() error

Stop gracefully stops all ongoing processes, including shutting down the consuming process from Kafka and the sinking process to ClickHouse.

func (*IngesterWorker) Transform

func (i *IngesterWorker) Transform() error

Transform flattens each message into the format in which it will be stored in ClickHouse and adds metadata.
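
Taken together, the documented methods suggest roughly the following per-batch flow; this is an inference from the method docs on this page, not a confirmed call graph:

func consumeOnce(i *ingest.IngesterWorker) error {
	// Inferred ordering; the real Consume may interleave these steps.
	if err := i.Transform(); err != nil {
		return err
	}
	if err := i.ExtractSchemas(); err != nil {
		return err
	}
	storable := ingest.GetStorableData(i.Messages.TransformedData)
	_, err := i.Sink(storable) // via the embedded *BatcherWorker
	return err
}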

type Messages

type Messages struct {
	sync.RWMutex
	Data            []*core.Log
	TransformedData []map[string]interface{}
	StorableData    []map[string]interface{}
}

Messages is the data structure holding the messages that will be written to ClickHouse.

type MongoDBIngester

type MongoDBIngester struct {
	sync.RWMutex
	*mongo.Database
	*kafkaservice.KafkaWorker
	ConsumeInterval time.Duration
	// TopicCallback is the Kafka topic to produce to upon successful insertion.
	TopicCallback string
	CloseChan     chan struct{}
}

func NewMongoDBIngester

func NewMongoDBIngester(cfg *config.Config) *MongoDBIngester

func (*MongoDBIngester) Consume

func (m *MongoDBIngester) Consume() error

func (*MongoDBIngester) Sink

func (m *MongoDBIngester) Sink(msg *kafka.Message) error

func (*MongoDBIngester) Start

func (m *MongoDBIngester) Start(stop chan os.Signal)

Start spins up everything and starts listening for incoming events from Kafka, and gets ready to sink them to the database.

func (*MongoDBIngester) Stop

func (m *MongoDBIngester) Stop() error

type MongoDBIngesterConfig

type MongoDBIngesterConfig struct {
	// Kafka-related configurations
	KafkaConfigs    kafkaservice.KafkaConfigs
	KafkaTopics     []string
	ConsumeInterval time.Duration
	// MongoDB-related configurations
	MongoServer   string
	TopicCallback string
	Database      string
}
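
A hypothetical config literal; all values are placeholders, and how this struct is threaded into NewMongoDBIngester is not shown on this page:

mcfg := ingest.MongoDBIngesterConfig{
	KafkaTopics:     []string{"logs"},
	ConsumeInterval: 5 * time.Second,
	MongoServer:     "mongodb://localhost:27017",
	TopicCallback:   "logs.ack",
	Database:        "ingest",
}
_ = mcfg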

type Values

type Values string
