Documentation
¶
Index ¶
- func Consume(in <-chan *MetadataChanges, ctx *globals) <-chan *sarama.ConsumerMessage
- func Diff(in <-chan *Server, ctx *globals) <-chan *MetadataChanges
- func FetchOffsets(ctx *globals, partition *PartitionInfo) int64
- func MarkOffsets(ctx *globals, offset int64, topic string, partition int32)
- func Refresh(in <-chan time.Time, ctx *globals) <-chan *Server
- func Sync(cfg *Config) <-chan *sarama.ConsumerMessage
- func TrackOffsets(in <-chan *sarama.ConsumerMessage, ctx *globals) <-chan *sarama.ConsumerMessage
- type Config
- type KafkaPartition
- type MetadataChanges
- type PartitionInfo
- type Server
- type StdLogger
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Consume ¶
func Consume(in <-chan *MetadataChanges, ctx *globals) <-chan *sarama.ConsumerMessage
func Diff ¶
func Diff(in <-chan *Server, ctx *globals) <-chan *MetadataChanges
func FetchOffsets ¶
func FetchOffsets(ctx *globals, partition *PartitionInfo) int64
func MarkOffsets ¶
func TrackOffsets ¶
func TrackOffsets(in <-chan *sarama.ConsumerMessage, ctx *globals) <-chan *sarama.ConsumerMessage
Types ¶
type Config ¶
type Config struct { *sarama.Config // Local broker URL to connect to BrokerUrl string // How often refresh metadata about topics in ms MetadataRefreshMs int64 // Optional. Broker advertised url to figure out local broker ID // if not provided, will be attempted to auto discovery based on existsting network // interfaces BrokerAdvertisedUrl string // Optional. Whether to commit & track consumed partitions offsets. When set to 'true', // will periodically commit position back to kafka __consumer_offsets topic. // Default is 'false'. AutotrackOffsets bool // Optional. consumer group name when AutotrackOffsets set to true ConsumerGroup string // Optional. Callback to retrive partition offset information if stored outside of Kafka // When AutotrackOffsets if used, the value is ignored and kafka based offset management is used // Default is no offset tracking, will re-consume all matching topics FetchOffsets func(partition *PartitionInfo) int64 // contains filtered or unexported fields }
type KafkaPartition ¶
type KafkaPartition struct { Topic string Metadata *sarama.PartitionMetadata }
type MetadataChanges ¶
type MetadataChanges struct { Added []PartitionInfo Removed []PartitionInfo }
type PartitionInfo ¶
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
func DiscoverServer ¶
func RefreshMetadata ¶
func RefreshMetadata(ctx *globals) *Server
func (*Server) Partitions ¶
func (server *Server) Partitions() []KafkaPartition
Click to show internal directories.
Click to hide internal directories.