msgq

package
v0.0.0-...-360c817 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 2, 2018 License: Apache-2.0 Imports: 26 Imported by: 5

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// Package-level error values.
	// NOTE(review): only the first two messages carry the "msgq: " prefix, and
	// the ErrAllPartitionUsed message ends with a trailing space.
	ErrAllPartitionUsed     = errors.New("msgq: All partitions used by all the msgq instances ")
	ErrNoConfigSpecfied     = errors.New("msgq: No message library configuration is provided")
	ErrNoMetadata           = errors.New("No Metadata")
	ErrInvalidPartitioner   = errors.New("Invalid partition handler")
	ErrInvalidPartition     = errors.New("Invalid value for a partition number")
	ErrExcludeListPartition = errors.New("Writing to a partition that is blacklisted")
	ErrMalformedMessage     = errors.New("Producer object not found in message")
	ErrNullKey              = errors.New("Null Key for a HashPartitioner")

	// Buffering limits: a duration and a byte count.
	// NOTE(review): presumably producer batching/flush limits; where they are
	// consumed is not visible here — confirm.
	MaxBufferTime    = 100 * time.Millisecond
	MaxBufferedBytes = 10485760 // 10 MiB (10 * 1024 * 1024 bytes)

	// ConsumerPartitionZkPath is the format string for the Zk path created by
	// each consumer to indicate that a partition is in use. Formatted with the
	// service name and topic, it has the form
	// "/msgq/<service_name>/<topic>/consumer/partitions/inuse".
	// NOTE(review): unlike ConsumerOffsetInfoZkPath, this format has no "%d"
	// verb for the partition index; presumably the caller appends "/<index>"
	// — confirm.
	ConsumerPartitionZkPath = "/msgq/%s/%s/consumer/partitions/inuse"

	// ConsumerOffsetInfoZkPath is the format string for the Zk path created by
	// each consumer to record the offset up to which messages have been
	// processed. This offset is also referred to as the CommitOffset.
	// The path has the form
	// "/msgq/<service_name>/<topic>/consumer/partitions/offsetinfo/<index>".
	ConsumerOffsetInfoZkPath = "/msgq/%s/%s/consumer/partitions/offsetinfo/%d"

	// ConsumerLockZkPath is the format string for the Zk path of the lock used
	// by consumers while getting a partition. The path has the form
	// "/msgq/<service_name>/<topic>/consumer/partitions/lock".
	ConsumerLockZkPath = "/msgq/%s/%s/consumer/partitions/lock"

	// ConsumerPartitionsExcludeListZkPath is the format string for the Zk path
	// whose children indicate the list of partitions that should not be used
	// by a consumer. The path has the form
	// "/msgq/<service_name>/<topic>/consumer/partitions/blacklist".
	ConsumerPartitionsExcludeListZkPath = "/msgq/%s/%s/consumer/partitions/blacklist"

	// ProducerPartitionsExcludeListZkPath is the format string for the Zk path
	// whose children indicate the list of partition numbers that have been
	// blacklisted, i.e. the partitions to which the producer will not write.
	// The path has the form "/msgq/<topic>/producer/partitions/blacklist".
	ProducerPartitionsExcludeListZkPath = "/msgq/%s/producer/partitions/blacklist"
)
View Source
var WaitForKafkaAndZkServers = config.WaitForKafkaAndZkServers

WaitForKafkaAndZkServers is a global variable that lets test programs disable the wait for the Kafka and ZooKeeper servers in the Init function.

Functions

func CommitOffset

func CommitOffset(Zk *zklib.ZK, node string, offset int64) (err error)

CommitOffset commits an offset to a service/topic/partition.

func IsTopicAvailMonFeatureEnabled

func IsTopicAvailMonFeatureEnabled() bool

Types

type AppEventT

// AppEventT is the integer event type carried in TopicAvailEvent.EventType.
type AppEventT int // TODO: Check for type aliasing
// AppEventT values, assigned by iota: NOTIF_UP = 0, NOTIF_DOWN = 1,
// NOTIF_WARN = 2.
const (
	NOTIF_UP AppEventT = iota
	NOTIF_DOWN
	NOTIF_WARN
)

type ConnStatus

// ConnStatus is an integer enumeration.
// NOTE(review): presumably a connection status; no use of this type is
// visible in this listing — confirm.
type ConnStatus int
// ConnStatus values, assigned by iota: OK = 0, BROKEN = 1, WARNING = 2.
const (
	OK ConnStatus = iota
	BROKEN
	WARNING
)

type Consumer

// Consumer groups the state of a topic consumer: the Sarama consumer handles,
// the topic name and partition number, offset bookkeeping, the ZooKeeper
// connection and paths, and stats counters. The methods declared on *Consumer
// include every method of the MsgQConsumer interface.
type Consumer struct {
	SaramaConsumer         sarama.PartitionConsumer
	BaseConsumer           sarama.Consumer
	TopicName              string
	PartitionNum           int32
	OffsetData             offsetData
	CommitOffset           int64
	// Channel of events for the application; same type as returned by Receive.
	ReceiveChan            chan *ConsumerEvent
	ZkCon                  *zklib.ZK
	// ZooKeeper paths. NOTE(review): presumably built from the Consumer*ZkPath
	// format strings of this package — confirm.
	PartitionZkPath        string
	OffsetInfoZkPath       string
	LockZkPath             string
	ExcludeListZkPath      string
	NumMsgsReceivedCounter *stats.Counter // Counter that stores the actual value of a partition of a topic (NOTE(review): wording mirrors Producer.PartitionCounters; presumably counts received messages — confirm)
	NumMsgsCommitedCounter *stats.Counter // Counter that stores the actual number of messages that are ACKed/processed
	NumRecvErrCounter      *stats.Counter // Counter that stores the number of messages that got an error
	ErrOffsetNotFound      *stats.Counter // Counter for offsets not found in the offset List/Map (a counter, not an error value, despite the Err prefix)
	OffsetKafka            *stats.Set
	OffsetZkp              *stats.Set
	NumZkCommitFailCounter *stats.Counter // Counter that stores the number of messages that got an error while committing the ack to ZooKeeper
	Encryption             bool
	// NOTE(review): presumably a flag set when a close has been requested —
	// confirm against Close.
	ReqForClose            int32
	// contains filtered or unexported fields
}

func (*Consumer) Close

func (cons *Consumer) Close() error

func (*Consumer) DecryptMsg

func (cons *Consumer) DecryptMsg(msg []byte) ([]byte, error)

func (*Consumer) GetOffset

func (cons *Consumer) GetOffset() (offset int64, err error)

This API gets the Ack offset stored in ZooKeeper. The purpose of setting this AckOffset is to allow the library to restart from the given offset.

func (*Consumer) GetPartitionNum

func (cons *Consumer) GetPartitionNum() int32

func (*Consumer) OffsetCommitRoutine

func (cons *Consumer) OffsetCommitRoutine()

func (*Consumer) ProcessReceivedError

func (cons *Consumer) ProcessReceivedError(ok bool, msg *sarama.ConsumerError) bool

func (*Consumer) ProcessReceivedMessage

func (cons *Consumer) ProcessReceivedMessage(ok bool, msg *sarama.ConsumerMessage) bool

func (*Consumer) Receive

func (cons *Consumer) Receive() chan *ConsumerEvent

func (*Consumer) ReceiveRoutine

func (cons *Consumer) ReceiveRoutine()

This routine waits for errors or messages from Sarama and writes them to a channel that the application must read from. When Sarama encounters an invalid offset error, it closes the message and error channels; this routine handles that by restarting the consumer from the oldest known offset. The function exits with a panic if it fails to create a new consumer with the oldest offset.

func (*Consumer) SetAckOffset

func (cons *Consumer) SetAckOffset(offset int64) (err error)

SetAckOffset lets the application mark a message as processed by removing it from the list.

func (*Consumer) UpdateOffsetStats

func (cons *Consumer) UpdateOffsetStats(interval int)

type ConsumerConfig

// ConsumerConfig is the configuration passed to NewConsumer and
// NewConsumerWithRetry.
type ConsumerConfig struct {
	// Name of the topic to consume.
	TopicName  string
	// NOTE(review): presumably enables message decryption (see
	// Consumer.DecryptMsg) — confirm.
	Encryption bool
}

type ConsumerEvent

// ConsumerEvent is the event type delivered on the channel returned by
// Receive.
type ConsumerEvent struct {
	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
	// NOTE(review): presumably non-nil when the event reports a receive error
	// rather than a message — confirm.
	Err        error
}

Event that will be returned to the consumer

type MessageQueue

// MessageQueue is the interface returned by NewMsgQ and
// NewMsgQWithStatsOptional. *MsgQ declares all of these methods; note that
// (*MsgQ).Shutdown is not part of this interface.
type MessageQueue interface {
	NewProducer(prodConf *ProducerConfig) (MsgQProducer, error)
	// Like NewProducer, but retries indefinitely with an exponential back-off
	// until the operation succeeds.
	NewProducerWithRetry(prodConf *ProducerConfig) (MsgQProducer, error)
	NewConsumer(consConf *ConsumerConfig) (MsgQConsumer, error)
	// Like NewConsumer, but retries indefinitely with an exponential back-off
	// until the operation succeeds.
	NewConsumerWithRetry(consConf *ConsumerConfig) (MsgQConsumer, error)
	// NOTE(review): presumably returns the number of partitions of topicName
	// — confirm.
	Partitions(topicName string) (int, error)
	// Returns a channel of *TopicAvailEvent values.
	ReceiveTopicAvailMonEvt() chan *TopicAvailEvent
}

func NewMsgQ

func NewMsgQ(svcName string, kafkaBrokers []string, zkBrokers []string) (MessageQueue, error)

func NewMsgQWithStatsOptional

func NewMsgQWithStatsOptional(svcName string, kafkaBrokers []string, zkBrokers []string, disableStats bool, cacertfiles []string, bSkipVerify bool) (MessageQueue, error)

type MsgQ

// MsgQ holds the Kafka client, the ZooKeeper connection, and the lists of
// producers and consumers. Its methods use pointer receivers; the struct
// embeds sync.Mutex fields and so must not be copied.
type MsgQ struct {
	ServiceName      string
	Client           sarama.Client
	Zk               *zklib.ZK
	// Note: Producers holds pointers while Consumers holds Consumer values.
	Producers        []*Producer
	Consumers        []Consumer
	KafkaBroker      string
	// NOTE(review): presumably guards KafkaBroker — confirm.
	KafkaBrokerMutex sync.Mutex
	ZkBroker         string
	// NOTE(review): presumably guards ZkBroker — confirm.
	ZkBrokerMutex    sync.Mutex
	// Same channel type as returned by ReceiveTopicAvailMonEvt.
	TopicAvailChan   chan *TopicAvailEvent
}

MsgQ holds the Kafka client, the ZooKeeper connection, and the lists of producers and consumers.

func (*MsgQ) NewConsumer

func (msgQ *MsgQ) NewConsumer(consConf *ConsumerConfig) (MsgQConsumer, error)

func (*MsgQ) NewConsumerWithRetry

func (msgQ *MsgQ) NewConsumerWithRetry(consConf *ConsumerConfig) (MsgQConsumer, error)

NewConsumerWithRetry creates a msgq consumer. In case of failures, it performs infinite retries with an exponential back-off mechanism for the retry interval till the operation succeeds

func (*MsgQ) NewProducer

func (msgQ *MsgQ) NewProducer(prodConf *ProducerConfig) (MsgQProducer, error)

func (*MsgQ) NewProducerWithRetry

func (msgQ *MsgQ) NewProducerWithRetry(prodConf *ProducerConfig) (MsgQProducer, error)

NewProducerWithRetry creates a msgq producer. In case of failures, it performs infinite retries with an exponential back-off mechanism for the retry interval till the operation succeeds

func (*MsgQ) Partitions

func (msgQ *MsgQ) Partitions(topicName string) (int, error)

func (*MsgQ) ReceiveTopicAvailMonEvt

func (msgQ *MsgQ) ReceiveTopicAvailMonEvt() chan *TopicAvailEvent

func (*MsgQ) Shutdown

func (msgQ *MsgQ) Shutdown() error

type MsgQConsumer

// MsgQConsumer is the consumer interface returned by NewConsumer and
// NewConsumerWithRetry. *Consumer declares all of these methods.
type MsgQConsumer interface {
	// Returns the channel of *ConsumerEvent values for the application to read.
	Receive() chan *ConsumerEvent
	// The application marks a message as processed by removing it from the list.
	SetAckOffset(ofs int64) error
	GetPartitionNum() int32
	Close() error
}

type MsgQProducer

// MsgQProducer is the producer interface returned by NewProducer and
// NewProducerWithRetry. *Producer declares all of these methods.
type MsgQProducer interface {
	Send(msg []byte) error
	SendMsg(msg *sarama.ProducerMessage) error
	// NOTE(review): presumably the key drives HashPartitioner partition
	// selection (see ErrNullKey) — confirm.
	SendKeyMsg(key, msg []byte) error
	// Sends a message to the given partition; works only when the partitioner
	// type is ManualPartitioner.
	SendToPartition(Partition int32, msg []byte) error
	// Returns a channel of *ProducerEvent values.
	ReceiveErrors() chan *ProducerEvent
	Close() error
}

type PartitionStatus

// PartitionStatus is the integer status type used by Producer.PartitionSt,
// TopicAvailEvent.PartStatus, and the Producer Get/SetPartitionStatus and
// GetTopicStatus methods.
type PartitionStatus int
// PartitionStatus values, assigned by iota: UP = 0, DOWN = 1, WARN = 2.
const (
	UP PartitionStatus = iota
	DOWN
	WARN
)

type PartitionerType

// PartitionerType selects how the partition for each message is chosen. It is
// set through ProducerConfig.Partitioner and stored in Producer.Partitioner.
type PartitionerType int
// PartitionerType values, assigned by iota starting at 0.
const (
	RoundRobinPartitioner PartitionerType = iota //The messages are sent to the partitions in round robin fashion
	HashPartitioner                              //32bit hash sum % num_Partitions is used to compute the partition number to which the message must be sent to
	RandomPartitioner                            //A random generated value % num_partitions is used to compute the partition index
	ManualPartitioner                            //With manual partitioner, the application must use Producer.SendToPartition API. The message will be sent to the partition that the application specifies.
)

type Producer

// Producer groups the state of a topic producer: the Sarama async producer,
// the topic name and partitioner type, the ZooKeeper connection and paths,
// the exclude-list data, and stats counters. The methods declared on
// *Producer include every method of the MsgQProducer interface.
type Producer struct {
	SaramaProducer              sarama.AsyncProducer
	TopicName                   string
	Partitioner                 PartitionerType
	// Same channel type as returned by ReceiveErrors.
	ReceiveChan                 chan *ProducerEvent
	NotifyError                 bool
	ZkCon                       *zklib.ZK
	NumPartitionsZkPath         string
	ExcludeListZkPath           string
	// NOTE(review): presumably the partition numbers currently blacklisted
	// for writing (see CheckPartitionBlackListed) — confirm.
	ExcludeListPartitions       []int
	// Receive-only channel of ZooKeeper events.
	ExcludeListPartitionChannel <-chan zk.Event
	PartitionCounters           []*stats.Counter // Counters that store the actual value of each partition of a topic
	NumMsgSentCounter           *stats.Counter   // Counter that stores the number of messages that are sent from the application
	NumMsgErrCounter            *stats.Counter   // Counter that stores the number of messages from the application that got an error
	AvgLatency                  *stats.Set       // Value that stores the average latency of a message
	AvgBatchNwLatency           *stats.Set       // Value that stores the average network round-trip transmission delay of a batch
	NonceCounter                uint64
	Encryption                  bool

	// NOTE(review): presumably indexed by partition number (see
	// GetPartitionStatus / SetPartitionStatus) — confirm.
	PartitionSt []PartitionStatus
	// contains filtered or unexported fields
}

