Versions in this module Expand all Collapse all v0 v0.1.0 Aug 6, 2024 Changes in this version + var File_queue_kafka_conf_proto protoreflect.FileDescriptor + func MustNewQueue(c *Conf, handler ConsumeHandler) (queue.MessageQueue, error) + func WrapError(err error) error + type Conf struct + Acks int32 + Brokers []string + Group string + Net *Net + Offset string + ReadTimeout *durationpb.Duration + Topic string + WriteTimeout *durationpb.Duration + func (*Conf) Descriptor() ([]byte, []int) + func (*Conf) ProtoMessage() + func (m *Conf) Validate() error + func (m *Conf) ValidateAll() error + func (x *Conf) GetAcks() int32 + func (x *Conf) GetBrokers() []string + func (x *Conf) GetGroup() string + func (x *Conf) GetNet() *Net + func (x *Conf) GetOffset() string + func (x *Conf) GetReadTimeout() *durationpb.Duration + func (x *Conf) GetTopic() string + func (x *Conf) GetWriteTimeout() *durationpb.Duration + func (x *Conf) ProtoReflect() protoreflect.Message + func (x *Conf) Reset() + func (x *Conf) String() string + type ConfMultiError []error + func (m ConfMultiError) AllErrors() []error + func (m ConfMultiError) Error() string + type ConfValidationError struct + func (e ConfValidationError) Cause() error + func (e ConfValidationError) Error() string + func (e ConfValidationError) ErrorName() string + func (e ConfValidationError) Field() string + func (e ConfValidationError) Key() bool + func (e ConfValidationError) Reason() string + type ConsumeHandle func(ctx context.Context, topic string, key, message []byte) error + type ConsumeHandler interface + Consume func(ctx context.Context, topic string, key, message []byte) error + type Consumer struct + func NewConsumer(c *Conf, handler ConsumeHandler) (*Consumer, error) + func (c *Consumer) Start(context.Context) error + func (c *Consumer) Stop(context.Context) error + type Consumers struct + func NewQueue(c *Conf, handler ConsumeHandler) (*Consumers, error) + func (q Consumers) Name() string + func (q Consumers) Start(ctx context.Context) error + func (q Consumers) Stop(ctx context.Context) error + type KafkaMessageTextMapCarrier struct + func (carrier *KafkaMessageTextMapCarrier) Get(key string) string + func (carrier *KafkaMessageTextMapCarrier) Keys() []string + func (carrier *KafkaMessageTextMapCarrier) Set(key string, value string) + type Net struct + Sasl *Net_SASL + Tls *Net_TLS + func (*Net) Descriptor() ([]byte, []int) + func (*Net) ProtoMessage() + func (m *Net) Validate() error + func (m *Net) ValidateAll() error + func (x *Net) GetSasl() *Net_SASL + func (x *Net) GetTls() *Net_TLS + func (x *Net) ProtoReflect() protoreflect.Message + func (x *Net) Reset() + func (x *Net) String() string + type NetMultiError []error + func (m NetMultiError) AllErrors() []error + func (m NetMultiError) Error() string + type NetValidationError struct + func (e NetValidationError) Cause() error + func (e NetValidationError) Error() string + func (e NetValidationError) ErrorName() string + func (e NetValidationError) Field() string + func (e NetValidationError) Key() bool + func (e NetValidationError) Reason() string + type Net_SASL struct + Enable bool + Password string + User string + func (*Net_SASL) Descriptor() ([]byte, []int) + func (*Net_SASL) ProtoMessage() + func (m *Net_SASL) Validate() error + func (m *Net_SASL) ValidateAll() error + func (x *Net_SASL) GetEnable() bool + func (x *Net_SASL) GetPassword() string + func (x *Net_SASL) GetUser() string + func (x *Net_SASL) ProtoReflect() protoreflect.Message + func (x *Net_SASL) Reset() + func (x *Net_SASL) String() string + type Net_SASLMultiError []error + func (m Net_SASLMultiError) AllErrors() []error + func (m Net_SASLMultiError) Error() string + type Net_SASLValidationError struct + func (e Net_SASLValidationError) Cause() error + func (e Net_SASLValidationError) Error() string + func (e Net_SASLValidationError) ErrorName() string + func (e Net_SASLValidationError) Field() string + func (e Net_SASLValidationError) Key() bool + func (e Net_SASLValidationError) Reason() string + type Net_TLS struct + Cert string + Enable bool + Jks string + Key string + Password string + RootCa string + func (*Net_TLS) Descriptor() ([]byte, []int) + func (*Net_TLS) ProtoMessage() + func (m *Net_TLS) Validate() error + func (m *Net_TLS) ValidateAll() error + func (x *Net_TLS) GetCert() string + func (x *Net_TLS) GetEnable() bool + func (x *Net_TLS) GetJks() string + func (x *Net_TLS) GetKey() string + func (x *Net_TLS) GetPassword() string + func (x *Net_TLS) GetRootCa() string + func (x *Net_TLS) ProtoReflect() protoreflect.Message + func (x *Net_TLS) Reset() + func (x *Net_TLS) String() string + type Net_TLSMultiError []error + func (m Net_TLSMultiError) AllErrors() []error + func (m Net_TLSMultiError) Error() string + type Net_TLSValidationError struct + func (e Net_TLSValidationError) Cause() error + func (e Net_TLSValidationError) Error() string + func (e Net_TLSValidationError) ErrorName() string + func (e Net_TLSValidationError) Field() string + func (e Net_TLSValidationError) Key() bool + func (e Net_TLSValidationError) Reason() string + type Producer struct + func NewProducer(c *Conf) (*Producer, error) + func (p *Producer) Close() error + func (p *Producer) Name() string + func (p *Producer) Push(ctx context.Context, topic string, key, value []byte) error + type Pusher interface + Close func() error + Name func() string + Push func(ctx context.Context, topic string, key, value []byte) error + func MustNewProducer(c *Conf) Pusher