Documentation ¶
Index ¶
- Constants
- Variables
- func ChangeSchema(newKeys *sync.Map, dbname, table string, dims []*model2.ColumnWithType) ([]*model2.ColumnWithType, error)
- func ClearDimsCacheByKey(key string)
- func ClearDimsCacheByRedis(key string)
- func ClearDimsCacheByTime(clearTime time.Duration)
- func ClearDimsCacheByTimeBylocal(clearTime time.Duration)
- func GetClusterSql() string
- func GetDims(database, table string, excludedColumns []string, conn *sqlx.DB, ...) (dims []*model2.ColumnWithType, err error)
- func GetDimsCachekey(database, table string) string
- func GetMergeTree(tableName string) string
- func GetReplacingMergeTree(tableName, ext string) string
- func GetSaramaConfig(kfkCfg model.KafkaCfg) (sarCfg *sarama.Config, err error)
- func GetSourceName(name string) (sourcename string)
- type KafkaSarama
- func NewKafkaSarama() *KafkaSarama
- func (k *KafkaSarama) Clone() *KafkaSarama
- func (k *KafkaSarama) CommitMessages(msg *model.InputMessage) error
- func (k *KafkaSarama) Description() string
- func (k *KafkaSarama) Init(cfg model.KafkaCfg, topicName, consumerGroup string, putFn func(msg model.InputMessage, markFn func()), cleanupFn func()) (err error)
- func (k *KafkaSarama) Run()
- func (k *KafkaSarama) Stop() error
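- type MyConsumerGroupHandler
- func (h MyConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error
- func (h MyConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (h MyConsumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error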
Constants ¶
View Source
const DimsHash = "dimsHash_"
Variables ¶
View Source
var (
ErrTblNotExist = errors.Errorf("table doesn't exist")
)
Functions ¶
func ChangeSchema ¶
func ChangeSchema(newKeys *sync.Map, dbname, table string, dims []*model2.ColumnWithType) ([]*model2.ColumnWithType, error)
func ClearDimsCacheByKey ¶
func ClearDimsCacheByKey(key string)
func ClearDimsCacheByRedis ¶
func ClearDimsCacheByRedis(key string)
func ClearDimsCacheByTime ¶
func ClearDimsCacheByTime(clearTime time.Duration)
func GetClusterSql ¶
func GetClusterSql() string
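func GetDimsCachekey ¶
func GetDimsCachekey(database, table string) string
func GetMergeTree ¶
func GetMergeTree(tableName string) string
func GetReplacingMergeTree ¶
func GetReplacingMergeTree(tableName, ext string) string
func GetSaramaConfig ¶
func GetSaramaConfig(kfkCfg model.KafkaCfg) (sarCfg *sarama.Config, err error)
func GetSourceName ¶
func GetSourceName(name string) (sourcename string)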
Types ¶
type KafkaSarama ¶
type KafkaSarama struct {
// contains filtered or unexported fields
}
func NewKafkaSarama ¶
func NewKafkaSarama() *KafkaSarama
func (*KafkaSarama) Clone ¶
func (k *KafkaSarama) Clone() *KafkaSarama
func (*KafkaSarama) CommitMessages ¶
func (k *KafkaSarama) CommitMessages(msg *model.InputMessage) error
func (*KafkaSarama) Description ¶
func (k *KafkaSarama) Description() string
func (*KafkaSarama) Init ¶
func (k *KafkaSarama) Init(cfg model.KafkaCfg, topicName, consumerGroup string, putFn func(msg model.InputMessage, markFn func()), cleanupFn func()) (err error)
func (*KafkaSarama) Run ¶
func (k *KafkaSarama) Run()
func (*KafkaSarama) Stop ¶
func (k *KafkaSarama) Stop() error
type MyConsumerGroupHandler ¶
type MyConsumerGroupHandler struct {
// contains filtered or unexported fields
}
func (MyConsumerGroupHandler) Cleanup ¶
func (h MyConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error
func (MyConsumerGroupHandler) ConsumeClaim ¶
func (h MyConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (MyConsumerGroupHandler) Setup ¶
func (h MyConsumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error
Click to show internal directories.
Click to hide internal directories.