Versions in this module Expand all Collapse all v0 v0.0.1 Dec 7, 2021 Changes in this version + const BalanceStrategyRange + const BalanceStrategyRoundRobin + const BalanceStrategySticky + const EventTypeConsumeMsgError + const EventTypeConsumeMsgSuccess + const EventTypeConsumerGroupError + const EventTypeInitialError + const EventTypeProduceMsgError + const EventTypeProduceMsgSuccess + const EventTypeTranceMessageError + const PartitionStrategyHash + const PartitionStrategyManual + const PartitionStrategyRandom + const PartitionStrategyRoundRobin + func Consume(cfg *Config, tranceConfig TranceConsumerConfigFunc, doCommit bool, ...) (<-chan string, error) + func DefaultConfig() *sarama.Config + func ParseBalanceStrategy(strategy BalanceStrategy) sarama.BalanceStrategy + func ParseKafkaVersion(version string) sarama.KafkaVersion + func ParsePartitionStrategy(strategy PartitionStrategy) sarama.PartitionerConstructor + func Produce(cfg *Config, tranceConfig TranceProducerConfigFunc, hook EventHook) (chan<- string, error) + type BalanceStrategy string + type Config struct + Name MessageType + Params map[string]string + func (c *Config) CheckParams(defaults map[string]string, requires []string) error + func (c *Config) Get(key, dftValue string) string + func (c *Config) SetKey(key, value string) + type ConfigSlice []*Config + func (s ConfigSlice) FindConfig(name MessageType) *Config + type EventHook func(EventType, interface{}, error) + type EventType int + type Message map[string]interface + type MessageType string + type MissingConfigError struct + Name string + func (m MissingConfigError) Error() string + type PartitionStrategy string + type TranceConsumerConfigFunc func(configs *Config) (string, string, []string, *sarama.Config, error) + type TranceProducerConfigFunc func(configs *Config) (string, []string, *sarama.Config, error)