kafka

package
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 8, 2023 License: GPL-3.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var File_kafkapacket_proto protoreflect.FileDescriptor

Functions

func ConvertKafkaPacketToMQConsumerMessage added in v0.2.5

func ConvertKafkaPacketToMQConsumerMessage(packet *KafkaPacket) mqenv.MQConsumerMessage

ConvertKafkaPacketToMQConsumerMessage 把接收到的kafkaPacket 数据转换成MQConsumerMessage.

func StopKafka added in v0.2.6

func StopKafka(mqConnName string) error

停止kafka

Types

type Base added in v0.2.5

type Base struct {
	Partition          int                                   // partition 分区
	Config             map[string]interface{}                // kafka 的配置字典
	CompletionCallback func(messages []k.Message, err error) // 发送状态通知函数
}

Base .

func (*Base) ConfigHeartbeatInterval added in v0.2.5

func (b *Base) ConfigHeartbeatInterval(interval int)

ConfigHeartbeatInterval 配置心跳检测间隔.

func (*Base) ConfigKerberosKeyTab added in v0.2.5

func (b *Base) ConfigKerberosKeyTab(kerberosKeyTab string)

ConfigKerberosKeyTab 使用kerberos 认证需要配置.

func (*Base) ConfigKerberosPrincipal added in v0.2.5

func (b *Base) ConfigKerberosPrincipal(kerberosPrincipal string)

ConfigKerberosPrincipal 使用kerberos 认证需要配置.

func (*Base) ConfigKerberosServiceName added in v0.2.5

func (b *Base) ConfigKerberosServiceName(name string)

ConfigKerberosServiceName 使用kerberos 认证需要配置.

func (*Base) ConfigPartition added in v0.2.5

func (b *Base) ConfigPartition(partition int)

ConfigPartition 配置分区,如partition 为0.

func (*Base) ConfigReconnectInterval added in v0.2.5

func (b *Base) ConfigReconnectInterval(interval int)

ConfigReconnectInterval 配置断线重连的时间间隔,单位是毫秒.

func (*Base) ConfigSaslMechanisms added in v0.2.5

func (b *Base) ConfigSaslMechanisms(saslMechanisms string)

ConfigSaslMechanisms 使用plain 认证需要配置,可以使用PLAIN.

func (*Base) ConfigSaslPassword added in v0.2.5

func (b *Base) ConfigSaslPassword(saslPassword string)

ConfigSaslPassword 使用plain 认证需要配置.

func (*Base) ConfigSaslUserName added in v0.2.5

func (b *Base) ConfigSaslUserName(saslUsername string)

ConfigSaslUserName 使用plain 认证需要配置.

func (*Base) ConfigSecurityProtocol added in v0.2.5

func (b *Base) ConfigSecurityProtocol(securityProtocol string)

ConfigSecurityProtocol 使用plain 和kerberos 认证需要配置,如sasl_plaintext.

func (*Base) ConfigServers added in v0.2.5

func (b *Base) ConfigServers(servers string)

ConfigServers 配置连接的服务器,如"localhost:9092,localhost:9093".

func (*Base) ConfigSessionTimeout added in v0.2.5

func (b *Base) ConfigSessionTimeout(timeout int)

ConfigSessionTimeout 配置会话超时.

func (*Base) SetCompletionCallback added in v0.2.6

func (b *Base) SetCompletionCallback(callback func(messages []k.Message, err error))

SetCompletionCallback 消息发送状态通知回调

type CallBack added in v0.2.5

type CallBack func([]byte)

CallBack .回调函数

type Config

type Config struct {
	Hosts             string
	Partition         int
	PrivateTopic      string
	GroupID           string
	MaxPollIntervalMS int
	// 消息类型:
	//direct:组播,订阅同一个topic,消费者组会相同,一条消息只会被组内一个消费者接收
	//fanout:广播,订阅同一个topic,但是消费者组会使用uuid,所有组都会收到信息
	MessageType string `yaml:"messageType" json:"messageType"`
	// kerberos 认证需要配置
	KerberosServiceName string
	KerberosKeytab      string
	KerberosPrincipal   string
	// plain 认证需要配置
	SaslMechanisms     string
	SaslUsername       string
	SaslPassword       string
	UseOriginalContent bool `yaml:"useOriginalContent" json:"useOriginalContent"`
}

Config kafkav2 配置参数.

type Consumer added in v0.2.5

type Consumer struct {
	Base
	Readers map[string]*k.Reader // 每一个topic 一个reader

	Brokers    []string         // kafka 的节点
	OffsetDict map[string]int64 // 记录偏移量,避免在连接断开重连时候重复处理信息
	// contains filtered or unexported fields
}

Consumer 消费者.

func NewConsumer added in v0.2.5

func NewConsumer(hosts string, groupID string) *Consumer

NewConsumer 实例化返回消费者.

func (*Consumer) ConfigGroupID added in v0.2.5

func (c *Consumer) ConfigGroupID(groupID string)

ConfigGroupID 配置group id.

func (*Consumer) ConfigMaxPollIntervalMS added in v0.2.5

func (c *Consumer) ConfigMaxPollIntervalMS(interval int)

ConfigMaxPollIntervalMS 配置两次拉取数据之间的最大间隔.

func (*Consumer) Receive added in v0.2.5

func (c *Consumer) Receive(topic string, callback CallBack) error

Receive 订阅topic,处理消息. @title Receive @param topic 订阅的topic @param callback ,处理接收到的信息,入参是 接收到的[]byte

func (*Consumer) StopConsumer added in v0.2.5

func (c *Consumer) StopConsumer()

StopConsumer 停止消费.

type InstStats

type InstStats struct {
	Bytes         int64  `json:"bytes"`
	Dials         int64  `json:"connections"`
	Topic         string `json:"topic"`
	Messages      int64  `json:"messages"`
	Rebalances    int64  `json:"rebalances"`
	Errors        int64  `json:"errors"`
	Timeouts      int64  `json:"timeouts"`
	ClientID      string `json:"clientID"`
	QueueLength   int64  `json:"queueLength"`
	QueueCapacity int64  `json:"queueCapacity"`
}

InstStats .

type KafkaPacket added in v0.2.5

type KafkaPacket struct {

	// Properties
	ContentType     string                `protobuf:"bytes,1,opt,name=contentType,proto3" json:"contentType,omitempty"`         // MIME content type
	ContentEncoding string                `protobuf:"bytes,2,opt,name=contentEncoding,proto3" json:"contentEncoding,omitempty"` // MIME content encoding
	SendTo          string                `protobuf:"bytes,3,opt,name=sendTo,proto3" json:"sendTo,omitempty"`                   // application use - address to send to (ex: RPC)
	GroupId         string                `protobuf:"bytes,4,opt,name=groupId,proto3" json:"groupId,omitempty"`                 // application use - kafka group id
	CorrelationId   string                `protobuf:"bytes,5,opt,name=correlationId,proto3" json:"correlationId,omitempty"`     // application use - correlation identifier
	ReplyTo         string                `protobuf:"bytes,6,opt,name=replyTo,proto3" json:"replyTo,omitempty"`                 // application use - address to reply to (ex: RPC)
	MessageId       string                `protobuf:"bytes,7,opt,name=messageId,proto3" json:"messageId,omitempty"`             // application use - message identifier
	Timestamp       uint64                `protobuf:"varint,8,opt,name=timestamp,proto3" json:"timestamp,omitempty"`            // application use - message timestamp
	Type            string                `protobuf:"bytes,9,opt,name=type,proto3" json:"type,omitempty"`                       // application use - message type name
	UserId          string                `protobuf:"bytes,10,opt,name=userId,proto3" json:"userId,omitempty"`                  // application use - creating user - should be authenticated user
	AppId           string                `protobuf:"bytes,11,opt,name=appId,proto3" json:"appId,omitempty"`                    // application use - creating application id
	StatusCode      uint32                `protobuf:"varint,12,opt,name=statusCode,proto3" json:"statusCode,omitempty"`         // application response use - message response status
	ErrorMessage    string                `protobuf:"bytes,13,opt,name=errorMessage,proto3" json:"errorMessage,omitempty"`      // application response use - error message
	Headers         []*KafkaPacket_Header `protobuf:"bytes,14,rep,name=headers,proto3" json:"headers,omitempty"`                // Application or header exchange table
	Body            []byte                `protobuf:"bytes,15,opt,name=body,proto3" json:"body,omitempty"`
	RoutingKey      string                `protobuf:"bytes,16,opt,name=routingKey,proto3" json:"routingKey,omitempty"` // application use - delivery request
	ConsumerTag     string                `protobuf:"bytes,17,opt,name=consumerTag,proto3" json:"consumerTag,omitempty"`
	Exchange        string                `protobuf:"bytes,18,opt,name=exchange,proto3" json:"exchange,omitempty"`
	// contains filtered or unexported fields
}

func (*KafkaPacket) Descriptor deprecated added in v0.2.5

func (*KafkaPacket) Descriptor() ([]byte, []int)

Deprecated: Use KafkaPacket.ProtoReflect.Descriptor instead.

func (*KafkaPacket) GetAppId added in v0.2.5

func (x *KafkaPacket) GetAppId() string

func (*KafkaPacket) GetBody added in v0.2.5

func (x *KafkaPacket) GetBody() []byte

func (*KafkaPacket) GetConsumerTag added in v0.2.6

func (x *KafkaPacket) GetConsumerTag() string

func (*KafkaPacket) GetContentEncoding added in v0.2.5

func (x *KafkaPacket) GetContentEncoding() string

func (*KafkaPacket) GetContentType added in v0.2.5

func (x *KafkaPacket) GetContentType() string

func (*KafkaPacket) GetCorrelationId added in v0.2.5

func (x *KafkaPacket) GetCorrelationId() string

func (*KafkaPacket) GetErrorMessage added in v0.2.5

func (x *KafkaPacket) GetErrorMessage() string

func (*KafkaPacket) GetExchange added in v0.2.6

func (x *KafkaPacket) GetExchange() string

