Documentation ¶
Index ¶
- func InitLogger(log logger.Logger)
- func ToProducerMessage(message *ProducerMessage) (msg *sarama.ProducerMessage)
- type ConsumerMessage
- type Handler
- type ProducerError
- type ProducerMessage
- type Reader
- type ReaderOpt
- type ReaderOpts
- type RecordHeader
- type RequiredAck
- type StartOffset
- type Writer
- type WriterOpt
- type WriterOpts
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func InitLogger ¶
func InitLogger(log logger.Logger)
func ToProducerMessage ¶
func ToProducerMessage(message *ProducerMessage) (msg *sarama.ProducerMessage)
Types ¶
type ConsumerMessage ¶
type ConsumerMessage struct {
	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64

	Headers        []*RecordHeader // only set if kafka is version 0.11+
	Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
	BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp
}
ConsumerMessage encapsulates a Kafka message returned by the consumer.
type Handler ¶
type Handler func(ctx context.Context, session sarama.ConsumerGroupSession, message *ConsumerMessage) error
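For illustration, a minimal handler satisfying this signature (the logging and the nil-return policy are assumptions, not part of this package; the context, log, and sarama imports are assumed):

	func printHandler(ctx context.Context, session sarama.ConsumerGroupSession, message *ConsumerMessage) error {
		// Honor cancellation of the surrounding consume loop.
		if err := ctx.Err(); err != nil {
			return err
		}
		log.Printf("topic=%s partition=%d offset=%d", message.Topic, message.Partition, message.Offset)
		// Returning nil is assumed to let consumption continue; a non-nil
		// error is assumed to be surfaced by the reader.
		return nil
	}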
type ProducerError ¶
type ProducerError struct {
	Msg *ProducerMessage
	Err error
}
ProducerError is the type of error generated when the producer fails to deliver a message. It contains the original ProducerMessage as well as the actual error value.
type ProducerMessage ¶
type ProducerMessage struct {
	// The Kafka topic for this message.
	Topic string

	// The partitioning key for this message.
	Key string

	// The actual message to store in Kafka.
	Value []byte

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field; it is only to be used for
	// pass-through data.
	Metadata interface{}

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64

	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32

	// Timestamp is the timestamp assigned to the message by the broker. This
	// is only guaranteed to be defined if the message was successfully
	// delivered, RequiredAcks is not NoResponse, and the Kafka broker is at
	// least version 0.10.0.
	Timestamp time.Time

	// MessageID
	MessageID string
}
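ToProducerMessage (above) converts this wrapper type into a native sarama message. A short sketch of building a message and converting it (the topic, key, and payload values are illustrative):

	msg := &ProducerMessage{
		Topic: "events",              // illustrative topic name
		Key:   "user-42",             // partitioning key (illustrative)
		Value: []byte(`{"ok":true}`), // payload bytes
	}
	saramaMsg := ToProducerMessage(msg)
	_ = saramaMsg // hand off to a sarama producer, or use Writer.SendMessage instead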
type Reader ¶
type ReaderOpt ¶
type ReaderOpt func(o *ReaderOpts)
func ReaderCommitInterval ¶
func ReaderLogger ¶
func ReaderServiceName ¶
func ReaderStartOffset ¶
func ReaderStartOffset(offset StartOffset) ReaderOpt
type ReaderOpts ¶
type ReaderOpts struct {
	ServiceName string

	// The list of broker addresses used to connect to the kafka cluster.
	Brokers []string

	// The topic to read messages from.
	Topic string

	// GroupID holds the optional consumer group id. If GroupID is specified,
	// then an explicit partition should NOT be specified.
	GroupID string

	// StartOffset determines from whence the consumer group should begin
	// consuming when it finds a partition without a committed offset. If
	// non-zero, it must be set to one of OffsetNewest or OffsetOldest.
	//
	// Default: OffsetOldest
	//
	// Only used when GroupID is set.
	StartOffset StartOffset

	// CommitInterval controls how often consumed offsets are committed.
	CommitInterval int

	Logger logger.Logger
}
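ReaderOpt values mutate a ReaderOpts struct, following the functional-options pattern. A sketch of how the documented options compose (the broker address, topic, and group id are placeholders; a reader constructor that accepts these options is assumed but not documented above):

	opts := &ReaderOpts{
		ServiceName: "example-service",
		Brokers:     []string{"localhost:9092"},
		Topic:       "events",
		GroupID:     "example-group",
	}
	// A ReaderOpt is just func(o *ReaderOpts), so it can be applied directly;
	// a reader constructor would normally apply these internally.
	ReaderStartOffset(OffsetOldest)(opts)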
type RecordHeader ¶
RecordHeader stores key and value for a record header
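The struct body is not reproduced above. Since ConsumerMessage mirrors sarama's message type, RecordHeader presumably matches sarama's RecordHeader, a pair of byte slices (an assumption, not confirmed by this page):

	// Assumed layout, mirroring sarama.RecordHeader.
	type RecordHeader struct {
		Key   []byte
		Value []byte
	}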
type RequiredAck ¶
type RequiredAck int16
const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAck = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAck = 1
	// WaitForAll waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via
	// the `min.insync.replicas` configuration key.
	WaitForAll RequiredAck = -1
)
type StartOffset ¶
type StartOffset int
const (
	// OffsetNewest stands for the log head offset, i.e. the offset that will be
	// assigned to the next message that will be produced to the partition. You
	// can send this to a client's GetOffset method to get this offset, or when
	// calling ConsumePartition to start consuming new messages.
	OffsetNewest StartOffset = -1
	// OffsetOldest stands for the oldest offset available on the broker for a
	// partition. You can send this to a client's GetOffset method to get this
	// offset, or when calling ConsumePartition to start consuming from the
	// oldest offset that is still available on the broker.
	OffsetOldest StartOffset = -2
)
type Writer ¶
type Writer interface {
	SendMessage(ctx context.Context, message *ProducerMessage) error
	Errors() <-chan *ProducerError
	Messages() <-chan *ProducerMessage
	Close() (err error)
}
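A sketch of producing through this interface. How a Writer is obtained is assumed (no constructor is documented above), and the select assumes the async mode (see WriterAsync), in which delivery results arrive on the Errors and Messages channels:

	func produce(ctx context.Context, w Writer) error {
		msg := &ProducerMessage{Topic: "events", Key: "user-42", Value: []byte("hello")}
		if err := w.SendMessage(ctx, msg); err != nil {
			return err
		}
		// In async mode, delivery outcomes are reported on the channels
		// rather than through SendMessage's return value.
		select {
		case perr := <-w.Errors():
			return perr.Err
		case <-w.Messages():
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}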
type WriterOpt ¶
type WriterOpt func(o *WriterOpts)
func WriterAsync ¶
func WriterLogger ¶
func WriterMaxAttempts ¶
func WriterReadTimeout ¶
func WriterRequiredAck ¶
func WriterRequiredAck(requiredAck RequiredAck) WriterOpt
func WriterServiceName ¶
type WriterOpts ¶
type WriterOpts struct {
	ServiceName string

	Brokers []string

	// Limit on how many attempts will be made to deliver a message.
	//
	// The default is to try at most 10 times.
	MaxAttempts int

	// Number of acknowledgements from partition replicas required before
	// receiving a response to a produce request. The default is -1
	// (WaitForAll), which waits for all in-sync replicas to acknowledge; a
	// value above 0 indicates how many replicas should acknowledge a message
	// for it to be considered successful.
	RequiredAck RequiredAck

	ReadTimeout int

	Async bool

	Logger logger.Logger
}
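As with ReaderOpt, each WriterOpt mutates a WriterOpts value. A sketch composing the documented options (the values are illustrative; a writer constructor that accepts these options is assumed but not documented above):

	opts := &WriterOpts{
		ServiceName: "example-service",
		Brokers:     []string{"localhost:9092"},
		MaxAttempts: 10,
	}
	// WriterRequiredAck returns a WriterOpt, i.e. func(o *WriterOpts).
	WriterRequiredAck(WaitForAll)(opts)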
Source Files ¶