Documentation
¶
Index ¶
- func New(opts ...Option) *kafka.Reader
- type Option
- func WithBrokers(brokers []string) Option
- func WithCommitInterval(commitInterval time.Duration) Option
- func WithDialer(dialer *kafka.Dialer) Option
- func WithErrorLogger(errorLogger kafka.Logger) Option
- func WithGroupBalancers(groupBalancers []kafka.GroupBalancer) Option
- func WithGroupID(groupID string) Option
- func WithGroupTopics(groupTopics []string) Option
- func WithHeartbeatInterval(heartbeatInterval time.Duration) Option
- func WithIsolationLevel(isolationLevel kafka.IsolationLevel) Option
- func WithJoinGroupBackoff(joinGroupBackoff time.Duration) Option
- func WithLogger(logger kafka.Logger) Option
- func WithMaxAttempts(maxAttempts int) Option
- func WithMaxBytes(maxBytes int) Option
- func WithMaxWait(maxWait time.Duration) Option
- func WithMinBytes(minBytes int) Option
- func WithOffsetOutOfRangeError(offsetOutOfRangeError bool) Option
- func WithPartition(partition int) Option
- func WithPartitionWatchInterval(partitionWatchInterval time.Duration) Option
- func WithQueueCapacity(queueCapacity int) Option
- func WithReadBackoffMax(readBackoffMax time.Duration) Option
- func WithReadBackoffMin(readBackoffMin time.Duration) Option
- func WithReadBatchTimeout(readBatchTimeout time.Duration) Option
- func WithReadLagInterval(readLagInterval time.Duration) Option
- func WithRebalanceTimeout(rebalanceTimeout time.Duration) Option
- func WithRetentionTime(retentionTime time.Duration) Option
- func WithSessionTimeout(sessionTimeout time.Duration) Option
- func WithStartOffset(startOffset int64) Option
- func WithTopic(topic string) Option
- func WithWatchPartitionChanges(watchPartitionChanges bool) Option
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Option ¶
type Option func(*kafka.ReaderConfig)
func WithBrokers ¶
The list of broker addresses used to connect to the kafka cluster.
func WithCommitInterval ¶
CommitInterval indicates the interval at which offsets are committed to the broker. If 0, commits will be handled synchronously.
Default: 0
Only used when GroupID is set
func WithDialer ¶
An dialer used to open connections to the kafka server. This field is optional, if nil, the default dialer is used instead.
func WithErrorLogger ¶
ErrorLogger is the logger used to report errors. If nil, the writer falls back to using Logger instead.
func WithGroupBalancers ¶
func WithGroupBalancers(groupBalancers []kafka.GroupBalancer) Option
GroupBalancers is the priority-ordered list of client-side consumer group balancing strategies that will be offered to the coordinator. The first strategy that all group members support will be chosen by the leader.
Default: [Range, RoundRobin]
Only used when GroupID is set
func WithGroupID ¶
GroupID holds the optional consumer group id. If GroupID is specified, then Partition should NOT be specified e.g. 0
func WithGroupTopics ¶
GroupTopics allows specifying multiple topics, but can only be used in combination with GroupID, as it is a consumer-group feature. As such, if GroupID is set, then either Topic or GroupTopics must be defined.
func WithHeartbeatInterval ¶
HeartbeatInterval sets the optional frequency at which the reader sends the consumer group heartbeat update.
Default: 3s
Only used when GroupID is set
func WithIsolationLevel ¶
func WithIsolationLevel(isolationLevel kafka.IsolationLevel) Option
IsolationLevel controls the visibility of transactional records. ReadUncommitted makes all records visible. With ReadCommitted only non-transactional and committed records are visible.
func WithJoinGroupBackoff ¶
JoinGroupBackoff optionally sets the length of time to wait between re-joining the consumer group after an error.
Default: 5s
func WithLogger ¶
If not nil, specifies a logger used to report internal changes within the writer.
func WithMaxAttempts ¶
Limit of how many attempts to connect will be made before returning the error.
The default is to try 3 times.
func WithMaxBytes ¶
MaxBytes indicates to the broker the maximum batch size that the consumer will accept. The broker will truncate a message to satisfy this maximum, so choose a value that is high enough for your largest message size.
Default: 1MB
func WithMaxWait ¶
Maximum amount of time to wait for new data to come when fetching batches of messages from kafka.
Default: 10s
func WithMinBytes ¶
MinBytes indicates to the broker the minimum batch size that the consumer will accept. Setting a high minimum when consuming from a low-volume topic may result in delayed delivery when the broker does not have enough data to satisfy the defined minimum.
Default: 1
func WithOffsetOutOfRangeError ¶
OffsetOutOfRangeError indicates that the reader should return an error in the event of an OffsetOutOfRange error, rather than retrying indefinitely. This flag is being added to retain backwards-compatibility, so it will be removed in a future version of kafka-go.
func WithPartition ¶
Partition to read messages from. Either Partition or GroupID may be assigned, but not both
func WithPartitionWatchInterval ¶
PartitionWatchInterval indicates how often a reader checks for partition changes. If a reader sees a partition change (such as a partition add) it will rebalance the group picking up new partitions.
Default: 5s
Only used when GroupID is set and WatchPartitionChanges is set.
func WithQueueCapacity ¶
The capacity of the internal message queue, defaults to 100 if none is set.
func WithReadBackoffMax ¶
BackoffDelayMax optionally sets the maximum amount of time the reader will wait before polling for new messages
Default: 1s
func WithReadBackoffMin ¶
BackoffDelayMin optionally sets the smallest amount of time the reader will wait before polling for new messages
Default: 100ms
func WithReadBatchTimeout ¶
ReadBatchTimeout amount of time to wait to fetch message from kafka messages batch.
Default: 10s
func WithReadLagInterval ¶
ReadLagInterval sets the frequency at which the reader lag is updated. Setting this field to a negative value disables lag reporting.
func WithRebalanceTimeout ¶
RebalanceTimeout optionally sets the length of time the coordinator will wait for members to join as part of a rebalance. For kafka servers under higher load, it may be useful to set this value higher.
Default: 30s
Only used when GroupID is set
func WithRetentionTime ¶
RetentionTime optionally sets the length of time the consumer group will be saved by the broker. -1 will disable the setting and leave the retention up to the broker's offsets.retention.minutes property. By default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >= 2.0.
Default: -1
Only used when GroupID is set
func WithSessionTimeout ¶
SessionTimeout optionally sets the length of time that may pass without a heartbeat before the coordinator considers the consumer dead and initiates a rebalance.
Default: 30s
Only used when GroupID is set
func WithStartOffset ¶
StartOffset determines from whence the consumer group should begin consuming when it finds a partition without a committed offset. If non-zero, it must be set to one of FirstOffset or LastOffset.
Default: FirstOffset
Only used when GroupID is set
func WithWatchPartitionChanges ¶
WatchForPartitionChanges is used to inform kafka-go that a consumer group should be polling the brokers and rebalancing if any partition changes happen to the topic.