Documentation
¶
Index ¶
- Variables
- func CommitOffset(Zk *zklib.ZK, node string, offset int64) (err error)
- func IsTopicAvailMonFeatureEnabled() bool
- type AppEventT
- type ConnStatus
- type Consumer
- func (cons *Consumer) Close() error
- func (cons *Consumer) DecryptMsg(msg []byte) ([]byte, error)
- func (cons *Consumer) GetOffset() (offset int64, err error)
- func (cons *Consumer) GetPartitionNum() int32
- func (cons *Consumer) OffsetCommitRoutine()
- func (cons *Consumer) ProcessReceivedError(ok bool, msg *sarama.ConsumerError) bool
- func (cons *Consumer) ProcessReceivedMessage(ok bool, msg *sarama.ConsumerMessage) bool
- func (cons *Consumer) Receive() chan *ConsumerEvent
- func (cons *Consumer) ReceiveRoutine()
- func (cons *Consumer) SetAckOffset(offset int64) (err error)
- func (cons *Consumer) UpdateOffsetStats(interval int)
- type ConsumerConfig
- type ConsumerEvent
- type MessageQueue
- type MsgQ
- func (msgQ *MsgQ) NewConsumer(consConf *ConsumerConfig) (MsgQConsumer, error)
- func (msgQ *MsgQ) NewConsumerWithRetry(consConf *ConsumerConfig) (MsgQConsumer, error)
- func (msgQ *MsgQ) NewProducer(prodConf *ProducerConfig) (MsgQProducer, error)
- func (msgQ *MsgQ) NewProducerWithRetry(prodConf *ProducerConfig) (MsgQProducer, error)
- func (msgQ *MsgQ) Partitions(topicName string) (int, error)
- func (msgQ *MsgQ) ReceiveTopicAvailMonEvt() chan *TopicAvailEvent
- func (msgQ *MsgQ) Shutdown() error
- type MsgQConsumer
- type MsgQProducer
- type PartitionStatus
- type PartitionerType
- type Producer
- func (prod *Producer) CheckPartitionBlackListed(partition int32) bool
- func (prod *Producer) Close() error
- func (prod *Producer) EncryptMsg(msg []byte) ([]byte, error)
- func (prod *Producer) GetAffectedPartList(partition int32) []int32
- func (prod *Producer) GetPartitionStatus(partition int32) PartitionStatus
- func (prod *Producer) GetTopicStatus() PartitionStatus
- func (prod *Producer) ReceiveErrors() chan *ProducerEvent
- func (prod *Producer) Send(msg []byte) error
- func (prod *Producer) SendKeyMsg(key, msg []byte) error
- func (prod *Producer) SendMsg(msg *sarama.ProducerMessage) error
- func (prod *Producer) SendToPartition(Partition int32, msg []byte) error
- func (prod *Producer) SetPartitionStatus(partition int32, newStatus PartitionStatus) []PartitionStatus
- func (prod *Producer) WatchForErrors()
- func (prod *Producer) WatchForSuccess()
- func (prod *Producer) ZkPathMonitorRoutine()
- type ProducerConfig
- type ProducerEvent
- type TopicAvailEvent
Constants ¶
This section is empty.
Variables ¶
var (
	ErrAllPartitionUsed = errors.New("msgq: All partitions used by all the msgq instances ")
	ErrNoConfigSpecfied = errors.New("msgq: No message library configuration is provided")
	ErrNoMetadata = errors.New("No Metadata")
	ErrInvalidPartitioner = errors.New("Invalid partition handler")
	ErrInvalidPartition = errors.New("Invalid value for a partition number")
	ErrExcludeListPartition = errors.New("Writing to a partition that is blacklisted")
	ErrMalformedMessage = errors.New("Producer object not found in message")
	ErrNullKey = errors.New("Null Key for a HashPartitioner")

	MaxBufferTime = 100 * time.Millisecond
	MaxBufferedBytes = 10485760 //10MB

	//Zk path created by each consumer to indicate that the partition is in use
	//This path is in the form "/msgq/<service_name>/<topic>/consumer/partitions/inuse/<index>"
	ConsumerPartitionZkPath = "/msgq/%s/%s/consumer/partitions/inuse"

	//Zk path created by each consumer to indicate the offset until which the messages
	//have been processed. This is also referred to as CommitOffset
	//This path is in the form "/msgq/<service_name>/<topic>/consumer/partitions/offsetinfo/<index>"
	ConsumerOffsetInfoZkPath = "/msgq/%s/%s/consumer/partitions/offsetinfo/%d"

	//Zk path for the lock used by consumers while getting a partition
	//This path is in the form "/msgq/<service_name>/<topic>/consumer/partitions/lock"
	ConsumerLockZkPath = "/msgq/%s/%s/consumer/partitions/lock"

	//Zk path where the children indicate the list of partitions which should
	//not be used by a consumer.
	//This path is in the form "/msgq/<service_name>/<topic>/consumer/partitions/blacklist"
	ConsumerPartitionsExcludeListZkPath = "/msgq/%s/%s/consumer/partitions/blacklist"

	//Zk path where the children indicate the list of partition numbers that
	//have been 'black listed' - i.e. these are the partitions to which the
	//producer will not write
	//This path is in the form "/msgq/<topic>/producer/partitions/blacklist"
	ProducerPartitionsExcludeListZkPath = "/msgq/%s/producer/partitions/blacklist"
)
var WaitForKafkaAndZkServers = config.WaitForKafkaAndZkServers
A global variable used to assist the test programs to disable the wait for Kafka and Zookeeper servers in the Init function.
Functions ¶
func CommitOffset ¶
CommitOffset commits an offset to a service/topic/partition.
func IsTopicAvailMonFeatureEnabled ¶
func IsTopicAvailMonFeatureEnabled() bool
Types ¶
type Consumer ¶
type Consumer struct {
SaramaConsumer sarama.PartitionConsumer
BaseConsumer sarama.Consumer
TopicName string
PartitionNum int32
OffsetData offsetData
CommitOffset int64
ReceiveChan chan *ConsumerEvent
ZkCon *zklib.ZK
PartitionZkPath string
OffsetInfoZkPath string
LockZkPath string
ExcludeListZkPath string
NumMsgsReceivedCounter *stats.Counter // Counter that stores the actual value of partition of a topic
NumMsgsCommitedCounter *stats.Counter // Counter that stores the actual number of messages that are ACKed/processed
NumRecvErrCounter *stats.Counter // Counter that stores the number of messages that got an error
ErrOffsetNotFound *stats.Counter // Counter for offset not found in the offset List/Map
OffsetKafka *stats.Set
OffsetZkp *stats.Set
NumZkCommitFailCounter *stats.Counter // Counter that stores the number of messages that got an error while committing the ack to zookeeper
Encryption bool
ReqForClose int32
// contains filtered or unexported fields
}
func (*Consumer) GetOffset ¶
This API gets the Ack offset stored in ZooKeeper (based on the ack). The purpose of setting this AckOffset is to allow the library to restart from the given offset.
func (*Consumer) GetPartitionNum ¶
func (*Consumer) OffsetCommitRoutine ¶
func (cons *Consumer) OffsetCommitRoutine()
func (*Consumer) ProcessReceivedError ¶
func (cons *Consumer) ProcessReceivedError(ok bool, msg *sarama.ConsumerError) bool
func (*Consumer) ProcessReceivedMessage ¶
func (cons *Consumer) ProcessReceivedMessage(ok bool, msg *sarama.ConsumerMessage) bool
func (*Consumer) Receive ¶
func (cons *Consumer) Receive() chan *ConsumerEvent
func (*Consumer) ReceiveRoutine ¶
func (cons *Consumer) ReceiveRoutine()
This routine waits for errors or messages from Sarama and writes them to a channel that the application must read from. When Sarama encounters an invalid-offset error, it closes the message and error channels; this routine handles that by restarting the consumer with the oldest known offset. The function exits with a panic if it fails to create a new consumer with the oldest offset.
func (*Consumer) SetAckOffset ¶
Application marks the message as processed by removing it from the List
func (*Consumer) UpdateOffsetStats ¶
type ConsumerConfig ¶
type ConsumerEvent ¶
Event that will be returned to the consumer
type MessageQueue ¶
type MessageQueue interface {
NewProducer(prodConf *ProducerConfig) (MsgQProducer, error)
NewProducerWithRetry(prodConf *ProducerConfig) (MsgQProducer, error)
NewConsumer(consConf *ConsumerConfig) (MsgQConsumer, error)
NewConsumerWithRetry(consConf *ConsumerConfig) (MsgQConsumer, error)
Partitions(topicName string) (int, error)
ReceiveTopicAvailMonEvt() chan *TopicAvailEvent
}
func NewMsgQ ¶
func NewMsgQ(svcName string, kafkaBrokers []string, zkBrokers []string) (MessageQueue, error)
type MsgQ ¶
type MsgQ struct {
ServiceName string
Client sarama.Client
Zk *zklib.ZK
Producers []*Producer
Consumers []Consumer
KafkaBroker string
KafkaBrokerMutex sync.Mutex
ZkBroker string
ZkBrokerMutex sync.Mutex
TopicAvailChan chan *TopicAvailEvent
}
MsgQ holds references to the Kafka client, the producers, and the consumers.
func (*MsgQ) NewConsumer ¶
func (msgQ *MsgQ) NewConsumer(consConf *ConsumerConfig) (MsgQConsumer, error)
func (*MsgQ) NewConsumerWithRetry ¶
func (msgQ *MsgQ) NewConsumerWithRetry(consConf *ConsumerConfig) (MsgQConsumer, error)
NewConsumerWithRetry creates a msgq consumer. In case of failures, it performs infinite retries with an exponential back-off mechanism for the retry interval till the operation succeeds
func (*MsgQ) NewProducer ¶
func (msgQ *MsgQ) NewProducer(prodConf *ProducerConfig) (MsgQProducer, error)
func (*MsgQ) NewProducerWithRetry ¶
func (msgQ *MsgQ) NewProducerWithRetry(prodConf *ProducerConfig) (MsgQProducer, error)
NewProducerWithRetry creates a msgq producer. In case of failures, it performs infinite retries with an exponential back-off mechanism for the retry interval till the operation succeeds
func (*MsgQ) ReceiveTopicAvailMonEvt ¶
func (msgQ *MsgQ) ReceiveTopicAvailMonEvt() chan *TopicAvailEvent
type MsgQConsumer ¶
type MsgQConsumer interface {
Receive() chan *ConsumerEvent
SetAckOffset(ofs int64) error
GetPartitionNum() int32
Close() error
}
type MsgQProducer ¶
type PartitionerType ¶
type PartitionerType int
const (
	RoundRobinPartitioner PartitionerType = iota //The messages are sent to the partitions in round robin fashion
	HashPartitioner //32bit hash sum % num_Partitions is used to compute the partition number to which the message must be sent
	RandomPartitioner //A random generated value % num_partitions is used to compute the partition index
	ManualPartitioner //With manual partitioner, the application must use Producer.SendToPartition API. The message will be sent to the partition that the application specifies.
)
type Producer ¶
type Producer struct {
SaramaProducer sarama.AsyncProducer
TopicName string
Partitioner PartitionerType
ReceiveChan chan *ProducerEvent
NotifyError bool
ZkCon *zklib.ZK
NumPartitionsZkPath string
ExcludeListZkPath string
ExcludeListPartitions []int
ExcludeListPartitionChannel <-chan zk.Event
PartitionCounters []*stats.Counter // Counters that store the actual value of partition of a topic
NumMsgSentCounter *stats.Counter // Counter that stores the number of messages that are sent from the application
NumMsgErrCounter *stats.Counter // Counter that stores the number of messages that got an error from the application
AvgLatency *stats.Set // Value that stores the average latency of a message
AvgBatchNwLatency *stats.Set // Value that stores the average latency of the network round-trip transmission delay of a batch
NonceCounter uint64
Encryption bool
PartitionSt []PartitionStatus
// contains filtered or unexported fields
}
func GetProducerRef ¶
func GetProducerRef(msg *sarama.ProducerMessage) *Producer
func GetProducerRefByTopic ¶
func (*Producer) CheckPartitionBlackListed ¶
func (*Producer) GetAffectedPartList ¶
func (*Producer) GetPartitionStatus ¶
func (prod *Producer) GetPartitionStatus(partition int32) PartitionStatus
func (*Producer) GetTopicStatus ¶
func (prod *Producer) GetTopicStatus() PartitionStatus
func (*Producer) ReceiveErrors ¶
func (prod *Producer) ReceiveErrors() chan *ProducerEvent
func (*Producer) SendKeyMsg ¶
func (*Producer) SendToPartition ¶
This function can be used to send a message to a given partition. However,
this works only when the partitioner type is ManualPartitioner.
func (*Producer) SetPartitionStatus ¶
func (prod *Producer) SetPartitionStatus(partition int32, newStatus PartitionStatus) []PartitionStatus
func (*Producer) WatchForErrors ¶
func (prod *Producer) WatchForErrors()
func (*Producer) WatchForSuccess ¶
func (prod *Producer) WatchForSuccess()
func (*Producer) ZkPathMonitorRoutine ¶
func (prod *Producer) ZkPathMonitorRoutine()
type ProducerConfig ¶
type ProducerConfig struct {
TopicName string
Partitioner PartitionerType
NotifyError bool
Encryption bool
}
type ProducerEvent ¶
type ProducerEvent struct {
Msg *sarama.ProducerMessage
Err error
}
Event that will be returned to the producer
type TopicAvailEvent ¶
type TopicAvailEvent struct {
TopicName string
EventType AppEventT // Event type is UP or DOWN
PartStatus []PartitionStatus
}
Event that will be returned to the producer
Directories
¶
| Path | Synopsis |
|---|---|
| examples | |
| consumerApp (command) | |
| producerApp (command) | |
| receive (command) | |
| send (command) | |