Documentation ¶
Index ¶
- Constants
- Variables
- type Inputer
- type KafkaGo
- func (k *KafkaGo) CommitMessages(ctx context.Context, msg *model.InputMessage) (err error)
- func (k *KafkaGo) Description() string
- func (k *KafkaGo) Init(cfg *config.Config, taskName string, putFn func(msg model.InputMessage)) (err error)
- func (k *KafkaGo) Run(ctx context.Context)
- func (k *KafkaGo) Stop() error
- type KafkaSarama
- func (k *KafkaSarama) CommitMessages(ctx context.Context, msg *model.InputMessage) error
- func (k *KafkaSarama) Description() string
- func (k *KafkaSarama) Init(cfg *config.Config, taskName string, putFn func(msg model.InputMessage)) (err error)
- func (k *KafkaSarama) Run(ctx context.Context)
- func (k *KafkaSarama) Stop() error
- type MyConsumerGroupHandler
- type XDGSCRAMClient
Constants ¶
View Source
const ( TypeKafkaGo = "kafka-go" TypeKafkaSarama = "sarama" TypePulsar = "pulsar" )
Variables ¶
View Source
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
View Source
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
Functions ¶
This section is empty.
Types ¶
type Inputer ¶ added in v1.5.2
type Inputer interface { Init(cfg *config.Config, taskName string, putFn func(msg model.InputMessage)) error Run(ctx context.Context) Stop() error CommitMessages(ctx context.Context, message *model.InputMessage) error }
func NewInputer ¶ added in v1.5.2
type KafkaGo ¶ added in v1.5.2
type KafkaGo struct {
// contains filtered or unexported fields
}
KafkaGo implements input.Inputer
func NewKafkaGo ¶ added in v1.5.2
func NewKafkaGo() *KafkaGo
NewKafkaGo returns an instance of the Kafka reader.
func (*KafkaGo) CommitMessages ¶ added in v1.5.2
func (*KafkaGo) Description ¶ added in v1.5.2
Description returns a description of this Kafka consumer: which topic it reads from.
type KafkaSarama ¶ added in v1.5.2
type KafkaSarama struct {
// contains filtered or unexported fields
}
KafkaSarama implements input.Inputer
func NewKafkaSarama ¶ added in v1.5.2
func NewKafkaSarama() *KafkaSarama
NewKafkaSarama returns an instance of the Kafka reader.
func (*KafkaSarama) CommitMessages ¶ added in v1.5.2
func (k *KafkaSarama) CommitMessages(ctx context.Context, msg *model.InputMessage) error
func (*KafkaSarama) Description ¶ added in v1.5.2
func (k *KafkaSarama) Description() string
Description returns a description of this Kafka consumer: which topic it reads from.
func (*KafkaSarama) Init ¶ added in v1.5.2
func (k *KafkaSarama) Init(cfg *config.Config, taskName string, putFn func(msg model.InputMessage)) (err error)
Init initialises the Kafka instance with the given configuration.
func (*KafkaSarama) Run ¶ added in v1.5.2
func (k *KafkaSarama) Run(ctx context.Context)
Run is the Kafka main loop.
func (*KafkaSarama) Stop ¶ added in v1.5.2
func (k *KafkaSarama) Stop() error
Stop stops the Kafka consumer and closes all connections.
type MyConsumerGroupHandler ¶ added in v1.5.2
type MyConsumerGroupHandler struct {
// contains filtered or unexported fields
}
func (MyConsumerGroupHandler) Cleanup ¶ added in v1.5.2
func (h MyConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error
func (MyConsumerGroupHandler) ConsumeClaim ¶ added in v1.5.2
func (h MyConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (MyConsumerGroupHandler) Setup ¶ added in v1.5.2
func (h MyConsumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error
type XDGSCRAMClient ¶ added in v1.5.2
type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn }
func (*XDGSCRAMClient) Begin ¶ added in v1.5.2
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶ added in v1.5.2
func (x *XDGSCRAMClient) Done() bool
Click to show internal directories.
Click to hide internal directories.