Documentation ¶
Index ¶
- func GenerateSQLAndApply(schema map[string]interface{}, table string, isAlter bool) error
- func GetDataByChannel(data []map[string]interface{}) map[string][]map[string]interface{}
- func GetStorableData(raw []map[string]interface{}) []map[string]interface{}
- func SortMap(m map[string]interface{}) (map[string]interface{}, []string, []interface{}, error)
- type Batcher
- type BatcherWorker
- type Ingester
- type IngesterWorker
- type Messages
- type MongoDBIngester
- type MongoDBIngesterConfig
- type Values
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GenerateSQLAndApply ¶
GenerateSQLAndApply generates the SQL query for either creating or altering the ClickHouse schema for a given table and applies the resulting changes to the database.
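The generated SQL is not shown in this documentation; a minimal sketch of the idea, assuming every column is stored as a ClickHouse String and using a hypothetical `buildSQL` helper, might look like:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// buildSQL sketches how a CREATE TABLE or ALTER TABLE statement could be
// derived from a schema map. Column names are sorted so output is deterministic.
func buildSQL(schema map[string]interface{}, table string, isAlter bool) string {
	cols := make([]string, 0, len(schema))
	for name := range schema {
		cols = append(cols, name)
	}
	sort.Strings(cols)

	if isAlter {
		parts := make([]string, 0, len(cols))
		for _, c := range cols {
			parts = append(parts, fmt.Sprintf("ADD COLUMN IF NOT EXISTS %s String", c))
		}
		return fmt.Sprintf("ALTER TABLE %s %s", table, strings.Join(parts, ", "))
	}

	defs := make([]string, 0, len(cols))
	for _, c := range cols {
		defs = append(defs, fmt.Sprintf("%s String", c))
	}
	return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s) ENGINE = MergeTree() ORDER BY tuple()",
		table, strings.Join(defs, ", "))
}

func main() {
	schema := map[string]interface{}{"level": "info", "message": "hello"}
	fmt.Println(buildSQL(schema, "logs", false))
	fmt.Println(buildSQL(schema, "logs", true))
}
```

The real function also executes the statement against ClickHouse; the sketch only covers the string-building half.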
func GetDataByChannel ¶
GetDataByChannel transforms a slice of messages and returns a map in which they are grouped by their <_channel> field.
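The grouping step can be sketched as follows (a minimal standalone version; the package's own implementation may differ in how it handles messages missing the field):

```go
package main

import "fmt"

// groupByChannel buckets messages under the value of their "_channel" field,
// mirroring the shape of GetDataByChannel.
func groupByChannel(data []map[string]interface{}) map[string][]map[string]interface{} {
	out := make(map[string][]map[string]interface{})
	for _, msg := range data {
		ch, ok := msg["_channel"].(string)
		if !ok {
			continue // skip messages without a string _channel field
		}
		out[ch] = append(out[ch], msg)
	}
	return out
}

func main() {
	msgs := []map[string]interface{}{
		{"_channel": "app", "msg": "a"},
		{"_channel": "db", "msg": "b"},
		{"_channel": "app", "msg": "c"},
	}
	grouped := groupByChannel(msgs)
	fmt.Println(len(grouped["app"]), len(grouped["db"])) // 2 1
}
```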
func GetStorableData ¶
GetStorableData takes a list of transformed messages and returns a further transformed list containing only the fields that will be stored in ClickHouse.
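The field-selection idea can be sketched with a hypothetical whitelist (the actual set of stored fields is not specified here):

```go
package main

import "fmt"

// keepStorable keeps only whitelisted fields of each message, illustrating
// the filtering GetStorableData performs before data reaches ClickHouse.
func keepStorable(raw []map[string]interface{}, allowed map[string]bool) []map[string]interface{} {
	out := make([]map[string]interface{}, 0, len(raw))
	for _, msg := range raw {
		kept := make(map[string]interface{})
		for k, v := range msg {
			if allowed[k] {
				kept[k] = v
			}
		}
		out = append(out, kept)
	}
	return out
}

func main() {
	raw := []map[string]interface{}{{"message": "hi", "debugOnly": true}}
	stored := keepStorable(raw, map[string]bool{"message": true})
	fmt.Println(stored[0]) // map[message:hi]
}
```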
Types ¶
type BatcherWorker ¶
type IngesterWorker ¶
type IngesterWorker struct {
	sync.RWMutex
	*BatcherWorker
	*kafkaservice.KafkaWorker

	MongoDatabase *mongo.Database

	IsRunning   bool
	IsIngesting bool

	Messages *Messages

	// ConsumeInterval is the periodic interval to consume messages
	// from kafka.
	ConsumeInterval time.Duration

	// MinBatchableSize is the minimum number of messages that should
	// be stored to allow committing or batching.
	MinBatchableSize int

	// MaxBatchableSize is the maximum number of messages that we can
	// buffer to Messages before committing or batching.
	MaxBatchableSize int

	// MaxBatchableWait is the threshold for committing and batching
	// given that MinCommitCount is met.
	MaxBatchableWait time.Duration
}
IngesterWorker is responsible for handling the end-to-end dumping of data to ClickHouse.
func NewClickHouseIngester ¶
func NewClickHouseIngester(cfg *config.Config) *IngesterWorker
TODO: make configs
func (*IngesterWorker) Consume ¶
func (i *IngesterWorker) Consume()
Consume reads up to i.MaxBatchableSize messages from Kafka and orchestrates their further processing by invoking the subsequent methods.
func (*IngesterWorker) ExtractSchemas ¶
func (i *IngesterWorker) ExtractSchemas() error
ExtractSchemas inspects the buffered messages and extracts the SQL-compatible schema from them.
func (*IngesterWorker) Start ¶
func (i *IngesterWorker) Start(stop chan os.Signal)
Start spins up the consuming process. It runs as long as i.IsRunning is true and tracks new incoming logs.
func (*IngesterWorker) Stop ¶
func (i *IngesterWorker) Stop() error
Stop gracefully shuts down all ongoing processes, including the consuming process from Kafka and the sinking process to ClickHouse.
func (*IngesterWorker) Transform ¶
func (i *IngesterWorker) Transform() error
Transform flattens each message into the format that will be stored in ClickHouse and adds metadata.
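The flattening step can be sketched as follows, assuming nested maps are collapsed into dot-separated keys (the package's actual key scheme is not documented here):

```go
package main

import "fmt"

// flatten collapses nested maps into dot-separated keys so that each value
// maps onto a single ClickHouse column, illustrating what Transform does.
func flatten(prefix string, in map[string]interface{}, out map[string]interface{}) {
	for k, v := range in {
		key := k
		if prefix != "" {
			key = prefix + "." + k
		}
		if nested, ok := v.(map[string]interface{}); ok {
			flatten(key, nested, out) // recurse into nested objects
			continue
		}
		out[key] = v
	}
}

func main() {
	msg := map[string]interface{}{
		"level": "info",
		"http":  map[string]interface{}{"status": 200},
	}
	flat := make(map[string]interface{})
	flatten("", msg, flat)
	fmt.Println(flat["http.status"]) // 200
}
```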
type Messages ¶
type Messages struct {
	sync.RWMutex

	Data            []*core.Log
	TransformedData []map[string]interface{}
	StorableData    []map[string]interface{}
}
Messages is the data structure holding the messages that will be written to ClickHouse.
type MongoDBIngester ¶
type MongoDBIngester struct {
	sync.RWMutex
	*mongo.Database
	*kafkaservice.KafkaWorker

	ConsumeInterval time.Duration

	// TopicCallback is the Kafka topic to produce to upon successful insertion.
	TopicCallback string

	CloseChan chan struct{}
}
func NewMongoDBIngester ¶
func NewMongoDBIngester(cfg *config.Config) *MongoDBIngester
func (*MongoDBIngester) Consume ¶
func (m *MongoDBIngester) Consume() error
func (*MongoDBIngester) Start ¶
func (m *MongoDBIngester) Start(stop chan os.Signal)
Start spins everything up, listens for incoming events from Kafka, and prepares to sink them to the database.
func (*MongoDBIngester) Stop ¶
func (m *MongoDBIngester) Stop() error
type MongoDBIngesterConfig ¶
type MongoDBIngesterConfig struct {
	// Kafka-related configurations
	KafkaConfigs    kafkaservice.KafkaConfigs
	KafkaTopics     []string
	ConsumeInterval time.Duration

	// MongoDB-related configurations
	MongoServer   string
	TopicCallback string
	Database      string
}