func (*KafkaPacket) GetGroupId added in v0.2.5

func (x *KafkaPacket) GetGroupId() string

func (*KafkaPacket) GetHeaders added in v0.2.5

func (x *KafkaPacket) GetHeaders() []*KafkaPacket_Header

func (*KafkaPacket) GetMessageId added in v0.2.5

func (x *KafkaPacket) GetMessageId() string

func (*KafkaPacket) GetReplyTo added in v0.2.5

func (x *KafkaPacket) GetReplyTo() string

func (*KafkaPacket) GetRoutingKey added in v0.2.6

func (x *KafkaPacket) GetRoutingKey() string

func (*KafkaPacket) GetSendTo added in v0.2.5

func (x *KafkaPacket) GetSendTo() string

func (*KafkaPacket) GetStatusCode added in v0.2.5

func (x *KafkaPacket) GetStatusCode() uint32

func (*KafkaPacket) GetTimestamp added in v0.2.5

func (x *KafkaPacket) GetTimestamp() uint64

func (*KafkaPacket) GetType added in v0.2.5

func (x *KafkaPacket) GetType() string

func (*KafkaPacket) GetUserId added in v0.2.5

func (x *KafkaPacket) GetUserId() string

func (*KafkaPacket) ProtoMessage added in v0.2.5

func (*KafkaPacket) ProtoMessage()

func (*KafkaPacket) ProtoReflect added in v0.2.5

func (x *KafkaPacket) ProtoReflect() protoreflect.Message

func (*KafkaPacket) Reset added in v0.2.5

func (x *KafkaPacket) Reset()

func (*KafkaPacket) String added in v0.2.5

func (x *KafkaPacket) String() string

type KafkaPacket_Header added in v0.2.5

type KafkaPacket_Header struct {
	Name  string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
	// contains filtered or unexported fields
}

func (*KafkaPacket_Header) Descriptor deprecated added in v0.2.5

func (*KafkaPacket_Header) Descriptor() ([]byte, []int)

Deprecated: Use KafkaPacket_Header.ProtoReflect.Descriptor instead.

func (*KafkaPacket_Header) GetName added in v0.2.5

func (x *KafkaPacket_Header) GetName() string

func (*KafkaPacket_Header) GetValue added in v0.2.5

func (x *KafkaPacket_Header) GetValue() string

func (*KafkaPacket_Header) ProtoMessage added in v0.2.5

func (*KafkaPacket_Header) ProtoMessage()

func (*KafkaPacket_Header) ProtoReflect added in v0.2.5

func (x *KafkaPacket_Header) ProtoReflect() protoreflect.Message

func (*KafkaPacket_Header) Reset added in v0.2.5

func (x *KafkaPacket_Header) Reset()

func (*KafkaPacket_Header) String added in v0.2.5

func (x *KafkaPacket_Header) String() string

type KafkaWorker added in v0.2.5

type KafkaWorker struct {
	Producer *Producer // 生产者
	Consumer *Consumer // 消费者

	PrivateTopic string // 私有topic,用于发出信息后收到回复

	ContentType        string //序列化类型,如json
	ContentEncoding    string // 编码格式
	GroupID            string //组id,会包含在 kafkapacket 数据包中
	MsgType            string // 消息类型
	Stats              Stats  // 统计信息
	UseOriginalContent bool   // 是否使用原始的方式序列化(使用json 序列化,而不是protobuf)
	// contains filtered or unexported fields
}

KafkaWorker 把生产者、消费者结合起来,实现请求响应模式.

func GetKafka

func GetKafka(mqConnName string) (*KafkaWorker, error)

GetKafka 获取kafka.

func InitKafka

func InitKafka(mqConnName string, config Config) (*KafkaWorker, error)

InitKafka 初始化kafka.

func NewKafkaWorker added in v0.2.5

func NewKafkaWorker(hosts string, partition int, privateTopic, groupID string) *KafkaWorker

NewKafkaWorker 实例化一个kafka worker.

func (*KafkaWorker) Send added in v0.2.5

func (worker *KafkaWorker) Send(topic string, publishMsg *mqenv.MQPublishMessage, needReply bool) (*mqenv.MQConsumerMessage, error)

Send 发送信息.

func (*KafkaWorker) Subscribe added in v0.2.5

func (worker *KafkaWorker) Subscribe(topic string, consumeProxy *mqenv.MQConsumerProxy) error

Subscribe 订阅topic.

type Producer added in v0.2.5

type Producer struct {
	Base
	Brokers []string // kafka 的节点
	Writer  map[string]*k.Writer
}

Producer 生产者.

func NewProducer added in v0.2.5

func NewProducer(hosts string, partition int) *Producer

NewProducer 返回一个生产者.

func (*Producer) Send added in v0.2.5

func (p *Producer) Send(topic string, value []byte) error

Send 发送一条消息.

type Stats

type Stats struct {
	Consumer InstStats `json:"consumer"`
	Producer InstStats `json:"producer"`
}

Stats struct

type Worker added in v0.2.5

type Worker func(*KafkaPacket) []byte

Worker 订阅topic 后处理收到信息的回调函数.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL