Documentation ¶
Index ¶
- type ConsumerGroupClaim
- type ConsumerGroupSession
- func (m *ConsumerGroupSession) Claims() map[string][]int32
- func (m *ConsumerGroupSession) Context() context.Context
- func (m *ConsumerGroupSession) GenerationID() int32
- func (m *ConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string)
- func (m *ConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string)
- func (m *ConsumerGroupSession) MemberID() string
- func (m *ConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumerGroupClaim ¶
ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
func (*ConsumerGroupClaim) HighWaterMarkOffset ¶
func (m *ConsumerGroupClaim) HighWaterMarkOffset() int64
HighWaterMarkOffset returns the high water mark offset of the partition, i.e. the offset that will be used for the next message that will be produced. You can use this to determine how far behind the processing is.
func (*ConsumerGroupClaim) InitialOffset ¶
func (m *ConsumerGroupClaim) InitialOffset() int64
InitialOffset returns the initial offset that was used as a starting point for this claim.
func (*ConsumerGroupClaim) Messages ¶
func (m *ConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage
Messages returns the read channel for the messages that are returned by the broker. The messages channel will be closed when a new rebalance cycle is due. You must finish processing and mark offsets within Config.Consumer.Group.Session.Timeout before the topic/partition is eventually re-assigned to another group member.
func (*ConsumerGroupClaim) Partition ¶
func (m *ConsumerGroupClaim) Partition() int32
Partition returns the consumed partition.
func (*ConsumerGroupClaim) Topic ¶
func (m *ConsumerGroupClaim) Topic() string
Topic returns the consumed topic name.
type ConsumerGroupSession ¶
ConsumerGroupSession represents a consumer group member session.
func (*ConsumerGroupSession) Claims ¶
func (m *ConsumerGroupSession) Claims() map[string][]int32
Claims returns information about the claimed partitions by topic.
func (*ConsumerGroupSession) Context ¶
func (m *ConsumerGroupSession) Context() context.Context
Context returns the session context.
func (*ConsumerGroupSession) GenerationID ¶
func (m *ConsumerGroupSession) GenerationID() int32
GenerationID returns the current generation ID.
func (*ConsumerGroupSession) MarkMessage ¶
func (m *ConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string)
MarkMessage marks a message as consumed.
func (*ConsumerGroupSession) MarkOffset ¶
func (m *ConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string)
MarkOffset marks the provided offset, alongside a metadata string that represents the state of the partition consumer at that point in time. The metadata string can be used by another consumer to restore that state, so it can resume consumption.
To follow upstream conventions, you are expected to mark the offset of the next message to read, not the last message read. Thus, when calling `MarkOffset` you should typically add one to the offset of the last consumed message.
Note: calling MarkOffset does not necessarily commit the offset to the backend store immediately for efficiency reasons, and it may never be committed if your application crashes. This means that you may end up processing the same message twice, and your processing should ideally be idempotent.
func (*ConsumerGroupSession) MemberID ¶
func (m *ConsumerGroupSession) MemberID() string
MemberID returns the cluster member ID.
func (*ConsumerGroupSession) ResetOffset ¶
func (m *ConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string)
ResetOffset resets to the provided offset, alongside a metadata string that represents the state of the partition consumer at that point in time. Reset acts as a counterpart to MarkOffset, the difference being that it allows to reset an offset to an earlier or smaller value, where MarkOffset only allows incrementing the offset. cf MarkOffset for more details.