func GetProducerRef

func GetProducerRef(msg *sarama.ProducerMessage) *Producer

func GetProducerRefByTopic

func GetProducerRefByTopic(prodList []*Producer, topic string) *Producer

func (*Producer) CheckPartitionBlackListed

func (prod *Producer) CheckPartitionBlackListed(partition int32) bool

func (*Producer) Close

func (prod *Producer) Close() error

func (*Producer) EncryptMsg

func (prod *Producer) EncryptMsg(msg []byte) ([]byte, error)

func (*Producer) GetAffectedPartList

func (prod *Producer) GetAffectedPartList(partition int32) []int32

func (*Producer) GetPartitionStatus

func (prod *Producer) GetPartitionStatus(partition int32) PartitionStatus

func (*Producer) GetTopicStatus

func (prod *Producer) GetTopicStatus() PartitionStatus

func (*Producer) ReceiveErrors

func (prod *Producer) ReceiveErrors() chan *ProducerEvent

func (*Producer) Send

func (prod *Producer) Send(msg []byte) error

func (*Producer) SendKeyMsg

func (prod *Producer) SendKeyMsg(key, msg []byte) error

func (*Producer) SendMsg

func (prod *Producer) SendMsg(msg *sarama.ProducerMessage) error

func (*Producer) SendToPartition

func (prod *Producer) SendToPartition(Partition int32, msg []byte) error

SendToPartition sends a message to the given partition. However, this works only when the partitioner type is ManualPartitioner.

func (*Producer) SetPartitionStatus

func (prod *Producer) SetPartitionStatus(partition int32, newStatus PartitionStatus) []PartitionStatus

func (*Producer) WatchForErrors

func (prod *Producer) WatchForErrors()

func (*Producer) WatchForSuccess

func (prod *Producer) WatchForSuccess()

func (*Producer) ZkPathMonitorRoutine

func (prod *Producer) ZkPathMonitorRoutine()

type ProducerConfig

// ProducerConfig is the configuration passed to NewProducer and
// NewProducerWithRetry.
type ProducerConfig struct {
	// Name of the topic to produce to.
	TopicName   string
	// One of RoundRobinPartitioner, HashPartitioner, RandomPartitioner or
	// ManualPartitioner.
	Partitioner PartitionerType
	// NOTE(review): presumably enables delivery of errors on the channel
	// returned by ReceiveErrors — confirm.
	NotifyError bool
	// NOTE(review): presumably enables message encryption (see
	// Producer.EncryptMsg) — confirm.
	Encryption  bool
}

type ProducerEvent

// ProducerEvent is the event type delivered on the channel returned by
// ReceiveErrors.
type ProducerEvent struct {
	// The Sarama producer message this event refers to.
	Msg *sarama.ProducerMessage
	// The error associated with Msg.
	Err error
}

Event that will be returned to the producer

type TopicAvailEvent

// TopicAvailEvent is the event type delivered on the channel returned by
// ReceiveTopicAvailMonEvt.
type TopicAvailEvent struct {
	TopicName  string
	EventType  AppEventT // Event type: one of NOTIF_UP, NOTIF_DOWN or NOTIF_WARN
	// NOTE(review): presumably one status per partition of the topic,
	// indexed by partition number — confirm.
	PartStatus []PartitionStatus
}

Event that will be delivered on the channel returned by ReceiveTopicAvailMonEvt

Directories

Path Synopsis
examples
consumerApp command
producerApp command
receive command
send command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL