README

sarama

Sarama is an MIT-licensed Go client library for Apache Kafka version 0.8 (and later).

Getting started

  • API documentation and examples are available via pkg.go.dev.
  • Mocks for testing are available in the mocks subpackage.
  • The examples directory contains more elaborate example applications.
  • The tools directory contains command line tools that can be useful for testing, diagnostics, and instrumentation.

You might also want to look at the Frequently Asked Questions.

Compatibility and API stability

Sarama provides a "2 releases + 2 months" compatibility guarantee: we support the two latest stable releases of Kafka and Go, and we provide a two-month grace period for older releases. This means we currently officially support Go 1.14 through 1.15 and Kafka 2.5 through 2.7, although older releases are still likely to work.

Sarama follows semantic versioning and provides API stability via the gopkg.in service. You can import a version with a guaranteed stable API via http://gopkg.in/Shopify/sarama.v1. A changelog is available here.

Contributing

Documentation

Overview

    Package sarama is a pure Go client library for dealing with Apache Kafka (versions 0.8 and later). It includes a high-level API for easily producing and consuming messages, and a low-level API for controlling bytes on the wire when the high-level API is insufficient. Usage examples for the high-level APIs are provided inline with their full documentation.

    To produce messages, use either the AsyncProducer or the SyncProducer. The AsyncProducer accepts messages on a channel and produces them asynchronously in the background as efficiently as possible; it is preferred in most cases. The SyncProducer provides a method which will block until Kafka acknowledges the message as produced. This can be useful but comes with two caveats: it will generally be less efficient, and the actual durability guarantees depend on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
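    The difference between the two producer styles can be illustrated without a broker. The following is a minimal sketch using only the standard library; the `message`, `asyncProducer`, `sendAsync`, `sendSync`, and `produceAll` names are hypothetical and are not part of Sarama's API. It only shows how a blocking send can be layered on a channel-based asynchronous one by waiting for a per-message acknowledgement.

```go
package main

import "fmt"

// message is a hypothetical record paired with a channel for its acknowledgement.
type message struct {
	value string
	ack   chan error
}

// asyncProducer is a toy producer: it accepts messages on a channel and
// delivers them from a background goroutine.
type asyncProducer struct {
	input chan *message
}

// newAsyncProducer starts the background goroutine; deliver stands in for the
// network round trip to a broker.
func newAsyncProducer(deliver func(string) error) *asyncProducer {
	p := &asyncProducer{input: make(chan *message, 16)}
	go func() {
		for m := range p.input {
			m.ack <- deliver(m.value)
		}
	}()
	return p
}

// sendAsync queues a message without waiting for delivery; the caller may
// read the acknowledgement from the returned channel later.
func (p *asyncProducer) sendAsync(value string) <-chan error {
	m := &message{value: value, ack: make(chan error, 1)}
	p.input <- m
	return m.ack
}

// sendSync queues a message and blocks until it has been acknowledged.
func (p *asyncProducer) sendSync(value string) error {
	return <-p.sendAsync(value)
}

// produceAll sends every value synchronously and returns what was delivered.
func produceAll(values []string) []string {
	var delivered []string
	p := newAsyncProducer(func(v string) error {
		delivered = append(delivered, v)
		return nil
	})
	for _, v := range values {
		if err := p.sendSync(v); err != nil {
			break
		}
	}
	close(p.input)
	return delivered
}

func main() {
	fmt.Println(produceAll([]string{"a", "b", "c"})) // [a b c]
}
```

    The blocking variant pays one full round trip per message, which is one reason a synchronous producer is generally less efficient than an asynchronous one.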

    To consume messages, use the Consumer or Consumer-Group API.

    For lower-level needs, the Broker and Request/Response objects permit precise control over each connection and message sent on the wire; the Client provides higher-level metadata management that is shared between the producers and the consumer. The Request/Response objects and properties are mostly undocumented, as they line up exactly with the protocol fields documented by Kafka at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

    Metrics are exposed through the https://github.com/rcrowley/go-metrics library in a local registry.

    Broker related metrics:

    +----------------------------------------------+------------+---------------------------------------------------------------+
    | Name                                         | Type       | Description                                                   |
    +----------------------------------------------+------------+---------------------------------------------------------------+
    | incoming-byte-rate                           | meter      | Bytes/second read off all brokers                             |
    | incoming-byte-rate-for-broker-<broker-id>    | meter      | Bytes/second read off a given broker                          |
    | outgoing-byte-rate                           | meter      | Bytes/second written to all brokers                           |
    | outgoing-byte-rate-for-broker-<broker-id>    | meter      | Bytes/second written to a given broker                        |
    | request-rate                                 | meter      | Requests/second sent to all brokers                           |
    | request-rate-for-broker-<broker-id>          | meter      | Requests/second sent to a given broker                        |
    | request-size                                 | histogram  | Distribution of the request size in bytes for all brokers     |
    | request-size-for-broker-<broker-id>          | histogram  | Distribution of the request size in bytes for a given broker  |
    | request-latency-in-ms                        | histogram  | Distribution of the request latency in ms for all brokers     |
    | request-latency-in-ms-for-broker-<broker-id> | histogram  | Distribution of the request latency in ms for a given broker  |
    | response-rate                                | meter      | Responses/second received from all brokers                    |
    | response-rate-for-broker-<broker-id>         | meter      | Responses/second received from a given broker                 |
    | response-size                                | histogram  | Distribution of the response size in bytes for all brokers    |
    | response-size-for-broker-<broker-id>         | histogram  | Distribution of the response size in bytes for a given broker |
    | requests-in-flight                           | counter    | The current number of in-flight requests awaiting a response  |
    |                                              |            | for all brokers                                               |
    | requests-in-flight-for-broker-<broker-id>    | counter    | The current number of in-flight requests awaiting a response  |
    |                                              |            | for a given broker                                            |
    +----------------------------------------------+------------+---------------------------------------------------------------+
    

    Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.

    Producer related metrics:

    +-------------------------------------------+------------+--------------------------------------------------------------------------------------+
    | Name                                      | Type       | Description                                                                          |
    +-------------------------------------------+------------+--------------------------------------------------------------------------------------+
    | batch-size                                | histogram  | Distribution of the number of bytes sent per partition per request for all topics    |
    | batch-size-for-topic-<topic>              | histogram  | Distribution of the number of bytes sent per partition per request for a given topic |
    | record-send-rate                          | meter      | Records/second sent to all topics                                                    |
    | record-send-rate-for-topic-<topic>        | meter      | Records/second sent to a given topic                                                 |
    | records-per-request                       | histogram  | Distribution of the number of records sent per request for all topics                |
    | records-per-request-for-topic-<topic>     | histogram  | Distribution of the number of records sent per request for a given topic             |
    | compression-ratio                         | histogram  | Distribution of the compression ratio times 100 of record batches for all topics     |
    | compression-ratio-for-topic-<topic>       | histogram  | Distribution of the compression ratio times 100 of record batches for a given topic  |
    +-------------------------------------------+------------+--------------------------------------------------------------------------------------+
    

    Consumer related metrics:

    +-------------------------------------------+------------+--------------------------------------------------------------------------------------+
    | Name                                      | Type       | Description                                                                          |
    +-------------------------------------------+------------+--------------------------------------------------------------------------------------+
    | consumer-batch-size                       | histogram  | Distribution of the number of messages in a batch                                    |
    +-------------------------------------------+------------+--------------------------------------------------------------------------------------+
    

    Constants

    const (
    	// RangeBalanceStrategyName identifies strategies that use the range partition assignment strategy
    	RangeBalanceStrategyName = "range"
    
    	// RoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy
    	RoundRobinBalanceStrategyName = "roundrobin"
    
    	// StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy
    	StickyBalanceStrategyName = "sticky"
    )
    const (
    	// SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
    	SASLTypeOAuth = "OAUTHBEARER"
    	// SASLTypePlaintext represents the SASL/PLAIN mechanism
    	SASLTypePlaintext = "PLAIN"
    	// SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
    	SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
    	// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
    	SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
    	SASLTypeGSSAPI      = "GSSAPI"
    	// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
    	// server negotiate SASL auth using opaque packets.
    	SASLHandshakeV0 = int16(0)
    	// SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
    	// server negotiate SASL by wrapping tokens with Kafka protocol headers.
    	SASLHandshakeV1 = int16(1)
    	// SASLExtKeyAuth is the reserved extension key name sent as part of the
    	// SASL/OAUTHBEARER initial client response
    	SASLExtKeyAuth = "auth"
    )
    const (
    	// OffsetNewest stands for the log head offset, i.e. the offset that will be
    	// assigned to the next message that will be produced to the partition. You
    	// can send this to a client's GetOffset method to get this offset, or when
    	// calling ConsumePartition to start consuming new messages.
    	OffsetNewest int64 = -1
    	// OffsetOldest stands for the oldest offset available on the broker for a
    	// partition. You can send this to a client's GetOffset method to get this
    	// offset, or when calling ConsumePartition to start consuming from the
    	// oldest offset that is still available on the broker.
    	OffsetOldest int64 = -2
    )
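The two sentinel values can be read as instructions for picking a concrete starting offset. Below is a minimal sketch of that resolution, assuming the caller already knows the oldest available offset and the offset the next produced message will receive. The `resolveOffset` helper and its lowercase constants are hypothetical and only mirror the values shown above.

```go
package main

import "fmt"

const (
	offsetNewest int64 = -1 // same value as OffsetNewest above
	offsetOldest int64 = -2 // same value as OffsetOldest above
)

// resolveOffset turns a requested offset into a concrete one, given the oldest
// offset still available and the offset the next produced message will receive.
func resolveOffset(requested, oldest, next int64) (int64, error) {
	switch {
	case requested == offsetNewest:
		return next, nil
	case requested == offsetOldest:
		return oldest, nil
	case requested >= oldest && requested <= next:
		return requested, nil
	default:
		return 0, fmt.Errorf("offset %d outside available range [%d, %d]", requested, oldest, next)
	}
}

func main() {
	// Hypothetical partition holding offsets 100..249; the next message gets 250.
	fmt.Println(resolveOffset(offsetNewest, 100, 250)) // 250 <nil>
	fmt.Println(resolveOffset(offsetOldest, 100, 250)) // 100 <nil>
	fmt.Println(resolveOffset(42, 100, 250))           // 0 offset 42 outside available range [100, 250]
}
```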
    const (
    	TOK_ID_KRB_AP_REQ   = 256
    	GSS_API_GENERIC_TAG = 0x60
    	KRB5_USER_AUTH      = 1
    	KRB5_KEYTAB_AUTH    = 2
    	GSS_API_INITIAL     = 1
    	GSS_API_VERIFY      = 2
    	GSS_API_FINISH      = 3
    )
    const APIKeySASLAuth = 36

      APIKeySASLAuth is the API key for the SaslAuthenticate Kafka API

      const GroupGenerationUndefined = -1

        GroupGenerationUndefined is a special value for the group generation field of Offset Commit Requests that should be used when a consumer group does not rely on Kafka for partition management.

        const ReceiveTime int64 = -1

          ReceiveTime is a special value for the timestamp field of Offset Commit Requests which tells the broker to set the timestamp to the time at which the request was received. The timestamp is only used if message version 1 is used, which requires Kafka 0.8.2.

          Variables

          var (
          	// Logger is the instance of a StdLogger interface that Sarama writes connection
          	// management events to. By default it is set to discard all log messages via ioutil.Discard,
          	// but you can set it to redirect wherever you want.
          	Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags)
          
          	// PanicHandler is called for recovering from panics spawned internally to the library (and thus
          	// not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
          	PanicHandler func(interface{})
          
          	// MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying
          	// to send a request larger than this will result in a PacketEncodingError. The default of 100 MiB is aligned
          	// with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt
          	// to process.
          	MaxRequestSize int32 = 100 * 1024 * 1024
          
          	// MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If
          	// a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to
          	// protect the client from running out of memory. Please note that brokers do not have any natural limit on
          	// the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers
          	// (see https://issues.apache.org/jira/browse/KAFKA-2063).
          	MaxResponseSize int32 = 100 * 1024 * 1024
          )
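To illustrate why a response size limit protects the client, here is a minimal sketch of a length-prefixed frame reader that checks the declared size before allocating. It assumes the 4-byte big-endian size prefix used by the Kafka wire protocol. The `readFrame` and `decodeBytes` helpers and the `errTooLarge` error are hypothetical; Sarama itself reports a PacketDecodingError in this situation.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// maxResponseSize mirrors the default value of MaxResponseSize shown above.
const maxResponseSize int32 = 100 * 1024 * 1024

// errTooLarge is a hypothetical error used only by this sketch.
var errTooLarge = errors.New("response exceeds size limit")

// readFrame reads a 4-byte big-endian length prefix followed by that many
// bytes, refusing to allocate for frames larger than limit.
func readFrame(r io.Reader, limit int32) ([]byte, error) {
	var size int32
	if err := binary.Read(r, binary.BigEndian, &size); err != nil {
		return nil, err
	}
	if size < 0 || size > limit {
		return nil, errTooLarge
	}
	body := make([]byte, size)
	if _, err := io.ReadFull(r, body); err != nil {
		return nil, err
	}
	return body, nil
}

// decodeBytes is a convenience wrapper for in-memory input.
func decodeBytes(raw []byte, limit int32) (string, error) {
	body, err := readFrame(bytes.NewReader(raw), limit)
	return string(body), err
}

func main() {
	frame := []byte{0, 0, 0, 3, 'a', 'b', 'c'}
	fmt.Println(decodeBytes(frame, maxResponseSize)) // abc <nil>

	_, err := decodeBytes(frame, 2)
	fmt.Println(err) // response exceeds size limit
}
```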
          var (
          	V0_8_2_0  = newKafkaVersion(0, 8, 2, 0)
          	V0_8_2_1  = newKafkaVersion(0, 8, 2, 1)
          	V0_8_2_2  = newKafkaVersion(0, 8, 2, 2)
          	V0_9_0_0  = newKafkaVersion(0, 9, 0, 0)
          	V0_9_0_1  = newKafkaVersion(0, 9, 0, 1)
          	V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
          	V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
          	V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
          	V0_10_1_1 = newKafkaVersion(0, 10, 1, 1)
          	V0_10_2_0 = newKafkaVersion(0, 10, 2, 0)
          	V0_10_2_1 = newKafkaVersion(0, 10, 2, 1)
          	V0_11_0_0 = newKafkaVersion(0, 11, 0, 0)
          	V0_11_0_1 = newKafkaVersion(0, 11, 0, 1)
          	V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
          	V1_0_0_0  = newKafkaVersion(1, 0, 0, 0)
          	V1_1_0_0  = newKafkaVersion(1, 1, 0, 0)
          	V1_1_1_0  = newKafkaVersion(1, 1, 1, 0)
          	V2_0_0_0  = newKafkaVersion(2, 0, 0, 0)
          	V2_0_1_0  = newKafkaVersion(2, 0, 1, 0)
          	V2_1_0_0  = newKafkaVersion(2, 1, 0, 0)
          	V2_2_0_0  = newKafkaVersion(2, 2, 0, 0)
          	V2_3_0_0  = newKafkaVersion(2, 3, 0, 0)
          	V2_4_0_0  = newKafkaVersion(2, 4, 0, 0)
          	V2_5_0_0  = newKafkaVersion(2, 5, 0, 0)
          	V2_6_0_0  = newKafkaVersion(2, 6, 0, 0)
          	V2_7_0_0  = newKafkaVersion(2, 7, 0, 0)
          
          	SupportedVersions = []KafkaVersion{
          		V0_8_2_0,
          		V0_8_2_1,
          		V0_8_2_2,
          		V0_9_0_0,
          		V0_9_0_1,
          		V0_10_0_0,
          		V0_10_0_1,
          		V0_10_1_0,
          		V0_10_1_1,
          		V0_10_2_0,
          		V0_10_2_1,
          		V0_11_0_0,
          		V0_11_0_1,
          		V0_11_0_2,
          		V1_0_0_0,
          		V1_1_0_0,
          		V1_1_1_0,
          		V2_0_0_0,
          		V2_0_1_0,
          		V2_1_0_0,
          		V2_2_0_0,
          		V2_3_0_0,
          		V2_4_0_0,
          		V2_5_0_0,
          		V2_6_0_0,
          		V2_7_0_0,
          	}
          	MinVersion     = V0_8_2_0
          	MaxVersion     = V2_7_0_0
          	DefaultVersion = V1_0_0_0
          )

            Effective constants defining the supported Kafka versions.
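Version values like these are typically used to gate protocol features on the configured Kafka version. The following is a minimal sketch of a four-component version comparison; the `version` type and its `isAtLeast` method are hypothetical stand-ins written for this example and do not reproduce Sarama's KafkaVersion implementation.

```go
package main

import "fmt"

// version is a hypothetical stand-in holding four numeric components,
// matching the four arguments passed to newKafkaVersion above.
type version [4]uint

// isAtLeast reports whether v is the same as or newer than other, comparing
// components from most to least significant.
func (v version) isAtLeast(other version) bool {
	for i := range v {
		if v[i] > other[i] {
			return true
		}
		if v[i] < other[i] {
			return false
		}
	}
	return true
}

func main() {
	v0_11 := version{0, 11, 0, 0}
	v1_0 := version{1, 0, 0, 0}
	v2_7 := version{2, 7, 0, 0}
	fmt.Println(v1_0.isAtLeast(v0_11)) // true
	fmt.Println(v1_0.isAtLeast(v2_7))  // false
	fmt.Println(v2_7.isAtLeast(v2_7))  // true
}
```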

            var BalanceStrategyRange = &balanceStrategy{
            	name: RangeBalanceStrategyName,
            	coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
            		step := float64(len(partitions)) / float64(len(memberIDs))
            
            		for i, memberID := range memberIDs {
            			pos := float64(i)
            			min := int(math.Floor(pos*step + 0.5))
            			max := int(math.Floor((pos+1)*step + 0.5))
            			plan.Add(memberID, topic, partitions[min:max]...)
            		}
            	},
            }

              BalanceStrategyRange is the default and assigns partitions as ranges to consumer group members. Example with one topic T with six partitions (0..5) and two members (M1, M2):

              M1: {T: [0, 1, 2]}
              M2: {T: [3, 4, 5]}
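The arithmetic in the coreFn above can be tried out in isolation. The sketch below repeats the same rounding logic but stores the result in a plain map rather than a BalanceStrategyPlan; the `rangeAssign` function name is hypothetical.

```go
package main

import (
	"fmt"
	"math"
)

// rangeAssign mirrors the arithmetic of the coreFn shown above, but stores the
// result in a plain map instead of a BalanceStrategyPlan.
func rangeAssign(memberIDs []string, partitions []int32) map[string][]int32 {
	plan := make(map[string][]int32, len(memberIDs))
	step := float64(len(partitions)) / float64(len(memberIDs))
	for i, memberID := range memberIDs {
		pos := float64(i)
		lo := int(math.Floor(pos*step + 0.5))
		hi := int(math.Floor((pos+1)*step + 0.5))
		plan[memberID] = append(plan[memberID], partitions[lo:hi]...)
	}
	return plan
}

func main() {
	plan := rangeAssign([]string{"M1", "M2"}, []int32{0, 1, 2, 3, 4, 5})
	fmt.Println("M1:", plan["M1"]) // M1: [0 1 2]
	fmt.Println("M2:", plan["M2"]) // M2: [3 4 5]
}
```

When the partitions do not divide evenly, the rounding decides who gets the extra ones: with seven partitions and three members, this arithmetic gives the middle member three partitions and the other two members two each.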
              
              var BalanceStrategyRoundRobin = new(roundRobinBalancer)

                BalanceStrategyRoundRobin assigns partitions to members in alternating order. Example with two topics (t0, t1), each with three partitions (p0, p1, p2), and two consumers (M0, M1):

                M0: [t0p0, t0p2, t1p1]
                M1: [t0p1, t1p0, t1p2]
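The alternating assignment described for BalanceStrategyRoundRobin can be sketched as dealing topic-partitions to members like cards. This is a simplified illustration, not Sarama's implementation: the `roundRobinAssign` function is hypothetical and assumes that every member subscribes to every topic.

```go
package main

import (
	"fmt"
	"sort"
)

// roundRobinAssign deals topic-partitions to members in alternating order.
// topics maps a topic name to its number of partitions. The sketch assumes
// every member subscribes to every topic.
func roundRobinAssign(members []string, topics map[string]int) map[string][]string {
	names := make([]string, 0, len(topics))
	for t := range topics {
		names = append(names, t)
	}
	sort.Strings(names) // fixed topic order keeps the plan deterministic

	plan := make(map[string][]string, len(members))
	if len(members) == 0 {
		return plan
	}
	next := 0
	for _, t := range names {
		for p := 0; p < topics[t]; p++ {
			m := members[next%len(members)]
			plan[m] = append(plan[m], fmt.Sprintf("%sp%d", t, p))
			next++
		}
	}
	return plan
}

func main() {
	plan := roundRobinAssign([]string{"M0", "M1"}, map[string]int{"t0": 3, "t1": 3})
	fmt.Println("M0:", plan["M0"]) // M0: [t0p0 t0p2 t1p1]
	fmt.Println("M1:", plan["M1"]) // M1: [t0p1 t1p0 t1p2]
}
```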

                var BalanceStrategySticky = &stickyBalanceStrategy{}

                  BalanceStrategySticky assigns partitions to members with an attempt to preserve earlier assignments while maintaining a balanced partition distribution. Example with topic T with six partitions (0..5) and two members (M1, M2):

                  M1: {T: [0, 2, 4]}
                  M2: {T: [1, 3, 5]}
                  

                  On reassignment with an additional consumer, you might get an assignment plan like:

                  M1: {T: [0, 2]}
                  M2: {T: [1, 3]}
                  M3: {T: [4, 5]}
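The idea of preserving earlier assignments while rebalancing can be sketched with a simple quota rule: each member keeps as many of its previous partitions as its fair share allows, and only the surplus moves. This is a deliberately simplified illustration, not Sarama's sticky algorithm; the `stickyReassign` function and its quota rule are assumptions made for this example.

```go
package main

import (
	"fmt"
	"sort"
)

// stickyReassign keeps as many previous assignments as each member's quota
// allows and hands the leftover partitions to members below quota.
// prev maps a member to the partitions it owned before the rebalance.
func stickyReassign(prev map[string][]int32, members []string, partitions []int32) map[string][]int32 {
	plan := make(map[string][]int32, len(members))
	if len(members) == 0 {
		return plan
	}
	sorted := append([]string(nil), members...)
	sort.Strings(sorted)

	valid := make(map[int32]bool, len(partitions))
	for _, p := range partitions {
		valid[p] = true
	}

	// Every member may hold base partitions; the first extra members hold one more.
	base, extra := len(partitions)/len(sorted), len(partitions)%len(sorted)
	quota := make(map[string]int, len(sorted))
	taken := make(map[int32]bool, len(partitions))
	for i, m := range sorted {
		quota[m] = base
		if i < extra {
			quota[m]++
		}
		// Keep earlier assignments up to the quota.
		for _, p := range prev[m] {
			if len(plan[m]) < quota[m] && valid[p] && !taken[p] {
				plan[m] = append(plan[m], p)
				taken[p] = true
			}
		}
	}

	// Collect the partitions nobody kept and give them to members below quota.
	var free []int32
	for _, p := range partitions {
		if !taken[p] {
			free = append(free, p)
		}
	}
	sort.Slice(free, func(i, j int) bool { return free[i] < free[j] })
	for _, m := range sorted {
		for len(plan[m]) < quota[m] && len(free) > 0 {
			plan[m] = append(plan[m], free[0])
			free = free[1:]
		}
	}
	return plan
}

func main() {
	prev := map[string][]int32{"M1": {0, 2, 4}, "M2": {1, 3, 5}}
	plan := stickyReassign(prev, []string{"M1", "M2", "M3"}, []int32{0, 1, 2, 3, 4, 5})
	for _, m := range []string{"M1", "M2", "M3"} {
		fmt.Printf("%s: %v\n", m, plan[m])
	}
	// M1: [0 2]
	// M2: [1 3]
	// M3: [4 5]
}
```

Only two of the six partitions change owner in this run, whereas recomputing a range assignment from scratch for three members would move more of them.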
                  
                  var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated")

                    ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.

                    var ErrBrokerNotFound = errors.New("kafka: broker for ID is not found")

                      ErrBrokerNotFound is the error returned when there's no broker found for the requested ID.

                      var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")

                        ErrClosedClient is the error returned when a method is called on a client that has been closed.

                        var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")

                          ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.

                          var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch")

                            ErrConsumerOffsetNotAdvanced is returned when a partition consumer didn't advance its offset after parsing a RecordBatch.

                            var ErrControllerNotAvailable = errors.New("kafka: controller is not available")

                              ErrControllerNotAvailable is returned when the server did not provide a valid controller ID. This may happen when the Kafka server's version is lower than 0.10.0.0.

                              var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks")

                                ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does not contain the expected information.

                                var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")

                                  ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.

                                  var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index")

                                    ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index (meaning one outside of the range [0...numPartitions-1]).
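The range check behind this error is small enough to sketch. In the example below, `hashPartition` is a hypothetical key-hashing partitioner built on the standard library's FNV hash, and `checkPartition` applies the [0...numPartitions-1] rule; neither function is part of Sarama.

```go
package main

import (
	"errors"
	"fmt"
	"hash/fnv"
)

// errInvalidPartition is a local stand-in for ErrInvalidPartition.
var errInvalidPartition = errors.New("partitioner returned an invalid partition index")

// hashPartition is a hypothetical partitioner that maps a key to an index in
// [0, numPartitions). It returns -1 when there are no partitions to choose from.
func hashPartition(key string, numPartitions int32) int32 {
	if numPartitions <= 0 {
		return -1
	}
	h := fnv.New32a()
	h.Write([]byte(key))
	// Taking the modulus on the unsigned hash avoids negative results.
	return int32(h.Sum32() % uint32(numPartitions))
}

// checkPartition rejects any index outside [0, numPartitions-1].
func checkPartition(p, numPartitions int32) error {
	if p < 0 || p >= numPartitions {
		return errInvalidPartition
	}
	return nil
}

func main() {
	p := hashPartition("user-42", 6)
	fmt.Println(checkPartition(p, 6)) // <nil>
	fmt.Println(checkPartition(6, 6)) // partitioner returned an invalid partition index
}
```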

                                    var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")

                                      ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max

                                      var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata")

                                        ErrNoTopicsToUpdateMetadata is returned when Metadata.Full is set to false but no specific topics were found to update the metadata.

                                        var ErrNotConnected = errors.New("kafka: broker not connected")

                                          ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.

                                          var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to (Is your cluster reachable?)")

                                            ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored or otherwise failed to respond.

                                            var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down")

                                              ErrShuttingDown is returned when a producer receives a message during shutdown.

                                              var NoNode = &Broker{id: -1, addr: ":-1"}

                                              Functions

                                              This section is empty.

                                              Types

                                              type AbortedTransaction

                                              type AbortedTransaction struct {
                                              	ProducerID  int64
                                              	FirstOffset int64
                                              }

                                              type AccessToken

                                              type AccessToken struct {
                                              	// Token is the access token payload.
                                              	Token string
                                              	// Extensions is an optional map of arbitrary key-value pairs that can be
                                              	// sent with the SASL/OAUTHBEARER initial client response. These values are
                                              	// ignored by the SASL server if they are unexpected. This feature is only
                                              	// supported by Kafka >= 2.1.0.
                                              	Extensions map[string]string
                                              }

                                                AccessToken contains an access token used to authenticate a SASL/OAUTHBEARER client along with associated metadata.

                                                type AccessTokenProvider

                                                type AccessTokenProvider interface {
                                                	// Token returns an access token. The implementation should ensure token
                                                	// reuse so that multiple calls at connect time do not create multiple
                                                	// tokens. The implementation should also periodically refresh the token in
                                                	// order to guarantee that each call returns an unexpired token.  This
                                                	// method should not block indefinitely--a timeout error should be returned
                                                	// after a short period of inactivity so that the broker connection logic
                                                	// can log debugging information and retry.
                                                	Token() (*AccessToken, error)
                                                }

                                                  AccessTokenProvider is the interface that encapsulates how implementors can generate access tokens for Kafka broker authentication.
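The token reuse and refresh behaviour requested in the interface comment can be sketched with a small caching provider. This example repeats the AccessToken struct locally so that it compiles without the sarama module. The `cachingProvider` type, its `fetch` and `now` fields, and the `demo` function are all hypothetical; in particular, `fetch` stands in for a call to an identity service and is expected to enforce its own timeout.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// AccessToken repeats the struct shown above so the sketch compiles on its own.
type AccessToken struct {
	Token      string
	Extensions map[string]string
}

// cachingProvider is a hypothetical AccessTokenProvider. fetch asks an
// identity service for a token and its lifetime, and now supplies the
// current time so that tests can control the clock.
type cachingProvider struct {
	mu      sync.Mutex
	fetch   func() (token string, lifetime time.Duration, err error)
	now     func() time.Time
	cached  *AccessToken
	expires time.Time
}

// Token returns the cached token while it is still valid and fetches a new
// one otherwise. Concurrent callers share a single fetch because of the mutex.
func (p *cachingProvider) Token() (*AccessToken, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.cached != nil && p.now().Before(p.expires) {
		return p.cached, nil
	}
	token, lifetime, err := p.fetch()
	if err != nil {
		return nil, err
	}
	p.cached = &AccessToken{Token: token}
	p.expires = p.now().Add(lifetime)
	return p.cached, nil
}

// demo calls Token twice within the token lifetime and once after it, and
// reports the tokens seen together with the number of fetches.
func demo() (first, second, third string, fetches int) {
	clock := time.Unix(0, 0)
	p := &cachingProvider{
		now: func() time.Time { return clock },
		fetch: func() (string, time.Duration, error) {
			fetches++
			return fmt.Sprintf("token-%d", fetches), time.Minute, nil
		},
	}
	a, _ := p.Token()
	b, _ := p.Token()
	clock = clock.Add(2 * time.Minute)
	c, _ := p.Token()
	return a.Token, b.Token, c.Token, fetches
}

func main() {
	fmt.Println(demo()) // token-1 token-1 token-2 2
}
```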

                                                  type Acl

                                                  type Acl struct {
                                                  	Principal      string
                                                  	Host           string
                                                  	Operation      AclOperation
                                                  	PermissionType AclPermissionType
                                                  }

                                                    Acl holds the information that makes up an ACL entry: principal, host, operation, and permission type.

                                                    type AclCreation

                                                    type AclCreation struct {
                                                    	Resource
                                                    	Acl
                                                    }

                                                      AclCreation is a wrapper around the Resource and Acl types.

                                                      type AclCreationResponse

                                                      type AclCreationResponse struct {
                                                      	Err    KError
                                                      	ErrMsg *string
                                                      }

                                                        AclCreationResponse is an ACL creation response type.

                                                        type AclFilter

                                                        type AclFilter struct {
                                                        	Version                   int
                                                        	ResourceType              AclResourceType
                                                        	ResourceName              *string
                                                        	ResourcePatternTypeFilter AclResourcePatternType
                                                        	Principal                 *string
                                                        	Host                      *string
                                                        	Operation                 AclOperation
                                                        	PermissionType            AclPermissionType
                                                        }

                                                        type AclOperation

                                                        type AclOperation int
                                                        const (
                                                        	AclOperationUnknown AclOperation = iota
                                                        	AclOperationAny
                                                        	AclOperationAll
                                                        	AclOperationRead
                                                        	AclOperationWrite
                                                        	AclOperationCreate
                                                        	AclOperationDelete
                                                        	AclOperationAlter
                                                        	AclOperationDescribe
                                                        	AclOperationClusterAction
                                                        	AclOperationDescribeConfigs
                                                        	AclOperationAlterConfigs
                                                        	AclOperationIdempotentWrite
                                                        )

                                                          ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java

                                                          type AclPermissionType

                                                          type AclPermissionType int
                                                          const (
                                                          	AclPermissionUnknown AclPermissionType = iota
                                                          	AclPermissionAny
                                                          	AclPermissionDeny
                                                          	AclPermissionAllow
                                                          )

                                                            ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java

                                                            type AclResourcePatternType

                                                            type AclResourcePatternType int
                                                            const (
                                                            	AclPatternUnknown AclResourcePatternType = iota
                                                            	AclPatternAny
                                                            	AclPatternMatch
                                                            	AclPatternLiteral
                                                            	AclPatternPrefixed
                                                            )

                                                              ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java

                                                              type AclResourceType

                                                              type AclResourceType int
                                                              const (
                                                              	AclResourceUnknown AclResourceType = iota
                                                              	AclResourceAny
                                                              	AclResourceTopic
                                                              	AclResourceGroup
                                                              	AclResourceCluster
                                                              	AclResourceTransactionalID
                                                              )

                                                                ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java

                                                                type AddOffsetsToTxnRequest

                                                                type AddOffsetsToTxnRequest struct {
                                                                	TransactionalID string
                                                                	ProducerID      int64
                                                                	ProducerEpoch   int16
                                                                	GroupID         string
                                                                }

                                                                  AddOffsetsToTxnRequest adds offsets to a transaction request

                                                                  type AddOffsetsToTxnResponse

                                                                  type AddOffsetsToTxnResponse struct {
                                                                  	ThrottleTime time.Duration
                                                                  	Err          KError
                                                                  }

                                                                    AddOffsetsToTxnResponse is a response type for adding offsets to txns

                                                                    type AddPartitionsToTxnRequest

                                                                    type AddPartitionsToTxnRequest struct {
                                                                    	TransactionalID string
                                                                    	ProducerID      int64
                                                                    	ProducerEpoch   int16
                                                                    	TopicPartitions map[string][]int32
                                                                    }

                                                                      AddPartitionsToTxnRequest is an add partition request

                                                                      type AddPartitionsToTxnResponse

                                                                      type AddPartitionsToTxnResponse struct {
                                                                      	ThrottleTime time.Duration
                                                                      	Errors       map[string][]*PartitionError
                                                                      }

                                                                        AddPartitionsToTxnResponse is the response type for adding partitions to a transaction; it carries partition errors keyed by topic

                                                                        type AlterConfigsRequest

                                                                        type AlterConfigsRequest struct {
                                                                        	Resources    []*AlterConfigsResource
                                                                        	ValidateOnly bool
                                                                        }

                                                                          AlterConfigsRequest is an alter config request type

                                                                          type AlterConfigsResource

                                                                          type AlterConfigsResource struct {
                                                                          	Type          ConfigResourceType
                                                                          	Name          string
                                                                          	ConfigEntries map[string]*string
                                                                          }

                                                                            AlterConfigsResource is an alter config resource type

                                                                            type AlterConfigsResourceResponse

                                                                            type AlterConfigsResourceResponse struct {
                                                                            	ErrorCode int16
                                                                            	ErrorMsg  string
                                                                            	Type      ConfigResourceType
                                                                            	Name      string
                                                                            }

                                                                              AlterConfigsResourceResponse is a response type for alter config resource

                                                                              type AlterConfigsResponse

                                                                              type AlterConfigsResponse struct {
                                                                              	ThrottleTime time.Duration
                                                                              	Resources    []*AlterConfigsResourceResponse
                                                                              }

                                                                                AlterConfigsResponse is a response type for alter config

                                                                                type AlterPartitionReassignmentsRequest

                                                                                type AlterPartitionReassignmentsRequest struct {
                                                                                	TimeoutMs int32
                                                                                
                                                                                	Version int16
                                                                                	// contains filtered or unexported fields
                                                                                }

                                                                                func (*AlterPartitionReassignmentsRequest) AddBlock

                                                                                func (r *AlterPartitionReassignmentsRequest) AddBlock(topic string, partitionID int32, replicas []int32)

                                                                                type AlterPartitionReassignmentsResponse

                                                                                type AlterPartitionReassignmentsResponse struct {
                                                                                	Version        int16
                                                                                	ThrottleTimeMs int32
                                                                                	ErrorCode      KError
                                                                                	ErrorMessage   *string
                                                                                	Errors         map[string]map[int32]*alterPartitionReassignmentsErrorBlock
                                                                                }

                                                                                func (*AlterPartitionReassignmentsResponse) AddError

                                                                                func (r *AlterPartitionReassignmentsResponse) AddError(topic string, partition int32, kerror KError, message *string)

                                                                                type ApiVersionsRequest

                                                                                type ApiVersionsRequest struct {
                                                                                }

                                                                                  ApiVersionsRequest ...

                                                                                  type ApiVersionsResponse

                                                                                  type ApiVersionsResponse struct {
                                                                                  	Err         KError
                                                                                  	ApiVersions []*ApiVersionsResponseBlock
                                                                                  }

                                                                                    ApiVersionsResponse is an api version response type

                                                                                    type ApiVersionsResponseBlock

                                                                                    type ApiVersionsResponseBlock struct {
                                                                                    	ApiKey     int16
                                                                                    	MinVersion int16
                                                                                    	MaxVersion int16
                                                                                    }

                                                                                      ApiVersionsResponseBlock is an api version response block type

                                                                                      type AsyncProducer

                                                                                      type AsyncProducer interface {
                                                                                      
                                                                                      	// AsyncClose triggers a shutdown of the producer. The shutdown has completed
                                                                                      	// when both the Errors and Successes channels have been closed. When calling
                                                                                      	// AsyncClose, you *must* continue to read from those channels in order to
                                                                                      	// drain the results of any messages in flight.
                                                                                      	AsyncClose()
                                                                                      
                                                                                      	// Close shuts down the producer and waits for any buffered messages to be
                                                                                      	// flushed. You must call this function before a producer object passes out of
                                                                                      	// scope, as it may otherwise leak memory. You must call this before calling
                                                                                      	// Close on the underlying client.
                                                                                      	Close() error
                                                                                      
                                                                                      	// Input is the input channel for the user to write messages to that they
                                                                                      	// wish to send.
                                                                                      	Input() chan<- *ProducerMessage
                                                                                      
                                                                                      	// Successes is the success output channel back to the user when Return.Successes is
                                                                                      	// enabled. If Return.Successes is true, you MUST read from this channel or the
                                                                                      	// Producer will deadlock. It is suggested that you send and read messages
                                                                                      	// together in a single select statement.
                                                                                      	Successes() <-chan *ProducerMessage
                                                                                      
                                                                                      	// Errors is the error output channel back to the user. You MUST read from this
                                                                                      	// channel or the Producer will deadlock when the channel is full. Alternatively,
                                                                                      	// you can set Producer.Return.Errors in your config to false, which prevents
                                                                                      	// errors from being returned.
                                                                                      	Errors() <-chan *ProducerError
                                                                                      }

                                                                                        AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages to the correct broker for the provided topic-partition, refreshing metadata as appropriate, and parses responses for errors. You must read from the Errors() channel or the producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid leaks: it will not be garbage-collected automatically when it passes out of scope.

                                                                                        Example (Goroutines)

                                                                                          This example shows how to use the producer with separate goroutines reading from the Successes and Errors channels. Note that in order for the Successes channel to be populated, you have to set config.Producer.Return.Successes to true.
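The goroutine pattern described above can be sketched without a broker. This is a minimal illustration, not sarama code: `message`, `produceError`, `fakeProducer`, `newFakeProducer` and `produceAll` are hypothetical stand-ins that only mimic the channel behaviour documented for AsyncProducer with Return.Successes enabled. One goroutine drains the successes channel, another drains the errors channel, and the caller waits for both channels to close after AsyncClose.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// message and produceError are hypothetical stand-ins for
// sarama.ProducerMessage and sarama.ProducerError.
type message struct{ Topic, Value string }

type produceError struct {
	Msg *message
	Err error
}

// fakeProducer is a hypothetical stand-in for an AsyncProducer with
// Return.Successes enabled. It rejects messages that have an empty value.
type fakeProducer struct {
	input     chan *message
	successes chan *message
	errs      chan *produceError
}

func newFakeProducer() *fakeProducer {
	p := &fakeProducer{
		input:     make(chan *message),
		successes: make(chan *message),
		errs:      make(chan *produceError),
	}
	go func() {
		// Closing both output channels signals that shutdown has completed.
		defer close(p.successes)
		defer close(p.errs)
		for m := range p.input {
			if m.Value == "" {
				p.errs <- &produceError{Msg: m, Err: errors.New("empty value")}
				continue
			}
			p.successes <- m
		}
	}()
	return p
}

// AsyncClose stops accepting input. The output channels close once every
// in-flight message has been reported.
func (p *fakeProducer) AsyncClose() { close(p.input) }

// produceAll sends every value and counts the results using one goroutine
// per output channel, so neither channel can block the producer.
func produceAll(values []string) (successes, failures int) {
	p := newFakeProducer()
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for range p.successes {
			successes++
		}
	}()
	go func() {
		defer wg.Done()
		for range p.errs {
			failures++
		}
	}()
	for _, v := range values {
		p.input <- &message{Topic: "my_topic", Value: v}
	}
	p.AsyncClose()
	wg.Wait() // both readers exit only after both channels are closed
	return successes, failures
}

func main() {
	s, f := produceAll([]string{"a", "", "b"})
	fmt.Printf("successes=%d failures=%d\n", s, f)
}
```

Each counter is written by exactly one goroutine and read only after `wg.Wait()`, so the sketch needs no extra locking.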

                                                                                          Example (Select)

                                                                                            This example shows how to use the producer while simultaneously reading the Errors channel to know about any failures.
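The select pattern can be sketched in the same spirit. Again this is an assumption-laden illustration rather than sarama code: `startFakeProducer` and `sendAll` are hypothetical names, and the stand-in producer simply rejects empty values. The point is that sending and reading errors happen in one select statement, so a pending error can never block the next send.

```go
package main

import (
	"errors"
	"fmt"
)

// startFakeProducer is a hypothetical stand-in for an AsyncProducer with
// successes disabled. It returns an input channel and an errors channel,
// reports an error for every empty value, and closes the errors channel
// once the input channel has been closed and drained.
func startFakeProducer() (chan<- string, <-chan error) {
	input := make(chan string)
	errs := make(chan error)
	go func() {
		defer close(errs)
		for v := range input {
			if v == "" {
				errs <- errors.New("empty value")
			}
		}
	}()
	return input, errs
}

// sendAll enqueues every value while reading errors in the same select.
func sendAll(values []string) (enqueued, failed int) {
	input, errs := startFakeProducer()
	for i := 0; i < len(values); {
		select {
		case input <- values[i]:
			enqueued++
			i++
		case <-errs:
			// A real program would log the error or retry here.
			failed++
		}
	}
	close(input) // stands in for AsyncClose in this sketch
	for range errs {
		// Drain errors for messages that were still in flight.
		failed++
	}
	return enqueued, failed
}

func main() {
	e, f := sendAll([]string{"a", "", "b", ""})
	fmt.Printf("enqueued=%d failed=%d\n", e, f)
}
```

Without the second case, the loop would stall as soon as the stand-in producer blocked on reporting an error, which is the deadlock the Errors documentation warns about.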


                                                                                            func NewAsyncProducer

                                                                                            func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error)

                                                                                              NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.

                                                                                              func NewAsyncProducerFromClient

                                                                                              func NewAsyncProducerFromClient(client Client) (AsyncProducer, error)

                                                                                                NewAsyncProducerFromClient creates a new Producer using the given client. It is still necessary to call Close() on the underlying client when shutting down this producer.

                                                                                                type BalanceStrategy

                                                                                                type BalanceStrategy interface {
                                                                                                	// Name uniquely identifies the strategy.
                                                                                                	Name() string
                                                                                                
                                                                                                	// Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
                                                                                                	// and returns a distribution plan.
                                                                                                	Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)
                                                                                                
                                                                                                	// AssignmentData returns the serialized assignment data for the specified
                                                                                                	// memberID
                                                                                                	AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
                                                                                                }

                                                                                                  BalanceStrategy is used to balance topics and partitions across members of a consumer group

                                                                                                  type BalanceStrategyPlan

                                                                                                  type BalanceStrategyPlan map[string]map[string][]int32

                                                                                                    BalanceStrategyPlan is the result of any BalanceStrategy.Plan attempt. It contains an allocation of topic/partitions by memberID in the form of a `memberID -> topic -> partitions` map.

                                                                                                    func (BalanceStrategyPlan) Add

                                                                                                    func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32)

                                                                                                      Add assigns a topic with a number of partitions to a member.
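To make the `memberID -> topic -> partitions` shape concrete, here is a small sketch that uses a local type with the same underlying map layout. The `plan` type, its `Add` method and the `roundRobin` helper are hypothetical and written for illustration only; they are not sarama's implementation, and `roundRobin` is not one of the library's built-in strategies.

```go
package main

import (
	"fmt"
	"sort"
)

// plan mirrors the documented layout of BalanceStrategyPlan:
// memberID -> topic -> partitions. It is a local stand-in.
type plan map[string]map[string][]int32

// Add appends partitions of a topic to a member's assignment,
// creating the inner map on first use (assumed behaviour).
func (p plan) Add(memberID, topic string, partitions ...int32) {
	if len(partitions) == 0 {
		return
	}
	if _, ok := p[memberID]; !ok {
		p[memberID] = make(map[string][]int32)
	}
	p[memberID][topic] = append(p[memberID][topic], partitions...)
}

// roundRobin hands out the partitions of one topic to members in turn.
// Members are sorted first so the result is deterministic.
func roundRobin(members []string, topic string, partitions []int32) plan {
	p := make(plan)
	if len(members) == 0 {
		return p
	}
	sorted := append([]string(nil), members...) // do not mutate the caller's slice
	sort.Strings(sorted)
	for i, part := range partitions {
		p.Add(sorted[i%len(sorted)], topic, part)
	}
	return p
}

func main() {
	p := roundRobin([]string{"member-b", "member-a"}, "my_topic", []int32{0, 1, 2})
	fmt.Println(p["member-a"]["my_topic"], p["member-b"]["my_topic"])
}
```

Keeping the plan as a plain nested map means a caller can look up any member's partitions for a topic with two index operations.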

                                                                                                      type Broker

                                                                                                      type Broker struct {
                                                                                                      	// contains filtered or unexported fields
                                                                                                      }

                                                                                                        Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.


                                                                                                        func NewBroker

                                                                                                        func NewBroker(addr string) *Broker

                                                                                                          NewBroker creates and returns a Broker targeting the given host:port address. This does not attempt to actually connect; you have to call Open() for that.

                                                                                                          func (*Broker) AddOffsetsToTxn

                                                                                                          func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error)

                                                                                                            AddOffsetsToTxn sends a request to add offsets to txn and returns a response or error

                                                                                                            func (*Broker) AddPartitionsToTxn

                                                                                                            func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error)

                                                                                                              AddPartitionsToTxn sends a request to add partitions to txn and returns a response or error

                                                                                                              func (*Broker) Addr

                                                                                                              func (b *Broker) Addr() string

                                                                                                                Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.

                                                                                                                func (*Broker) AlterConfigs

                                                                                                                func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error)

                                                                                                                  AlterConfigs sends a request to alter config and returns a response or error

                                                                                                                  func (*Broker) AlterPartitionReassignments

                                                                                                                  func (b *Broker) AlterPartitionReassignments(request *AlterPartitionReassignmentsRequest) (*AlterPartitionReassignmentsResponse, error)

                                                                                                                    AlterPartitionReassignments sends an alter partition reassignments request and returns an alter partition reassignments response

                                                                                                                    func (*Broker) ApiVersions

                                                                                                                    func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error)

                                                                                                                      ApiVersions returns an API versions response or error

                                                                                                                      func (*Broker) Close

                                                                                                                      func (b *Broker) Close() error

                                                                                                                        Close closes the broker resources

                                                                                                                        func (*Broker) CommitOffset

                                                                                                                        func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error)

                                                                                                                          CommitOffset returns an offset commit response or error

                                                                                                                          func (*Broker) Connected

                                                                                                                          func (b *Broker) Connected() (bool, error)

                                                                                                                            Connected returns true if the broker is connected and false otherwise. If the broker is not connected but it had tried to connect, the error from that connection attempt is also returned.

                                                                                                                            func (*Broker) CreateAcls

                                                                                                                            func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error)

                                                                                                                              CreateAcls sends a create acl request and returns a response or error

                                                                                                                              func (*Broker) CreatePartitions

                                                                                                                              func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error)

                                                                                                                                CreatePartitions sends a create partitions request and returns a create partitions response or error

                                                                                                                                func (*Broker) CreateTopics

                                                                                                                                func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error)

                                                                                                                                  CreateTopics sends a create topics request and returns a create topics response

                                                                                                                                  func (*Broker) DeleteAcls

                                                                                                                                  func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error)

                                                                                                                                    DeleteAcls sends a delete acl request and returns a response or error

                                                                                                                                    func (*Broker) DeleteGroups

                                                                                                                                    func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error)

                                                                                                                                      DeleteGroups sends a request to delete groups and returns a response or error

                                                                                                                                      func (*Broker) DeleteRecords

                                                                                                                                      func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error)

                                                                                                                                        DeleteRecords sends a request to delete records and returns a delete records response or error

                                                                                                                                        func (*Broker) DeleteTopics

                                                                                                                                        func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error)

                                                                                                                                          DeleteTopics sends a delete topics request and returns a delete topics response

                                                                                                                                          func (*Broker) DescribeAcls

                                                                                                                                          func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error)

                                                                                                                                            DescribeAcls sends a describe acl request and returns a response or error

                                                                                                                                            func (*Broker) DescribeConfigs

                                                                                                                                            func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error)

                                                                                                                                              DescribeConfigs sends a request to describe config and returns a response or error

                                                                                                                                              func (*Broker) DescribeGroups

                                                                                                                                              func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error)

                                                                                                                                                DescribeGroups returns a describe groups response or error

                                                                                                                                                func (*Broker) DescribeLogDirs

                                                                                                                                                func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error)

                                                                                                                                                  DescribeLogDirs sends a request to get the broker's log dir paths and sizes

                                                                                                                                                  func (*Broker) EndTxn

                                                                                                                                                  func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error)

                                                                                                                                                    EndTxn sends a request to end txn and returns a response or error

                                                                                                                                                    func (*Broker) Fetch

                                                                                                                                                    func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error)

                                                                                                                                                      Fetch returns a FetchResponse or error

                                                                                                                                                      func (*Broker) FetchOffset

                                                                                                                                                      func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error)

                                                                                                                                                        FetchOffset returns an offset fetch response or error

                                                                                                                                                        func (*Broker) FindCoordinator

                                                                                                                                                        func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error)

FindCoordinator sends a find coordinator request and returns a response or error

                                                                                                                                                          func (*Broker) GetAvailableOffsets

                                                                                                                                                          func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error)

GetAvailableOffsets returns an offset response or error

                                                                                                                                                            func (*Broker) GetConsumerMetadata

                                                                                                                                                            func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error)

GetConsumerMetadata sends a consumer metadata request and returns a consumer metadata response or error

                                                                                                                                                              func (*Broker) GetMetadata

                                                                                                                                                              func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error)

GetMetadata sends a metadata request and returns a metadata response or error

                                                                                                                                                                func (*Broker) Heartbeat

                                                                                                                                                                func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error)

                                                                                                                                                                  Heartbeat returns a heartbeat response or error

                                                                                                                                                                  func (*Broker) ID

                                                                                                                                                                  func (b *Broker) ID() int32

                                                                                                                                                                    ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.

                                                                                                                                                                    func (*Broker) InitProducerID

                                                                                                                                                                    func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error)

                                                                                                                                                                      InitProducerID sends an init producer request and returns a response or error

                                                                                                                                                                      func (*Broker) JoinGroup

                                                                                                                                                                      func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error)

                                                                                                                                                                        JoinGroup returns a join group response or error

                                                                                                                                                                        func (*Broker) LeaveGroup

                                                                                                                                                                        func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error)

LeaveGroup returns a leave group response or error

                                                                                                                                                                          func (*Broker) ListGroups

                                                                                                                                                                          func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error)

ListGroups returns a list groups response or error

                                                                                                                                                                            func (*Broker) ListPartitionReassignments

                                                                                                                                                                            func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsRequest) (*ListPartitionReassignmentsResponse, error)

ListPartitionReassignments sends a list partition reassignments request and returns a list partition reassignments response or error

                                                                                                                                                                              func (*Broker) Open

                                                                                                                                                                              func (b *Broker) Open(conf *Config) error

                                                                                                                                                                                Open tries to connect to the Broker if it is not already connected or connecting, but does not block waiting for the connection to complete. This means that any subsequent operations on the broker will block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call, follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or AlreadyConnected. If conf is nil, the result of NewConfig() is used.

                                                                                                                                                                                func (*Broker) Produce

                                                                                                                                                                                func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error)

                                                                                                                                                                                  Produce returns a produce response or error

                                                                                                                                                                                  func (*Broker) Rack

                                                                                                                                                                                  func (b *Broker) Rack() string

                                                                                                                                                                                    Rack returns the broker's rack as retrieved from Kafka's metadata or the empty string if it is not known. The returned value corresponds to the broker's broker.rack configuration setting. Requires protocol version to be at least v0.10.0.0.

                                                                                                                                                                                    func (*Broker) SyncGroup

                                                                                                                                                                                    func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error)

                                                                                                                                                                                      SyncGroup returns a sync group response or error

                                                                                                                                                                                      func (*Broker) TxnOffsetCommit

                                                                                                                                                                                      func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error)

                                                                                                                                                                                        TxnOffsetCommit sends a request to commit transaction offsets and returns a response or error

                                                                                                                                                                                        type ByteEncoder

                                                                                                                                                                                        type ByteEncoder []byte

                                                                                                                                                                                          ByteEncoder implements the Encoder interface for Go byte slices so that they can be used as the Key or Value in a ProducerMessage.

                                                                                                                                                                                          func (ByteEncoder) Encode

                                                                                                                                                                                          func (b ByteEncoder) Encode() ([]byte, error)

                                                                                                                                                                                          func (ByteEncoder) Length

                                                                                                                                                                                          func (b ByteEncoder) Length() int
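
To make the role of the two methods concrete, here is a minimal standalone sketch of a byte-slice type satisfying a two-method encoder interface. The Encoder and ByteEncoder declarations below are local copies written for illustration (assumed to match the shape of the signatures listed above), and describe is a hypothetical helper; nothing here imports Sarama.

```go
package main

import "fmt"

// Encoder is a local mirror of the two-method interface shape that
// ByteEncoder is documented to implement.
type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

// ByteEncoder is a local illustration type, not the library's own.
type ByteEncoder []byte

// Encode returns the underlying bytes unchanged; it cannot fail.
func (b ByteEncoder) Encode() ([]byte, error) { return b, nil }

// Length reports the encoded size without encoding anything.
func (b ByteEncoder) Length() int { return len(b) }

// describe is a hypothetical helper that accepts any Encoder, the way a
// message key or value field typed as Encoder would.
func describe(e Encoder) (string, error) {
	data, err := e.Encode()
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%d bytes: %q", e.Length(), data), nil
}

func main() {
	s, err := describe(ByteEncoder("hello"))
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Println(s)
}
```

Because the conversion ByteEncoder(data) is just a type conversion on a []byte, wrapping a payload this way involves no copying in the sketch.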

                                                                                                                                                                                          type Client

                                                                                                                                                                                          type Client interface {
                                                                                                                                                                                          	// Config returns the Config struct of the client. This struct should not be
                                                                                                                                                                                          	// altered after it has been created.
                                                                                                                                                                                          	Config() *Config
                                                                                                                                                                                          
                                                                                                                                                                                          	// Controller returns the cluster controller broker. It will return a
                                                                                                                                                                                          	// locally cached value if it's available. You can call RefreshController
                                                                                                                                                                                          	// to update the cached value. Requires Kafka 0.10 or higher.
                                                                                                                                                                                          	Controller() (*Broker, error)
                                                                                                                                                                                          
                                                                                                                                                                                          	// RefreshController retrieves the cluster controller from fresh metadata
                                                                                                                                                                                          	// and stores it in the local cache. Requires Kafka 0.10 or higher.
                                                                                                                                                                                          	RefreshController() (*Broker, error)
                                                                                                                                                                                          
                                                                                                                                                                                          	// Brokers returns the current set of active brokers as retrieved from cluster metadata.
                                                                                                                                                                                          	Brokers() []*Broker
                                                                                                                                                                                          
                                                                                                                                                                                          	// Broker returns the active Broker if available for the broker ID.
                                                                                                                                                                                          	Broker(brokerID int32) (*Broker, error)
                                                                                                                                                                                          
                                                                                                                                                                                          	// Topics returns the set of available topics as retrieved from cluster metadata.
                                                                                                                                                                                          	Topics() ([]string, error)
                                                                                                                                                                                          
                                                                                                                                                                                          	// Partitions returns the sorted list of all partition IDs for the given topic.
                                                                                                                                                                                          	Partitions(topic string) ([]int32, error)
                                                                                                                                                                                          
                                                                                                                                                                                          	// WritablePartitions returns the sorted list of all writable partition IDs for
                                                                                                                                                                                          	// the given topic, where "writable" means "having a valid leader accepting
                                                                                                                                                                                          	// writes".
                                                                                                                                                                                          	WritablePartitions(topic string) ([]int32, error)
                                                                                                                                                                                          
	// Leader returns the broker object that is the leader of the given
	// topic/partition, as determined by querying the cluster metadata.
                                                                                                                                                                                          	Leader(topic string, partitionID int32) (*Broker, error)
                                                                                                                                                                                          
                                                                                                                                                                                          	// Replicas returns the set of all replica IDs for the given partition.
                                                                                                                                                                                          	Replicas(topic string, partitionID int32) ([]int32, error)
                                                                                                                                                                                          
                                                                                                                                                                                          	// InSyncReplicas returns the set of all in-sync replica IDs for the given
                                                                                                                                                                                          	// partition. In-sync replicas are replicas which are fully caught up with
                                                                                                                                                                                          	// the partition leader.
                                                                                                                                                                                          	InSyncReplicas(topic string, partitionID int32) ([]int32, error)
                                                                                                                                                                                          
	// OfflineReplicas returns the set of all offline replica IDs for the given
	// partition. Offline replicas are replicas that are currently unavailable,
	// for example because the broker hosting them is down.
                                                                                                                                                                                          	OfflineReplicas(topic string, partitionID int32) ([]int32, error)
                                                                                                                                                                                          
                                                                                                                                                                                          	// RefreshBrokers takes a list of addresses to be used as seed brokers.
                                                                                                                                                                                          	// Existing broker connections are closed and the updated list of seed brokers
                                                                                                                                                                                          	// will be used for the next metadata fetch.
                                                                                                                                                                                          	RefreshBrokers(addrs []string) error
                                                                                                                                                                                          
                                                                                                                                                                                          	// RefreshMetadata takes a list of topics and queries the cluster to refresh the
                                                                                                                                                                                          	// available metadata for those topics. If no topics are provided, it will refresh
                                                                                                                                                                                          	// metadata for all topics.
                                                                                                                                                                                          	RefreshMetadata(topics ...string) error
                                                                                                                                                                                          
	// GetOffset queries the cluster to get the most recent available offset at the
	// given time on the topic/partition combination. Time should be OffsetOldest
	// for the earliest available offset, OffsetNewest for the offset of the message
	// that will be produced next, or a timestamp in milliseconds since the Unix epoch.
                                                                                                                                                                                          	GetOffset(topic string, partitionID int32, time int64) (int64, error)
                                                                                                                                                                                          
	// Coordinator returns the coordinating broker for a consumer group. It will
	// return a locally cached value if it's available. You can call
	// RefreshCoordinator to update the cached value. This function only works on
	// Kafka 0.8.2 and higher.
	Coordinator(consumerGroup string) (*Broker, error)

	// RefreshCoordinator retrieves the coordinator for a consumer group and stores it
	// in the local cache. This function only works on Kafka 0.8.2 and higher.
	RefreshCoordinator(consumerGroup string) error

	// InitProducerID retrieves the information required by the idempotent producer.
	InitProducerID() (*InitProducerIDResponse, error)

	// Close shuts down all broker connections managed by this client. You must
	// call this function before a client object passes out of scope, as it will
	// otherwise leak memory. You must close any Producers or Consumers using a client
	// before you close the client.
	Close() error

	// Closed returns true if the client has already had Close called on it.
	Closed() bool
}

Client is a generic Kafka client. It manages connections to one or more Kafka brokers. You MUST call Close() on a client to avoid leaks; it will not be garbage-collected automatically when it passes out of scope. It is safe to share a client amongst many users; however, Kafka will process requests from a single client strictly in serial, so it is generally more efficient to use the default of one client per producer/consumer.
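The close-ordering rule (producers and consumers first, the client last) can be sketched with a small helper. This is a minimal illustration and not part of Sarama: `closeAll` and `fakeCloser` are hypothetical names. The helper relies only on the fact that a Client, a producer, and a consumer each expose `Close() error`, so they satisfy the standard `io.Closer` interface.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// closeAll (hypothetical helper) closes each closer in the order given.
// It keeps going after a failure so that one failing Close does not
// leave the remaining resources open, and returns the first error seen.
func closeAll(closers ...io.Closer) error {
	var first error
	for _, c := range closers {
		if c == nil {
			continue
		}
		if err := c.Close(); err != nil && first == nil {
			first = err
		}
	}
	return first
}

// fakeCloser is a stand-in for a producer, consumer, or client.
// It records its name in order when Close is called.
type fakeCloser struct {
	name  string
	order *[]string
	err   error
}

func (f fakeCloser) Close() error {
	*f.order = append(*f.order, f.name)
	return f.err
}

func main() {
	var order []string
	producer := fakeCloser{name: "producer", order: &order}
	consumer := fakeCloser{name: "consumer", order: &order, err: errors.New("consumer close failed")}
	client := fakeCloser{name: "client", order: &order}

	// Users of the client go first; the client itself goes last.
	err := closeAll(producer, consumer, client)
	fmt.Println(order, err) // prints "[producer consumer client] consumer close failed"
}
```

With real values the call might look like `closeAll(producer, consumer, client)`, always listing the shared client last. The ordering only matters when the producer or consumer was built on top of a client you created yourself.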

func NewClient

func NewClient(addrs []string, conf *Config) (Client, error)

NewClient creates a new Client. It connects to one of the given broker addresses and uses that broker to automatically fetch metadata on the rest of the Kafka cluster. If metadata cannot be retrieved from any of the given broker addresses, the client is not created.
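To illustrate why it helps to pass more than one seed address, the sketch below models the fallback behaviour described for NewClient: try the given addresses until one works, and fail only if none do. It is a conceptual model under stated assumptions, not Sarama's actual connection logic; `connectAny`, the `dial` callback, and `errRefused` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errRefused is a hypothetical error returned by the demo dialer below.
var errRefused = errors.New("connection refused")

// connectAny (hypothetical) tries each seed address in turn and returns the
// first one for which dial succeeds. If every address fails, or the list is
// empty, it returns an error that summarises the individual failures.
func connectAny(addrs []string, dial func(addr string) error) (string, error) {
	var errs []error
	for _, addr := range addrs {
		if err := dial(addr); err != nil {
			errs = append(errs, fmt.Errorf("%s: %w", addr, err))
			continue
		}
		return addr, nil
	}
	return "", fmt.Errorf("no seed broker usable (%d tried): %v", len(addrs), errs)
}

func main() {
	// Demo dialer: pretend that only the second broker is reachable.
	dial := func(addr string) error {
		if addr == "kafka-2:9092" {
			return nil
		}
		return errRefused
	}

	addr, err := connectAny([]string{"kafka-1:9092", "kafka-2:9092"}, dial)
	fmt.Println(addr, err) // prints "kafka-2:9092 <nil>"
}
```

In practice this means the address list passed to NewClient only needs one reachable broker for the client to be created; listing several makes startup more tolerant of a single broker being down.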

type ClusterAdmin

type ClusterAdmin interface {
	// Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
	// It may take several seconds after CreateTopic returns success for all the brokers
	// to become aware that the topic has been created. During this time, ListTopics
	// may not return information about the new topic. The validateOnly option is supported from version 0.10.2.0.
	CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error

	// List the topics available in the cluster with the default options.
	ListTopics() (map[string]TopicDetail, error)

	// Describe some topics in the cluster.
	DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)

	// Delete a topic. It may take several seconds after DeleteTopic returns success
	// for all the brokers to become aware that the topic is gone.
	// During this time, ListTopics may continue to return information about the deleted topic.
	// If delete.topic.enable is false on the brokers, DeleteTopic will mark
	// the topic for deletion, but not actually delete it.
	// This operation is supported by brokers with version 0.10.1.0 or higher.
	DeleteTopic(topic string) error
                                                                                                                                                                                              
	// Increase the number of partitions of the topic according to the given count.
	// If partitions are increased for a topic that has a key, the partition logic or ordering of
	// the messages will be affected. It may take several seconds after this method returns
	// success for all the brokers to become aware that the partitions have been created.
	// During this time, DescribeTopics may not return information about the
	// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
	CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error

	// Alter the replica assignment for partitions.
	// This operation is supported by brokers with version 2.4.0.0 or higher.
	AlterPartitionReassignments(topic string, assignment [][]int32) error

	// Provides info on ongoing partition replica reassignments.
	// This operation is supported by brokers with version 2.4.0.0 or higher.
	ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)

	// Delete records whose offset is smaller than the given offset of the corresponding partition.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	DeleteRecords(topic string, partitionOffsets map[int32]int64) error

	// Get the configuration for the specified resources.
	// The returned configuration includes default values; entries where Default is true
	// can be distinguished from user-supplied values.
	// Config entries where ReadOnly is true cannot be updated.
	// The value of config entries where Sensitive is true is always nil so
	// sensitive information is not disclosed.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)

	// Update the configuration for the specified resources with the default options.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	// Topic is currently the only resource type with configs that can be updated.
	// Updates are not transactional, so they may succeed for some resources while
	// failing for others. The configs for a particular resource are updated atomically.
	AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
                                                                                                                                                                                              
	// Creates access control lists (ACLs) which are bound to specific resources.
	// This operation is not transactional, so it may succeed for some ACLs while failing for others.
	// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
	// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
	CreateACL(resource Resource, acl Acl) error

	// Lists access control lists (ACLs) according to the supplied filter.
	// It may take some time for changes made by CreateACL or DeleteACL to be reflected in the output of ListAcls.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	ListAcls(filter AclFilter) ([]ResourceAcls, error)

	// Deletes access control lists (ACLs) according to the supplied filters.
	// This operation is not transactional, so it may succeed for some ACLs while failing for others.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)

	// List the consumer groups available in the cluster.
	ListConsumerGroups() (map[string]string, error)

	// Describe the given consumer groups.
	DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)

	// List the consumer group offsets available in the cluster.
	ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)

	// Delete a consumer group.
	DeleteConsumerGroup(group string) error

	// Get information about the nodes in the cluster.
	DescribeCluster() (brokers []*Broker, controllerID int32, err error)

	// Get information about all log directories on the given set of brokers.
	DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)

	// Close shuts down the admin and closes the underlying client.
	Close() error
}

ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0. Methods with stricter requirements will specify the minimum broker version required. You MUST call Close() on a ClusterAdmin to avoid leaks.
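Because a new topic may not be visible for several seconds after CreateTopic returns, callers that depend on it sometimes poll before proceeding. The sketch below shows one way to do that with a generic polling loop. It is an illustration only: `pollUntil` and `demo` are hypothetical names, and the check function is a stand-in for a call such as ListTopics.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pollUntil (hypothetical helper) calls check immediately and then once per
// interval until check reports true, check returns an error, or ctx is done.
// interval must be positive.
//
// With a ClusterAdmin named admin (an assumption for illustration), the
// check for a topic called "events" could be:
//
//	func() (bool, error) {
//		topics, err := admin.ListTopics()
//		if err != nil {
//			return false, err
//		}
//		_, ok := topics["events"]
//		return ok, nil
//	}
func pollUntil(ctx context.Context, interval time.Duration, check func() (bool, error)) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		ok, err := check()
		if err != nil {
			return err
		}
		if ok {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// demo simulates a topic that becomes visible on the third lookup and
// returns how many lookups were needed.
func demo() (int, error) {
	calls := 0
	check := func() (bool, error) {
		calls++
		return calls >= 3, nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	err := pollUntil(ctx, 10*time.Millisecond, check)
	return calls, err
}

func main() {
	calls, err := demo()
	fmt.Println(calls, err) // prints "3 <nil>"
}
```

A successful check only confirms that the broker that answered the request knows about the topic, so a deadline on the context is still a sensible guard. The same loop can be inverted to wait for a deleted topic to disappear.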

func NewClusterAdmin

func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error)

NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.

func NewClusterAdminFromClient

func NewClusterAdminFromClient(client Client) (ClusterAdmin, error)

NewClusterAdminFromClient creates a new ClusterAdmin using the given client. Note that the underlying client will also be closed when the admin's Close() is called.

type CompressionCodec

type CompressionCodec int8

CompressionCodec represents the various compression codecs recognized by Kafka in messages.

const (
	// CompressionNone means no compression.
	CompressionNone CompressionCodec = iota
	// CompressionGZIP means compression using GZIP.
	CompressionGZIP
	// CompressionSnappy means compression using snappy.
	CompressionSnappy
	// CompressionLZ4 means compression using LZ4.
	CompressionLZ4
	// CompressionZSTD means compression using ZSTD.
	CompressionZSTD
                                                                                                                                                                                                      
                                                                                                                                                                                                      	// CompressionLevelDefault is the constant to use in CompressionLevel
                                                                                                                                                                                                      	// to select the default compression level for any codec. The value is
                                                                                                                                                                                                      	// chosen so that it does not collide with any existing compression level.
                                                                                                                                                                                                      	CompressionLevelDefault = -1000
                                                                                                                                                                                                      )

                                                                                                                                                                                                      func (CompressionCodec) String

                                                                                                                                                                                                      func (cc CompressionCodec) String() string
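The constant block above relies on iota, so each codec gets a consecutive int8 value starting at zero. The following is a standalone sketch that mirrors the declaration locally rather than importing sarama. The `codecNames` table, the strings it holds, and the out-of-range handling are assumptions made for illustration and may differ from what the library's String method actually returns.

```go
package main

import "fmt"

// CompressionCodec mirrors the int8-based type from the listing above.
// This is a local copy for illustration; it does not import sarama.
type CompressionCodec int8

const (
	CompressionNone   CompressionCodec = iota // 0
	CompressionGZIP                           // 1
	CompressionSnappy                         // 2
	CompressionLZ4                            // 3
	CompressionZSTD                           // 4

	// Untyped sentinel, kept well outside the range of real codec levels.
	CompressionLevelDefault = -1000
)

// codecNames is a hypothetical lookup table; the strings are assumptions.
var codecNames = [...]string{"none", "gzip", "snappy", "lz4", "zstd"}

// String returns a readable name and guards against out-of-range values.
func (cc CompressionCodec) String() string {
	if cc < 0 || int(cc) >= len(codecNames) {
		return fmt.Sprintf("unknown(%d)", int8(cc))
	}
	return codecNames[cc]
}

func main() {
	for cc := CompressionNone; cc <= CompressionZSTD; cc++ {
		fmt.Printf("%d => %s\n", int8(cc), cc)
	}
	fmt.Println("default level sentinel:", CompressionLevelDefault)
}
```

Because `CompressionLevelDefault` carries an explicit value, it does not take part in the iota sequence even though it sits in the same const block.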

                                                                                                                                                                                                      type Config

                                                                                                                                                                                                      type Config struct {
                                                                                                                                                                                                      	// Admin is the namespace for ClusterAdmin properties used by the administrative Kafka client.
                                                                                                                                                                                                      	Admin struct {
                                                                                                                                                                                                      		Retry struct {
                                                                                                                                                                                                      			// The total number of times to retry sending (retriable) admin requests (default 5).
                                                                                                                                                                                                      			// Similar to the `retries` setting of the JVM AdminClientConfig.
                                                                                                                                                                                                      			Max int
                                                                                                                                                                                                      			// Backoff time between retries of a failed request (default 100ms)
                                                                                                                                                                                                      			Backoff time.Duration
                                                                                                                                                                                                      		}
                                                                                                                                                                                                      		// The maximum duration the administrative Kafka client will wait for ClusterAdmin operations,
                                                                                                                                                                                                      		// including topics, brokers, configurations and ACLs (defaults to 3 seconds).
                                                                                                                                                                                                      		Timeout time.Duration
                                                                                                                                                                                                      	}
                                                                                                                                                                                                      
                                                                                                                                                                                                      	// Net is the namespace for network-level properties used by the Broker, and
                                                                                                                                                                                                      	// shared by the Client/Producer/Consumer.
                                                                                                                                                                                                      	Net struct {
                                                                                                                                                                                                      		// How many outstanding requests a connection is allowed to have before
                                                                                                                                                                                                      		// sending on it blocks (default 5).
                                                                                                                                                                                                      		MaxOpenRequests int
                                                                                                                                                                                                      
                                                                                                                                                                                                      		// All three of the below configurations are similar to the
                                                                                                                                                                                                      		// `socket.timeout.ms` setting in JVM kafka. All of them default
                                                                                                                                                                                                      		// to 30 seconds.
                                                                                                                                                                                                      		DialTimeout  time.Duration // How long to wait for the initial connection.
                                                                                                                                                                                                      		ReadTimeout  time.Duration // How long to wait for a response.
                                                                                                                                                                                                      		WriteTimeout time.Duration // How long to wait for a transmit.
                                                                                                                                                                                                      
                                                                                                                                                                                                      		TLS struct {
                                                                                                                                                                                                      			// Whether or not to use TLS when connecting to the broker
                                                                                                                                                                                                      			// (defaults to false).
                                                                                                                                                                                                      			Enable bool
                                                                                                                                                                                                      			// The TLS configuration to use for secure connections if
                                                                                                                                                                                                      			// enabled (defaults to nil).
                                                                                                                                                                                                      			Config *tls.Config
                                                                                                                                                                                                      		}
                                                                                                                                                                                                      
                                                                                                                                                                                                      		// SASL-based authentication with the broker. The fields below cover
                                                                                                                                                                                                      		// SASL/PLAIN, SASL/SCRAM, SASL/OAUTHBEARER and GSSAPI authentication.
                                                                                                                                                                                                      		SASL struct {
                                                                                                                                                                                                      			// Whether or not to use SASL authentication when connecting to the broker
                                                                                                                                                                                                      			// (defaults to false).
                                                                                                                                                                                                      			Enable bool
                                                                                                                                                                                                      			// SASLMechanism is the name of the enabled SASL mechanism.
                                                                                                                                                                                                      			// Possible values: OAUTHBEARER, PLAIN (defaults to PLAIN).
                                                                                                                                                                                                      			Mechanism SASLMechanism
                                                                                                                                                                                                      			// Version is the SASL protocol version to use.
                                                                                                                                                                                                      			// Kafka > 1.x should use V1, except on Azure EventHub, which uses V0.
                                                                                                                                                                                                      			Version int16
                                                                                                                                                                                                      			// Whether or not to send the Kafka SASL handshake first if enabled
                                                                                                                                                                                                      			// (defaults to true). You should only set this to false if you're using
                                                                                                                                                                                                      			// a non-Kafka SASL proxy.
                                                                                                                                                                                                      			Handshake bool
                                                                                                                                                                                                      			// AuthIdentity is an (optional) authorization identity (authzid) to
                                                                                                                                                                                                      			// use for SASL/PLAIN authentication (if different from User) when
                                                                                                                                                                                                      			// an authenticated user is permitted to act as the presented
                                                                                                                                                                                                      			// alternative user. See RFC4616 for details.
                                                                                                                                                                                                      			AuthIdentity string
                                                                                                                                                                                                      			// User is the authentication identity (authcid) to present for
                                                                                                                                                                                                      			// SASL/PLAIN or SASL/SCRAM authentication
                                                                                                                                                                                                      			User string
                                                                                                                                                                                                      			// Password for SASL/PLAIN authentication
                                                                                                                                                                                                      			Password string
                                                                                                                                                                                                      			// Authorization identity (authzid) used for SASL/SCRAM authentication
                                                                                                                                                                                                      			SCRAMAuthzID string
                                                                                                                                                                                                      			// SCRAMClientGeneratorFunc is a generator of a user provided implementation of a SCRAM
                                                                                                                                                                                                      			// client used to perform the SCRAM exchange with the server.
                                                                                                                                                                                                      			SCRAMClientGeneratorFunc func() SCRAMClient
                                                                                                                                                                                                      			// TokenProvider is a user-defined callback for generating
                                                                                                                                                                                                      			// access tokens for SASL/OAUTHBEARER auth. See the
                                                                                                                                                                                                      			// AccessTokenProvider interface docs for proper implementation
                                                                                                                                                                                                      			// guidelines.
                                                                                                                                                                                                      			TokenProvider AccessTokenProvider
                                                                                                                                                                                                      
                                                                                                                                                                                                      			GSSAPI GSSAPIConfig
                                                                                                                                                                                                      		}
                                                                                                                                                                                                      
                                                                                                                                                                                                      		// KeepAlive specifies the keep-alive period for an active network connection (defaults to 0).
                                                                                                                                                                                                      		// If zero or positive, keep-alives are enabled.
                                                                                                                                                                                                      		// If negative, keep-alives are disabled.
                                                                                                                                                                                                      		KeepAlive time.Duration
                                                                                                                                                                                                      
                                                                                                                                                                                                      		// LocalAddr is the local address to use when dialing an
                                                                                                                                                                                                      		// address. The address must be of a compatible type for the
                                                                                                                                                                                                      		// network being dialed.
                                                                                                                                                                                                      		// If nil, a local address is automatically chosen.
                                                                                                                                                                                                      		LocalAddr net.Addr
                                                                                                                                                                                                      
                                                                                                                                                                                                      		Proxy struct {
                                                                                                                                                                                                      			// Whether or not to use a proxy when connecting to the broker
                                                                                                                                                                                                      			// (defaults to false).
                                                                                                                                                                                                      			Enable bool
                                                                                                                                                                                                      			// The proxy dialer to use if enabled (defaults to nil).
                                                                                                                                                                                                      			Dialer proxy.Dialer
                                                                                                                                                                                                      		}
                                                                                                                                                                                                      	}
                                                                                                                                                                                                      
                                                                                                                                                                                                      	// Metadata is the namespace for metadata management properties used by the
                                                                                                                                                                                                      	// Client, and shared by the Producer/Consumer.
                                                                                                                                                                                                      	Metadata struct {
                                                                                                                                                                                                      		Retry struct {
                                                                                                                                                                                                      			// The total number of times to retry a metadata request when the
                                                                                                                                                                                                      			// cluster is in the middle of a leader election (default 3).
                                                                                                                                                                                                      			Max int
                                                                                                                                                                                                      			// How long to wait for leader election to occur before retrying
                                                                                                                                                                                                      			// (default 250ms). Similar to the JVM's `retry.backoff.ms`.
                                                                                                                                                                                                      			Backoff time.Duration
                                                                                                                                                                                                      			// Called to compute backoff time dynamically. Useful for implementing
                                                                                                                                                                                                      			// more sophisticated backoff strategies. This takes precedence over
                                                                                                                                                                                                      			// `Backoff` if set.
                                                                                                                                                                                                      			BackoffFunc func(retries, maxRetries int) time.Duration
                                                                                                                                                                                                      		}
                                                                                                                                                                                                      		// How frequently to refresh the cluster metadata in the background.
                                                                                                                                                                                                      		// Defaults to 10 minutes. Set to 0 to disable. Similar to
                                                                                                                                                                                                      		// `topic.metadata.refresh.interval.ms` in the JVM version.
                                                                                                                                                                                                      		RefreshFrequency time.Duration
                                                                                                                                                                                                      
                                                                                                                                                                                                      		// Whether to maintain a full set of metadata for all topics, or just
                                                                                                                                                                                                      		// the minimal set that has been necessary so far. The full set is simpler
                                                                                                                                                                                                      		// and usually more convenient, but can take up a substantial amount of
                                                                                                                                                                                                      		// memory if you have many topics and partitions. Defaults to true.
                                                                                                                                                                                                      		Full bool
                                                                                                                                                                                                      
                                                                                                                                                                                                      		// How long to wait for a successful metadata response.
                                                                                                                                                                                                      		// Disabled by default which means a metadata request against an unreachable
                                                                                                                                                                                                      		// cluster (all brokers are unreachable or unresponsive) can take up to
                                                                                                                                                                                                      		// `Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) + Metadata.Retry.Backoff * Metadata.Retry.Max`
                                                                                                                                                                                                      		// to fail.
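                                                                                                                                                                                                      		//
                                                                                                                                                                                                      		// As a rough illustration with assumed values (not taken from this
                                                                                                                                                                                                      		// documentation): with both Net timeouts at 30s, 3 brokers,
                                                                                                                                                                                                      		// Metadata.Retry.Max = 3 and Metadata.Retry.Backoff = 250ms, the bound is
                                                                                                                                                                                                      		// 30s * 3 * (3 + 1) + 250ms * 3 = 360.75s.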
                                                                                                                                                                                                      		Timeout time.Duration
                                                                                                                                                                                                      	}
                                                                                                                                                                                                      
                                                                                                                                                                                                      	// Producer is the namespace for configuration related to producing messages,
                                                                                                                                                                                                      	// used by the Producer.
                                                                                                                                                                                                      	Producer struct {
                                                                                                                                                                                                      		// The maximum permitted size of a message (defaults to 1000000). Should be
                                                                                                                                                                                                      		// set equal to or smaller than the broker's `message.max.bytes`.
                                                                                                                                                                                                      		MaxMessageBytes int
                                                                                                                                                                                                      		// The level of acknowledgement reliability needed from the broker (defaults
                                                                                                                                                                                                      		// to WaitForLocal). Equivalent to the `request.required.acks` setting of the
                                                                                                                                                                                                      		// JVM producer.
                                                                                                                                                                                                      		RequiredAcks RequiredAcks
                                                                                                                                                                                                      		// The maximum duration the broker will wait for the receipt of the number of
                                                                                                                                                                                                      		// RequiredAcks (defaults to 10 seconds). This is only relevant when
                                                                                                                                                                                                      		// RequiredAcks is set to WaitForAll or a number > 1. Only supports
                                                                                                                                                                                                      		// millisecond resolution, nanoseconds will be truncated. Equivalent to
                                                                                                                                                                                                      		// the JVM producer's `request.timeout.ms` setting.
                                                                                                                                                                                                      		Timeout time.Duration
                                                                                                                                                                                                      		// The type of compression to use on messages (defaults to no compression).
                                                                                                                                                                                                      		// Similar to `compression.codec` setting of the JVM producer.
                                                                                                                                                                                                      		Compression CompressionCodec
                                                                                                                                                                                                      		// The level of compression to use on messages. The meaning depends
                                                                                                                                                                                                      		// on the actual compression type used and defaults to the default
                                                                                                                                                                                                      		// compression level for the codec.
                                                                                                                                                                                                      		CompressionLevel int
                                                                                                                                                                                                      		// Generates partitioners for choosing the partition to send messages to
                                                                                                                                                                                                      		// (defaults to hashing the message key). Similar to the `partitioner.class`
                                                                                                                                                                                                      		// setting for the JVM producer.
                                                                                                                                                                                                      		Partitioner PartitionerConstructor
                                                                                                                                                                                                      		// If enabled, the producer will ensure that exactly one copy of each message is
                                                                                                                                                                                                      		// written.
                                                                                                                                                                                                      		Idempotent bool
                                                                                                                                                                                                      
                                                                                                                                                                                                      		// Return specifies which channels will be populated. If they are set to true,
                                                                                                                                                                                                      		// you must read from the respective channels to prevent deadlock. If,
                                                                                                                                                                                                      		// however, this config is used to create a `SyncProducer`, both must be set
                                                                                                                                                                                                      		// to true and you must not read from the channels, since the producer does
                                                                                                                                                                                                      		// this internally.
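                                                                                                                                                                                                      		//
                                                                                                                                                                                                      		// A minimal sketch of the SyncProducer case; `cfg` and `addrs` are
                                                                                                                                                                                                      		// hypothetical names for a *Config and a list of broker addresses:
                                                                                                                                                                                                      		//
                                                                                                                                                                                                      		//	cfg := NewConfig()
                                                                                                                                                                                                      		//	cfg.Producer.Return.Successes = true
                                                                                                                                                                                                      		//	cfg.Producer.Return.Errors = true
                                                                                                                                                                                                      		//	producer, err := NewSyncProducer(addrs, cfg)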
                                                                                                                                                                                                      		Return struct {
                                                                                                                                                                                                      			// If enabled, successfully delivered messages will be returned on the
                                                                                                                                                                                                      			// Successes channel (default disabled).
                                                                                                                                                                                                      			Successes bool
                                                                                                                                                                                                      
                                                                                                                                                                                                      			// If enabled, messages that failed to deliver will be returned on the
                                                                                                                                                                                                      			// Errors channel, including error (default enabled).
                                                                                                                                                                                                      			Errors bool
                                                                                                                                                                                                      		}
                                                                                                                                                                                                      
                                                                                                                                                                                                      		// The following config options control how often messages are batched up and
                                                                                                                                                                                                      		// sent to the broker. By default, messages are sent as fast as possible, and
                                                                                                                                                                                                      		// all messages received while the current batch is in-flight are placed
                                                                                                                                                                                                      		// into the subsequent batch.
                                                                                                                                                                                                      		Flush struct {
                                                                                                                                                                                                      			// The best-effort number of bytes needed to trigger a flush. Use the
                                                                                                                                                                                                      			// global sarama.MaxRequestSize to set a hard upper limit.
                                                                                                                                                                                                      			Bytes int
                                                                                                                                                                                                      			// The best-effort number of messages needed to trigger a flush. Use
                                                                                                                                                                                                      			// `MaxMessages` to set a hard upper limit.
                                                                                                                                                                                                      			Messages int
                                                                                                                                                                                                      			// The best-effort frequency of flushes. Equivalent to the
                                                                                                                                                                                                      			// `queue.buffering.max.ms` setting of the JVM producer.
                                                                                                                                                                                                      			Frequency time.Duration
                                                                                                                                                                                                      			// The maximum number of messages the producer will send in a single
                                                                                                                                                                                                      			// broker request. Defaults to 0 for unlimited. Similar to
                                                                                                                                                                                                      			// `queue.buffering.max.messages` in the JVM producer.
                                                                                                                                                                                                      			MaxMessages int
                                                                                                                                                                                                      		}
                                                                                                                                                                                                      
                                                                                                                                                                                                      		Retry struct {
                                                                                                                                                                                                      			// The total number of times to retry sending a message (default 3).
                                                                                                                                                                                                      			// Similar to the `message.send.max.retries` setting of the JVM producer.
                                                                                                                                                                                                      			Max int
                                                                                                                                                                                                      			// How long to wait for the cluster to settle between retries
                                                                                                                                                                                                      			// (default 100ms). Similar to the `retry.backoff.ms` setting of the
                                                                                                                                                                                                      			// JVM producer.
                                                                                                                                                                                                      			Backoff time.Duration
                                                                                                                                                                                                      			// Called to compute backoff time dynamically. Useful for implementing
                                                                                                                                                                                                      			// more sophisticated backoff strategies. This takes precedence over
                                                                                                                                                                                                      			// `Backoff` if set.
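                                                                                                                                                                                                      			//
                                                                                                                                                                                                      			// A sketch of one possible strategy (linear backoff; `cfg` is a
                                                                                                                                                                                                      			// hypothetical *Config and the 100ms step is an arbitrary choice):
                                                                                                                                                                                                      			//
                                                                                                                                                                                                      			//	cfg.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
                                                                                                                                                                                                      			//		return time.Duration(retries+1) * 100 * time.Millisecond
                                                                                                                                                                                                      			//	}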
                                                                                                                                                                                                      			BackoffFunc func(retries, maxRetries int) time.Duration
                                                                                                                                                                                                      		}
                                                                                                                                                                                                      
                                                                                                                                                                                                      		// Interceptors to be called when the producer dispatcher reads the
                                                                                                                                                                                                      		// message for the first time. Interceptors allow you to intercept and
                                                                                                                                                                                                      		// possibly mutate messages before they are published to the Kafka
                                                                                                                                                                                                      		// cluster. The *ProducerMessage modified by the first interceptor's
                                                                                                                                                                                                      		// OnSend() is passed to the second interceptor's OnSend(), and so on
                                                                                                                                                                                                      		// down the interceptor chain.
                                                                                                                                                                                                      		Interceptors []ProducerInterceptor
                                                                                                                                                                                                      	}
                                                                                                                                                                                                      
                                                                                                                                                                                                      	// Consumer is the namespace for configuration related to consuming messages,
                                                                                                                                                                                                      	// used by the Consumer.
                                                                                                                                                                                                      	Consumer struct {
                                                                                                                                                                                                      
                                                                                                                                                                                                      		// Group is the namespace for configuring consumer groups.
                                                                                                                                                                                                      		Group struct {
                                                                                                                                                                                                      			Session struct {
                                                                                                                                                                                                      				// The timeout used to detect consumer failures when using Kafka's group management facility.
                                                                                                                                                                                                      				// The consumer sends periodic heartbeats to indicate its liveness to the broker.
                                                                                                                                                                                                      				// If no heartbeats are received by the broker before the expiration of this session timeout,
                                                                                                                                                                                                      				// then the broker will remove this consumer from the group and initiate a rebalance.
                                                                                                                                                                                                      				// Note that the value must be in the allowable range as configured in the broker configuration
                                                                                                                                                                                                      				// by `group.min.session.timeout.ms` and `group.max.session.timeout.ms` (default 10s)
                                                                                                                                                                                                      				Timeout time.Duration
                                                                                                                                                                                                      			}
                                                                                                                                                                                                      			Heartbeat struct {
                                                                                                                                                                                                      				// The expected time between heartbeats to the consumer coordinator when using Kafka's group
                                                                                                                                                                                                      				// management facilities. Heartbeats are used to ensure that the consumer's session stays active and
                                                                                                                                                                                                      				// to facilitate rebalancing when new consumers join or leave the group.
                                                                                                                                                                                                      				// The value must be set lower than Consumer.Group.Session.Timeout, but typically should be set no
                                                                                                                                                                                                      				// higher than 1/3 of that value.
                                                                                                                                                                                                      				// It can be adjusted even lower to control the expected time for normal rebalances (default 3s)
                                                                                                                                                                                                      				Interval time.Duration
                                                                                                                                                                                                      			}
                                                                                                                                                                                                      			Rebalance struct {
                                                                                                                                                                                                      				// Strategy for allocating topic partitions to members (default BalanceStrategyRange)
                                                                                                                                                                                                      				Strategy BalanceStrategy
                                                                                                                                                                                                      				// The maximum allowed time for each worker to join the group once a rebalance has begun.
                                                                                                                                                                                                      				// This is basically a limit on the amount of time needed for all tasks to flush any pending
                                                                                                                                                                                                      				// data and commit offsets. If the timeout is exceeded, then the worker will be removed from
                                                                                                                                                                                                      				// the group, which will cause offset commit failures (default 60s).
                                                                                                                                                                                                      				Timeout time.Duration
                                                                                                                                                                                                      
                                                                                                                                                                                                      				Retry struct {
                                                                                                                                                                                                      					// When a new consumer joins a consumer group the set of consumers attempt to "rebalance"
                                                                                                                                                                                                      					// the load to assign partitions to each consumer. If the set of consumers changes while
                                                                                                                                                                                                      					// this assignment is taking place the rebalance will fail and retry. This setting controls
                                                                                                                                                                                                      					// the maximum number of attempts before giving up (default 4).
                                                                                                                                                                                                      					Max int
                                                                                                                                                                                                      					// Backoff time between retries during rebalance (default 2s)
                                                                                                                                                                                                      					Backoff time.Duration
                                                                                                                                                                                                      				}
                                                                                                                                                                                                      			}
                                                                                                                                                                                                      			Member struct {
                                                                                                                                                                                                      				// Custom metadata to include when joining the group. The user data for all joined members
                                                                                                                                                                                                      				// can be retrieved by sending a DescribeGroupRequest to the broker that is the
                                                                                                                                                                                                      				// coordinator for the group.
                                                                                                                                                                                                      				UserData []byte
                                                                                                                                                                                                      			}
                                                                                                                                                                                                      		}
                                                                                                                                                                                                      
                                                                                                                                                                                                      		Retry struct {
                                                                                                                                                                                                      			// How long to wait after failing to read from a partition before
                                                                                                                                                                                                      			// trying again (default 2s).
                                                                                                                                                                                                      			Backoff time.Duration
                                                                                                                                                                                                      			// Called to compute backoff time dynamically. Useful for implementing
                                                                                                                                                                                                      			// more sophisticated backoff strategies. This takes precedence over
                                                                                                                                                                                                      			// `Backoff` if set.
                                                                                                                                                                                                      			BackoffFunc func(retries int) time.Duration
                                                                                                                                                                                                      		}
                                                                                                                                                                                                      
                                                                                                                                                                                                      		// Fetch is the namespace for controlling how many bytes are retrieved by any
                                                                                                                                                                                                      		// given request.
                                                                                                                                                                                                      		Fetch struct {
                                                                                                                                                                                                      			// The minimum number of message bytes to fetch in a request - the broker
                                                                                                                                                                                                      			// will wait until at least this many are available. The default is 1,
                                                                                                                                                                                                      			// as 0 causes the consumer to spin when no messages are available.
                                                                                                                                                                                                      			// Equivalent to the JVM's `fetch.min.bytes`.
                                                                                                                                                                                                      			Min int32
                                                                                                                                                                                                      			// The default number of message bytes to fetch from the broker in each
                                                                                                                                                                                                      			// request (default 1MB). This should be larger than the majority of
                                                                                                                                                                                                      			// your messages, or else the consumer will spend a lot of time
                                                                                                                                                                                                      			// negotiating sizes and not actually consuming. Similar to the JVM's
                                                                                                                                                                                                      			// `fetch.message.max.bytes`.
                                                                                                                                                                                                      			Default int32
                                                                                                                                                                                                      			// The maximum number of message bytes to fetch from the broker in a
                                                                                                                                                                                                      			// single request. Messages larger than this will return
                                                                                                                                                                                                      			// ErrMessageTooLarge and will not be consumable, so you must be sure
                                                                                                                                                                                                      			// this is at least as large as your largest message. Defaults to 0
                                                                                                                                                                                                      			// (no limit). Similar to the JVM's `fetch.message.max.bytes`. The
                                                                                                                                                                                                      			// global `sarama.MaxResponseSize` still applies.
                                                                                                                                                                                                      			Max int32
                                                                                                                                                                                                      		}
                                                                                                                                                                                                      		// The maximum amount of time the broker will wait for Consumer.Fetch.Min
                                                                                                                                                                                                      		// bytes to become available before it returns fewer than that anyway. The
                                                                                                                                                                                                      		// default is 250ms, since 0 causes the consumer to spin when no events are
                                                                                                                                                                                                      		// available. 100-500ms is a reasonable range for most cases. Kafka only
                                                                                                                                                                                                      		// supports precision up to milliseconds; nanoseconds will be truncated.
                                                                                                                                                                                                      		// Equivalent to the JVM's `fetch.wait.max.ms`.
                                                                                                                                                                                                      		MaxWaitTime time.Duration
                                                                                                                                                                                                      
                                                                                                                                                                                                      		// The maximum amount of time the consumer expects a message to take to
                                                                                                                                                                                                      		// process for the user. If writing to the Messages channel takes longer
                                                                                                                                                                                                      		// than this, that partition will stop fetching more messages until it
                                                                                                                                                                                                      		// can proceed again.
                                                                                                                                                                                                      		// Note that, since the Messages channel is buffered, the actual grace time is
                                                                                                                                                                                                      		// (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms.
                                                                                                                                                                                                      		// If a message is not written to the Messages channel between two ticks
                                                                                                                                                                                                      		// of the expiryTicker then a timeout is detected.
                                                                                                                                                                                                      		// Using a ticker instead of a timer to detect timeouts should typically
                                                                                                                                                                                                      		// result in many fewer calls to Timer functions which may result in a
                                                                                                                                                                                                      		// significant performance improvement if many messages are being sent
                                                                                                                                                                                                      		// and timeouts are infrequent.
                                                                                                                                                                                                      		// The disadvantage of using a ticker instead of a timer is that
                                                                                                                                                                                                      		// timeouts will be less accurate. That is, the effective timeout could
                                                                                                                                                                                                      		// be between `MaxProcessingTime` and `2 * MaxProcessingTime`. For
                                                                                                                                                                                                      		// example, if `MaxProcessingTime` is 100ms then a delay of 180ms
                                                                                                                                                                                                      		// between two messages being sent may not be recognized as a timeout.
                                                                                                                                                                                                      		MaxProcessingTime time.Duration
                                                                                                                                                                                                      
                                                                                                                                                                                                      		// Return specifies what channels will be populated. If they are set to true,
                                                                                                                                                                                                      		// you must read from them to prevent deadlock.
                                                                                                                                                                                                      		Return struct {
                                                                                                                                                                                                      			// If enabled, any errors that occurred while consuming are returned on
                                                                                                                                                                                                      			// the Errors channel (default disabled).
                                                                                                                                                                                                      			Errors bool
                                                                                                                                                                                                      		}
                                                                                                                                                                                                      
                                                                                                                                                                                                      		// Offsets specifies configuration for how and when to commit consumed
                                                                                                                                                                                                      		// offsets. This currently requires the manual use of an OffsetManager
                                                                                                                                                                                                      		// but will eventually be automated.
                                                                                                                                                                                                      		Offsets struct {
                                                                                                                                                                                                      			// Deprecated: CommitInterval exists for historical compatibility
                                                                                                                                                                                                      			// and should not be used. Please use Consumer.Offsets.AutoCommit instead.
                                                                                                                                                                                                      			CommitInterval time.Duration
                                                                                                                                                                                                      
                                                                                                                                                                                                      			// AutoCommit specifies configuration for committing offsets automatically.
                                                                                                                                                                                                      			AutoCommit struct {
                                                                                                                                                                                                      				// Whether or not to auto-commit updated offsets back to the broker.
                                                                                                                                                                                                      				// (default enabled).
                                                                                                                                                                                                      				Enable bool
                                                                                                                                                                                                      
                                                                                                                                                                                                      				// How frequently to commit updated offsets. Ineffective unless
                                                                                                                                                                                                      				// auto-commit is enabled (default 1s)
                                                                                                                                                                                                      				Interval time.Duration
                                                                                                                                                                                                      			}
                                                                                                                                                                                                      
                                                                                                                                                                                                      			// The initial offset to use if no offset was previously committed.
                                                                                                                                                                                                      			// Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
                                                                                                                                                                                                      			Initial int64
                                                                                                                                                                                                      
                                                                                                                                                                                                      			// The retention duration for committed offsets. If zero, disabled
                                                                                                                                                                                                      			// (in which case the `offsets.retention.minutes` option on the
                                                                                                                                                                                                      			// broker will be used).  Kafka only supports precision up to
                                                                                                                                                                                                      			// milliseconds; nanoseconds will be truncated. Requires Kafka
                                                                                                                                                                                                      			// broker version 0.9.0 or later.
                                                                                                                                                                                                      			// (default is 0: disabled).
                                                                                                                                                                                                      			Retention time.Duration
                                                                                                                                                                                                      
                                                                                                                                                                                                      			Retry struct {
                                                                                                                                                                                                      				// The total number of times to retry failing commit
                                                                                                                                                                                                      				// requests during OffsetManager shutdown (default 3).
                                                                                                                                                                                                      				Max int
                                                                                                                                                                                                      			}
                                                                                                                                                                                                      		}
                                                                                                                                                                                                      
                                                                                                                                                                                                      		// IsolationLevel supports two modes:
                                                                                                                                                                                                      		// 	- use `ReadUncommitted` (default) to consume and return all messages on the messages channel
                                                                                                                                                                                                      		// 	- use `ReadCommitted` to hide messages that are part of an aborted transaction
                                                                                                                                                                                                      		IsolationLevel IsolationLevel
                                                                                                                                                                                                      
                                                                                                                                                                                                       		// Interceptors to be called just before the record is sent to the
                                                                                                                                                                                                       		// messages channel. Interceptors allow you to intercept and possibly
                                                                                                                                                                                                       		// mutate messages before they are returned to the client.
                                                                                                                                                                                                       		// The *ConsumerMessage modified by the first interceptor's OnConsume() is
                                                                                                                                                                                                       		// passed to the second interceptor's OnConsume(), and so on along the
                                                                                                                                                                                                       		// interceptor chain.
                                                                                                                                                                                                      		Interceptors []ConsumerInterceptor
                                                                                                                                                                                                      	}
                                                                                                                                                                                                      
                                                                                                                                                                                                      	// A user-provided string sent with every request to the brokers for logging,
                                                                                                                                                                                                      	// debugging, and auditing purposes. Defaults to "sarama", but you should
                                                                                                                                                                                                      	// probably set it to something specific to your application.
                                                                                                                                                                                                      	ClientID string
                                                                                                                                                                                                       	// A rack identifier for this client. This can be any string value that
                                                                                                                                                                                                       	// indicates where this client is physically located.
                                                                                                                                                                                                       	// It corresponds to the broker config 'broker.rack'.
                                                                                                                                                                                                      	RackID string
                                                                                                                                                                                                      	// The number of events to buffer in internal and external channels. This
                                                                                                                                                                                                      	// permits the producer and consumer to continue processing some messages
                                                                                                                                                                                                      	// in the background while user code is working, greatly improving throughput.
                                                                                                                                                                                                      	// Defaults to 256.
                                                                                                                                                                                                      	ChannelBufferSize int
                                                                                                                                                                                                      	// The version of Kafka that Sarama will assume it is running against.
                                                                                                                                                                                                      	// Defaults to the oldest supported stable version. Since Kafka provides
                                                                                                                                                                                                      	// backwards-compatibility, setting it to a version older than you have
                                                                                                                                                                                                      	// will not break anything, although it may prevent you from using the
                                                                                                                                                                                                      	// latest features. Setting it to a version greater than you are actually
                                                                                                                                                                                                      	// running may lead to random breakage.
                                                                                                                                                                                                      	Version KafkaVersion
                                                                                                                                                                                                       	// The registry in which metrics are defined.
                                                                                                                                                                                                       	// Defaults to a local registry.
                                                                                                                                                                                                       	// If you want to disable metrics gathering, set "metrics.UseNilMetrics" to "true"
                                                                                                                                                                                                       	// prior to starting Sarama.
                                                                                                                                                                                                       	// See the examples for how to use the metrics registry.
                                                                                                                                                                                                      	MetricRegistry metrics.Registry
                                                                                                                                                                                                      }

                                                                                                                                                                                                        Config is used to pass multiple configuration options to Sarama's constructors.
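The interceptor chain described in the Consumer.Interceptors comment can be sketched without a broker. This is a minimal illustration, not Sarama's implementation: `ConsumerMessage` and `ConsumerInterceptor` below are local stand-ins that mirror only the shape needed here, and `trimSpace`, `upperCase`, and `applyChain` are hypothetical names introduced for the example.

```go
package main

import (
	"bytes"
	"fmt"
)

// ConsumerMessage is a local stand-in for the library's message type.
// Only the field needed for this sketch is included.
type ConsumerMessage struct {
	Value []byte
}

// ConsumerInterceptor is a local stand-in with a single OnConsume method,
// matching the behaviour described in the Config comment.
type ConsumerInterceptor interface {
	OnConsume(*ConsumerMessage)
}

// trimSpace is a hypothetical interceptor that strips surrounding whitespace.
type trimSpace struct{}

func (trimSpace) OnConsume(m *ConsumerMessage) { m.Value = bytes.TrimSpace(m.Value) }

// upperCase is a hypothetical interceptor that upper-cases the payload.
type upperCase struct{}

func (upperCase) OnConsume(m *ConsumerMessage) { m.Value = bytes.ToUpper(m.Value) }

// applyChain is a hypothetical helper: it runs each interceptor in order on
// the same message, so each one sees the mutations made by the previous one.
func applyChain(chain []ConsumerInterceptor, m *ConsumerMessage) {
	for _, ic := range chain {
		ic.OnConsume(m)
	}
}

func main() {
	msg := &ConsumerMessage{Value: []byte("  hello  ")}
	applyChain([]ConsumerInterceptor{trimSpace{}, upperCase{}}, msg)
	fmt.Printf("%q\n", msg.Value)
}
```

Because every interceptor receives the same pointer, the order of the slice determines the final message contents.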

                                                                                                                                                                                                        Example (Metrics)

                                                                                                                                                                                                          This example shows how to integrate with an existing registry as well as publishing metrics on the standard output

                                                                                                                                                                                                          Output:
                                                                                                                                                                                                          
                                                                                                                                                                                                          gauge m1
                                                                                                                                                                                                            value:               1
                                                                                                                                                                                                          gauge sarama.m2
                                                                                                                                                                                                            value:               2
                                                                                                                                                                                                          

                                                                                                                                                                                                          func NewConfig

                                                                                                                                                                                                          func NewConfig() *Config

                                                                                                                                                                                                            NewConfig returns a new configuration instance with sane defaults.

                                                                                                                                                                                                            func (*Config) Validate

                                                                                                                                                                                                            func (c *Config) Validate() error

                                                                                                                                                                                                              Validate checks a Config instance. It will return a ConfigurationError if the specified values don't make sense.

                                                                                                                                                                                                              type ConfigEntry

                                                                                                                                                                                                              type ConfigEntry struct {
                                                                                                                                                                                                              	Name      string
                                                                                                                                                                                                              	Value     string
                                                                                                                                                                                                              	ReadOnly  bool
                                                                                                                                                                                                              	Default   bool
                                                                                                                                                                                                              	Source    ConfigSource
                                                                                                                                                                                                              	Sensitive bool
                                                                                                                                                                                                              	Synonyms  []*ConfigSynonym
                                                                                                                                                                                                              }

                                                                                                                                                                                                              type ConfigResource

                                                                                                                                                                                                              type ConfigResource struct {
                                                                                                                                                                                                              	Type        ConfigResourceType
                                                                                                                                                                                                              	Name        string
                                                                                                                                                                                                              	ConfigNames []string
                                                                                                                                                                                                              }

                                                                                                                                                                                                              type ConfigResourceType

                                                                                                                                                                                                              type ConfigResourceType int8

                                                                                                                                                                                                                ConfigResourceType is a type for resources that have configs.

                                                                                                                                                                                                                const (
                                                                                                                                                                                                                	// UnknownResource constant type
                                                                                                                                                                                                                	UnknownResource ConfigResourceType = 0
                                                                                                                                                                                                                	// TopicResource constant type
                                                                                                                                                                                                                	TopicResource ConfigResourceType = 2
                                                                                                                                                                                                                	// BrokerResource constant type
                                                                                                                                                                                                                	BrokerResource ConfigResourceType = 4
                                                                                                                                                                                                                	// BrokerLoggerResource constant type
                                                                                                                                                                                                                	BrokerLoggerResource ConfigResourceType = 8
                                                                                                                                                                                                                )

                                                                                                                                                                                                                type ConfigSource

                                                                                                                                                                                                                type ConfigSource int8
                                                                                                                                                                                                                const (
                                                                                                                                                                                                                	SourceUnknown ConfigSource = iota
                                                                                                                                                                                                                	SourceTopic
                                                                                                                                                                                                                	SourceDynamicBroker
                                                                                                                                                                                                                	SourceDynamicDefaultBroker
                                                                                                                                                                                                                	SourceStaticBroker
                                                                                                                                                                                                                	SourceDefault
                                                                                                                                                                                                                )

                                                                                                                                                                                                                func (ConfigSource) String

                                                                                                                                                                                                                func (s ConfigSource) String() string
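As a rough illustration of how an iota-based enumeration like ConfigSource can implement fmt.Stringer, the sketch below redeclares the type locally. The label strings and the fallback format are assumptions made for the example; they are not confirmed to match what the library's String() returns.

```go
package main

import "fmt"

// ConfigSource is redeclared locally so the sketch is self-contained.
type ConfigSource int8

const (
	SourceUnknown ConfigSource = iota
	SourceTopic
	SourceDynamicBroker
	SourceDynamicDefaultBroker
	SourceStaticBroker
	SourceDefault
)

// sourceNames maps each constant to a label. The labels are illustrative
// assumptions, not the library's documented output.
var sourceNames = [...]string{
	SourceUnknown:              "Unknown",
	SourceTopic:                "Topic",
	SourceDynamicBroker:        "DynamicBroker",
	SourceDynamicDefaultBroker: "DynamicDefaultBroker",
	SourceStaticBroker:         "StaticBroker",
	SourceDefault:              "Default",
}

// String returns the label for known values and a numeric fallback otherwise.
func (s ConfigSource) String() string {
	if s >= 0 && int(s) < len(sourceNames) {
		return sourceNames[s]
	}
	return fmt.Sprintf("ConfigSource(%d)", int8(s))
}

func main() {
	for s := SourceUnknown; s <= SourceDefault; s++ {
		fmt.Printf("%d -> %s\n", int8(s), s)
	}
}
```

Indexing the array by the constants themselves keeps the labels aligned with the declaration order even if a constant is inserted later.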

                                                                                                                                                                                                                type ConfigSynonym

                                                                                                                                                                                                                type ConfigSynonym struct {
                                                                                                                                                                                                                	ConfigName  string
                                                                                                                                                                                                                	ConfigValue string
                                                                                                                                                                                                                	Source      ConfigSource
                                                                                                                                                                                                                }

                                                                                                                                                                                                                type ConfigurationError

                                                                                                                                                                                                                type ConfigurationError string

                                                                                                                                                                                                                  ConfigurationError is the type of error returned from a constructor (e.g. NewClient, or NewConsumer) when the specified configuration is invalid.

                                                                                                                                                                                                                  func (ConfigurationError) Error

                                                                                                                                                                                                                  func (err ConfigurationError) Error() string
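Since ConfigurationError is a string-based error type, callers can tell it apart from other failures with errors.As. The sketch below redeclares the type locally; the message format, the `settings` struct, and the `validate` and `isConfigError` helpers are hypothetical and only stand in for a real configuration and its validation step.

```go
package main

import (
	"errors"
	"fmt"
)

// ConfigurationError is redeclared locally with the documented underlying
// type. The message format here is an assumption for illustration.
type ConfigurationError string

func (err ConfigurationError) Error() string {
	return "invalid configuration: " + string(err)
}

// settings is a hypothetical, cut-down configuration.
type settings struct {
	ChannelBufferSize int
}

// validate is a hypothetical check that reports bad values as a
// ConfigurationError and returns nil when the settings are acceptable.
func validate(s settings) error {
	if s.ChannelBufferSize < 0 {
		return ConfigurationError("ChannelBufferSize must be >= 0")
	}
	return nil
}

// isConfigError reports whether err is, or wraps, a ConfigurationError.
func isConfigError(err error) bool {
	var cfgErr ConfigurationError
	return errors.As(err, &cfgErr)
}

func main() {
	// Wrap the error the way calling code often does before returning it.
	err := fmt.Errorf("starting consumer: %w", validate(settings{ChannelBufferSize: -1}))
	if isConfigError(err) {
		fmt.Println("fix the configuration:", err)
	}
}
```

Using errors.As rather than a direct type assertion keeps the check working when the error has been wrapped with %w along the way.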

                                                                                                                                                                                                                  type Consumer

                                                                                                                                                                                                                  type Consumer interface {
                                                                                                                                                                                                                  	// Topics returns the set of available topics as retrieved from the cluster
                                                                                                                                                                                                                  	// metadata. This method is the same as Client.Topics(), and is provided for
                                                                                                                                                                                                                  	// convenience.
                                                                                                                                                                                                                  	Topics() ([]string, error)
                                                                                                                                                                                                                  
                                                                                                                                                                                                                  	// Partitions returns the sorted list of all partition IDs for the given topic.
                                                                                                                                                                                                                  	// This method is the same as Client.Partitions(), and is provided for convenience.
                                                                                                                                                                                                                  	Partitions(topic string) ([]int32, error)
                                                                                                                                                                                                                  
                                                                                                                                                                                                                  	// ConsumePartition creates a PartitionConsumer on the given topic/partition with
                                                                                                                                                                                                                  	// the given offset. It will return an error if this Consumer is already consuming
                                                                                                                                                                                                                  	// on the given topic/partition. Offset can be a literal offset, or OffsetNewest
                                                                                                                                                                                                                  	// or OffsetOldest
                                                                                                                                                                                                                  	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)
                                                                                                                                                                                                                  
                                                                                                                                                                                                                  	// HighWaterMarks returns the current high water marks for each topic and partition.
                                                                                                                                                                                                                  	// Consistency between partitions is not guaranteed since high water marks are updated separately.
                                                                                                                                                                                                                  	HighWaterMarks() map[string]map[int32]int64
                                                                                                                                                                                                                  
                                                                                                                                                                                                                  	// Close shuts down the consumer. It must be called after all child
                                                                                                                                                                                                                  	// PartitionConsumers have already been closed.
                                                                                                                                                                                                                  	Close() error
                                                                                                                                                                                                                  }

                                                                                                                                                                                                                    Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of scope.
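HighWaterMarks() returns a nested map keyed by topic and then partition, and Go map iteration order is unspecified. One way to report the values deterministically is to flatten and sort them first. The sketch below works on hand-written sample data rather than a live consumer; `partitionMark` and `flattenMarks` are hypothetical names introduced for the example.

```go
package main

import (
	"fmt"
	"sort"
)

// partitionMark is a hypothetical flattened row for one topic/partition.
type partitionMark struct {
	Topic     string
	Partition int32
	Offset    int64
}

// flattenMarks converts the nested map shape returned by HighWaterMarks()
// into a slice sorted by topic and then by partition ID.
func flattenMarks(hwm map[string]map[int32]int64) []partitionMark {
	var out []partitionMark
	for topic, parts := range hwm {
		for partition, offset := range parts {
			out = append(out, partitionMark{Topic: topic, Partition: partition, Offset: offset})
		}
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].Topic != out[j].Topic {
			return out[i].Topic < out[j].Topic
		}
		return out[i].Partition < out[j].Partition
	})
	return out
}

func main() {
	// Sample data standing in for a HighWaterMarks() result.
	hwm := map[string]map[int32]int64{
		"orders": {1: 42, 0: 17},
		"audit":  {0: 5},
	}
	for _, m := range flattenMarks(hwm) {
		fmt.Printf("%s/%d: %d\n", m.Topic, m.Partition, m.Offset)
	}
}
```

As the interface comment notes, the marks for different partitions are updated separately, so a flattened listing like this is a snapshot and not a consistent view across partitions.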

                                                                                                                                                                                                                    Example

                                                                                                                                                                                                                      This example shows how to use the consumer to read messages from a single partition.


                                                                                                                                                                                                                      func NewConsumer

                                                                                                                                                                                                                      func NewConsumer(addrs []string, config *Config) (Consumer, error)

                                                                                                                                                                                                                        NewConsumer creates a new consumer using the given broker addresses and configuration.

                                                                                                                                                                                                                        func NewConsumerFromClient

                                                                                                                                                                                                                        func NewConsumerFromClient(client Client) (Consumer, error)

                                                                                                                                                                                                                          NewConsumerFromClient creates a new consumer using the given client. It is still necessary to call Close() on the underlying client when shutting down this consumer.

                                                                                                                                                                                                                          type ConsumerError

                                                                                                                                                                                                                          type ConsumerError struct {
                                                                                                                                                                                                                          	Topic     string
                                                                                                                                                                                                                          	Partition int32
                                                                                                                                                                                                                          	Err       error
                                                                                                                                                                                                                          }

                                                                                                                                                                                                                            ConsumerError is what is provided to the user when an error occurs. It wraps an error and includes the topic and partition.

                                                                                                                                                                                                                            func (ConsumerError) Error

                                                                                                                                                                                                                            func (ce ConsumerError) Error() string

                                                                                                                                                                                                                            func (ConsumerError) Unwrap

                                                                                                                                                                                                                            func (ce ConsumerError) Unwrap() error
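Because ConsumerError exposes Unwrap, the standard library's errors.As and errors.Is can look through it. The following is a minimal sketch of that pattern. It uses a local stand-in type (consumerError) that mirrors the documented fields and methods rather than importing the library; the message format of Error() and the sentinel errBrokerGone are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// consumerError is a local stand-in mirroring the fields and methods
// documented for ConsumerError. It is not the library type.
type consumerError struct {
	Topic     string
	Partition int32
	Err       error
}

// Error's message format is an assumption made for this sketch.
func (ce consumerError) Error() string {
	return fmt.Sprintf("error consuming %s/%d: %v", ce.Topic, ce.Partition, ce.Err)
}

// Unwrap exposes the wrapped error to errors.Is and errors.As.
func (ce consumerError) Unwrap() error { return ce.Err }

// errBrokerGone is a hypothetical sentinel error used only here.
var errBrokerGone = errors.New("broker not available")

// describe reports the topic/partition an error came from and whether
// it wraps errBrokerGone.
func describe(err error) string {
	var ce *consumerError
	if errors.As(err, &ce) {
		return fmt.Sprintf("%s/%d brokerGone=%t",
			ce.Topic, ce.Partition, errors.Is(err, errBrokerGone))
	}
	return "not a consumer error"
}

func main() {
	err := error(&consumerError{Topic: "events", Partition: 3, Err: errBrokerGone})
	fmt.Println(describe(err)) // events/3 brokerGone=true
}
```

With the real type, the same errors.As call would target a *ConsumerError, assuming the error is delivered as a pointer.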

                                                                                                                                                                                                                            type ConsumerErrors

                                                                                                                                                                                                                            type ConsumerErrors []*ConsumerError

                                                                                                                                                                                                                              ConsumerErrors is a type that wraps a batch of errors and implements the Error interface. It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors when stopping.

                                                                                                                                                                                                                              func (ConsumerErrors) Error

                                                                                                                                                                                                                              func (ce ConsumerErrors) Error() string

                                                                                                                                                                                                                              type ConsumerGroup

                                                                                                                                                                                                                              type ConsumerGroup interface {
                                                                                                                                                                                                                              	// Consume joins a cluster of consumers for a given list of topics and
                                                                                                                                                                                                                              	// starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
                                                                                                                                                                                                                              	//
                                                                                                                                                                                                                              	// The life-cycle of a session is represented by the following steps:
                                                                                                                                                                                                                              	//
                                                                                                                                                                                                                              	// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
                                                                                                                                                                                                                              	//    and are assigned their "fair share" of partitions, aka 'claims'.
                                                                                                                                                                                                                              	// 2. Before processing starts, the handler's Setup() hook is called to notify the user
                                                                                                                                                                                                                              	//    of the claims and allow any necessary preparation or alteration of state.
                                                                                                                                                                                                                              	// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
                                                                                                                                                                                                                              	//    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
                                                                                                                                                                                                                              	//    from concurrent reads/writes.
                                                                                                                                                                                                                              	// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
                                                                                                                                                                                                                              	//    parent context is cancelled or when a server-side rebalance cycle is initiated.
                                                                                                                                                                                                                              	// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
                                                                                                                                                                                                                              	//    to allow the user to perform any final tasks before a rebalance.
                                                                                                                                                                                                                              	// 6. Finally, marked offsets are committed one last time before claims are released.
                                                                                                                                                                                                                              	//
                                                                                                                                                                                                                              	// Please note, that once a rebalance is triggered, sessions must be completed within
                                                                                                                                                                                                                              	// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
                                                                                                                                                                                                                              	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
                                                                                                                                                                                                                              	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
                                                                                                                                                                                                                              	// commit failures.
                                                                                                                                                                                                                              	// This method should be called inside an infinite loop: when a
                                                                                                                                                                                                                              	// server-side rebalance happens, the consumer session will need to be
                                                                                                                                                                                                                              	// recreated to get the new claims.
                                                                                                                                                                                                                              	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
                                                                                                                                                                                                                              
                                                                                                                                                                                                                              	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
                                                                                                                                                                                                                              	// By default, errors are logged and not returned over this channel.
                                                                                                                                                                                                                              	// If you want to implement any custom error handling, set your config's
                                                                                                                                                                                                                              	// Consumer.Return.Errors setting to true, and read from this channel.
                                                                                                                                                                                                                              	Errors() <-chan error
                                                                                                                                                                                                                              
                                                                                                                                                                                                                              	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
                                                                                                                                                                                                                              	// this function before the object passes out of scope, as it will otherwise leak memory.
                                                                                                                                                                                                                              	Close() error
                                                                                                                                                                                                                              }

                                                                                                                                                                                                                                ConsumerGroup is responsible for dividing up processing of topics and partitions over a collection of processes (the members of the consumer group).


                                                                                                                                                                                                                                func NewConsumerGroup

                                                                                                                                                                                                                                func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error)

                                                                                                                                                                                                                                  NewConsumerGroup creates a new consumer group the given broker addresses and configuration.

                                                                                                                                                                                                                                  func NewConsumerGroupFromClient

                                                                                                                                                                                                                                  func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error)

                                                                                                                                                                                                                                    NewConsumerGroupFromClient creates a new consumer group using the given client. It is still necessary to call Close() on the underlying client when shutting down this consumer. PLEASE NOTE: consumer groups can only re-use but not share clients.

                                                                                                                                                                                                                                    type ConsumerGroupClaim

                                                                                                                                                                                                                                    type ConsumerGroupClaim interface {
                                                                                                                                                                                                                                    	// Topic returns the consumed topic name.
                                                                                                                                                                                                                                    	Topic() string
                                                                                                                                                                                                                                    
                                                                                                                                                                                                                                    	// Partition returns the consumed partition.
                                                                                                                                                                                                                                    	Partition() int32
                                                                                                                                                                                                                                    
                                                                                                                                                                                                                                    	// InitialOffset returns the initial offset that was used as a starting point for this claim.
                                                                                                                                                                                                                                    	InitialOffset() int64
                                                                                                                                                                                                                                    
                                                                                                                                                                                                                                    	// HighWaterMarkOffset returns the high water mark offset of the partition,
                                                                                                                                                                                                                                    	// i.e. the offset that will be used for the next message that will be produced.
                                                                                                                                                                                                                                    	// You can use this to determine how far behind the processing is.
                                                                                                                                                                                                                                    	HighWaterMarkOffset() int64
                                                                                                                                                                                                                                    
                                                                                                                                                                                                                                    	// Messages returns the read channel for the messages that are returned by
                                                                                                                                                                                                                                    	// the broker. The messages channel will be closed when a new rebalance cycle
                                                                                                                                                                                                                                    	// is due. You must finish processing and mark offsets within
                                                                                                                                                                                                                                    	// Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
                                                                                                                                                                                                                                    	// re-assigned to another group member.
                                                                                                                                                                                                                                    	Messages() <-chan *ConsumerMessage
                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                      ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
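The HighWaterMarkOffset comment notes that the value can be used to determine how far behind processing is. Since the high water mark is the offset the next produced message will receive, the arithmetic can be sketched as below. The lag helper is hypothetical, and the result is best treated as an estimate, because offsets are not guaranteed to be contiguous (for example on compacted topics).

```go
package main

import "fmt"

// lag estimates how many messages remain after lastProcessed, given the
// partition's high water mark (the offset of the next message to be
// produced). It is a hypothetical helper, not part of the library.
func lag(highWaterMark, lastProcessed int64) int64 {
	remaining := highWaterMark - (lastProcessed + 1)
	if remaining < 0 {
		// The cached high water mark may briefly trail the consumer.
		return 0
	}
	return remaining
}

func main() {
	// Offsets 90..99 are still unprocessed.
	fmt.Println(lag(100, 89)) // 10
	// Fully caught up.
	fmt.Println(lag(100, 99)) // 0
}
```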

                                                                                                                                                                                                                                      type ConsumerGroupHandler

                                                                                                                                                                                                                                      type ConsumerGroupHandler interface {
                                                                                                                                                                                                                                      	// Setup is run at the beginning of a new session, before ConsumeClaim.
                                                                                                                                                                                                                                      	Setup(ConsumerGroupSession) error
                                                                                                                                                                                                                                      
                                                                                                                                                                                                                                      	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
                                                                                                                                                                                                                                      	// but before the offsets are committed for the very last time.
                                                                                                                                                                                                                                      	Cleanup(ConsumerGroupSession) error
                                                                                                                                                                                                                                      
                                                                                                                                                                                                                                      	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
                                                                                                                                                                                                                                      	// Once the Messages() channel is closed, the Handler must finish its processing
                                                                                                                                                                                                                                      	// loop and exit.
                                                                                                                                                                                                                                      	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                        ConsumerGroupHandler instances are used to handle individual topic/partition claims. It also provides hooks for your consumer group session life-cycle and allow you to trigger logic before or after the consume loop(s).

                                                                                                                                                                                                                                        PLEASE NOTE that handlers are likely be called from several goroutines concurrently, ensure that all state is safely protected against race conditions.
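The concurrency note above can be illustrated with a minimal sketch. To keep it runnable without a Kafka cluster, the `session` and `claim` interfaces, `noopSession`, `chanClaim`, `feed`, and `runDemo` below are hypothetical stand-ins, trimmed to the methods the sketch needs; a real handler would take the package's `ConsumerGroupSession` and `ConsumerGroupClaim` instead.

```go
package main

import (
	"fmt"
	"sync"
)

// ConsumerMessage is a trimmed stand-in for the package's type (assumption).
type ConsumerMessage struct {
	Topic     string
	Partition int32
	Offset    int64
}

// session and claim are hypothetical, reduced versions of
// ConsumerGroupSession and ConsumerGroupClaim.
type session interface {
	MarkMessage(msg *ConsumerMessage, metadata string)
}

type claim interface {
	Messages() <-chan *ConsumerMessage
}

// countingHandler keeps shared state, so every access goes through mu.
type countingHandler struct {
	mu     sync.Mutex
	counts map[string]int // messages seen per topic
}

func (h *countingHandler) Setup(session) error   { return nil }
func (h *countingHandler) Cleanup(session) error { return nil }

// ConsumeClaim loops until the Messages() channel is closed, then returns.
func (h *countingHandler) ConsumeClaim(s session, c claim) error {
	for msg := range c.Messages() {
		h.mu.Lock()
		h.counts[msg.Topic]++
		h.mu.Unlock()
		s.MarkMessage(msg, "")
	}
	return nil
}

// Test doubles used only by this sketch.
type noopSession struct{}

func (noopSession) MarkMessage(*ConsumerMessage, string) {}

type chanClaim struct{ ch chan *ConsumerMessage }

func (c chanClaim) Messages() <-chan *ConsumerMessage { return c.ch }

// feed returns a claim pre-loaded with n messages and an already closed channel.
func feed(topic string, partition int32, n int) chanClaim {
	ch := make(chan *ConsumerMessage, n)
	for i := 0; i < n; i++ {
		ch <- &ConsumerMessage{Topic: topic, Partition: partition, Offset: int64(i)}
	}
	close(ch)
	return chanClaim{ch: ch}
}

// runDemo runs one ConsumeClaim per partition concurrently on a shared handler.
func runDemo() map[string]int {
	h := &countingHandler{counts: make(map[string]int)}
	var wg sync.WaitGroup
	for p := int32(0); p < 2; p++ {
		wg.Add(1)
		go func(c claim) {
			defer wg.Done()
			_ = h.ConsumeClaim(noopSession{}, c)
		}(feed("events", p, 3))
	}
	wg.Wait()
	return h.counts
}

func main() {
	fmt.Println(runDemo()["events"]) // 2 partitions x 3 messages
}
```

Without the mutex, the two goroutines would write to the same map concurrently, which is a data race in Go.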

                                                                                                                                                                                                                                        type ConsumerGroupMemberAssignment

                                                                                                                                                                                                                                        type ConsumerGroupMemberAssignment struct {
                                                                                                                                                                                                                                        	Version  int16
                                                                                                                                                                                                                                        	Topics   map[string][]int32
                                                                                                                                                                                                                                        	UserData []byte
                                                                                                                                                                                                                                        }

                                                                                                                                                                                                                                          ConsumerGroupMemberAssignment holds the member assignment for a consume group

                                                                                                                                                                                                                                          type ConsumerGroupMemberMetadata

                                                                                                                                                                                                                                          type ConsumerGroupMemberMetadata struct {
                                                                                                                                                                                                                                          	Version  int16
                                                                                                                                                                                                                                          	Topics   []string
                                                                                                                                                                                                                                          	UserData []byte
                                                                                                                                                                                                                                          }

                                                                                                                                                                                                                                            ConsumerGroupMemberMetadata holds the metadata for consumer group

                                                                                                                                                                                                                                            type ConsumerGroupSession

                                                                                                                                                                                                                                            type ConsumerGroupSession interface {
                                                                                                                                                                                                                                            	// Claims returns information about the claimed partitions by topic.
                                                                                                                                                                                                                                            	Claims() map[string][]int32
                                                                                                                                                                                                                                            
                                                                                                                                                                                                                                            	// MemberID returns the cluster member ID.
                                                                                                                                                                                                                                            	MemberID() string
                                                                                                                                                                                                                                            
                                                                                                                                                                                                                                            	// GenerationID returns the current generation ID.
                                                                                                                                                                                                                                            	GenerationID() int32
                                                                                                                                                                                                                                            
                                                                                                                                                                                                                                            	// MarkOffset marks the provided offset, alongside a metadata string
                                                                                                                                                                                                                                            	// that represents the state of the partition consumer at that point in time. The
                                                                                                                                                                                                                                            	// metadata string can be used by another consumer to restore that state, so it
                                                                                                                                                                                                                                            	// can resume consumption.
                                                                                                                                                                                                                                            	//
                                                                                                                                                                                                                                            	// To follow upstream conventions, you are expected to mark the offset of the
                                                                                                                                                                                                                                            	// next message to read, not the last message read. Thus, when calling `MarkOffset`
                                                                                                                                                                                                                                            	// you should typically add one to the offset of the last consumed message.
                                                                                                                                                                                                                                            	//
                                                                                                                                                                                                                                            	// Note: calling MarkOffset does not necessarily commit the offset to the backend
                                                                                                                                                                                                                                            	// store immediately for efficiency reasons, and it may never be committed if
                                                                                                                                                                                                                                            	// your application crashes. This means that you may end up processing the same
                                                                                                                                                                                                                                            	// message twice, and your processing should ideally be idempotent.
                                                                                                                                                                                                                                            	MarkOffset(topic string, partition int32, offset int64, metadata string)
                                                                                                                                                                                                                                            
                                                                                                                                                                                                                                            	// Commit the offset to the backend
                                                                                                                                                                                                                                            	//
                                                                                                                                                                                                                                            	// Note: calling Commit performs a blocking synchronous operation.
                                                                                                                                                                                                                                            	Commit()
                                                                                                                                                                                                                                            
                                                                                                                                                                                                                                            	// ResetOffset resets to the provided offset, alongside a metadata string that
                                                                                                                                                                                                                                            	// represents the state of the partition consumer at that point in time. Reset
                                                                                                                                                                                                                                            	// acts as a counterpart to MarkOffset, the difference being that it allows
                                                                                                                                                                                                                                            	// resetting an offset to an earlier or smaller value, whereas MarkOffset only
                                                                                                                                                                                                                                            	// allows incrementing the offset. See MarkOffset for more details.
                                                                                                                                                                                                                                            	ResetOffset(topic string, partition int32, offset int64, metadata string)
                                                                                                                                                                                                                                            
                                                                                                                                                                                                                                            	// MarkMessage marks a message as consumed.
                                                                                                                                                                                                                                            	MarkMessage(msg *ConsumerMessage, metadata string)
                                                                                                                                                                                                                                            
                                                                                                                                                                                                                                            	// Context returns the session context.
                                                                                                                                                                                                                                            	Context() context.Context
                                                                                                                                                                                                                                            }

                                                                                                                                                                                                                                              ConsumerGroupSession represents a consumer group member session.
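The offset convention described in the MarkOffset and ResetOffset comments can be sketched as follows. `offsetTracker`, `key`, and `demo` are hypothetical names for a small in-memory stand-in; this is not the library's implementation, and the metadata argument is omitted for brevity. It only shows the documented behaviour: mark the offset of the next message to read (last consumed plus one), MarkOffset never moves backwards, and ResetOffset can.

```go
package main

import "fmt"

// offsetTracker is a hypothetical stand-in for a session's offset bookkeeping.
type offsetTracker struct {
	next map[string]int64 // next offset to read, keyed by "topic/partition"
}

func newOffsetTracker() *offsetTracker {
	return &offsetTracker{next: make(map[string]int64)}
}

func key(topic string, partition int32) string {
	return fmt.Sprintf("%s/%d", topic, partition)
}

// MarkOffset only ever moves the stored offset forward.
func (t *offsetTracker) MarkOffset(topic string, partition int32, offset int64) {
	k := key(topic, partition)
	if cur, ok := t.next[k]; !ok || offset > cur {
		t.next[k] = offset
	}
}

// ResetOffset may move the stored offset to an earlier value.
func (t *offsetTracker) ResetOffset(topic string, partition int32, offset int64) {
	t.next[key(topic, partition)] = offset
}

// demo returns the stored offset after a mark, a stale mark, and a reset.
func demo() (afterMark, afterStaleMark, afterReset int64) {
	t := newOffsetTracker()

	lastConsumed := int64(41)
	t.MarkOffset("events", 0, lastConsumed+1) // mark the NEXT message to read
	afterMark = t.next[key("events", 0)]

	t.MarkOffset("events", 0, 10) // smaller offset: ignored by MarkOffset
	afterStaleMark = t.next[key("events", 0)]

	t.ResetOffset("events", 0, 10) // rewinding requires ResetOffset
	afterReset = t.next[key("events", 0)]
	return
}

func main() {
	a, b, c := demo()
	fmt.Println(a, b, c) // prints "42 42 10"
}
```

Because marked offsets are not necessarily committed immediately, a restart may replay messages from the last committed offset, which is why the MarkOffset comment recommends idempotent processing.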

                                                                                                                                                                                                                                              type ConsumerInterceptor

                                                                                                                                                                                                                                              type ConsumerInterceptor interface {
                                                                                                                                                                                                                                              
                                                                                                                                                                                                                                              	// OnConsume is called when the consumed message is intercepted. Please
                                                                                                                                                                                                                                              	// avoid modifying the message until it's safe to do so, as this is _not_ a
                                                                                                                                                                                                                                              	// copy of the message.
                                                                                                                                                                                                                                              	OnConsume(*ConsumerMessage)
                                                                                                                                                                                                                                              }

                                                                                                                                                                                                                                                ConsumerInterceptor allows you to intercept (and possibly mutate) the records received by the consumer before they are sent to the messages channel. https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors#KIP42:AddProducerandConsumerInterceptors-Motivation
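A minimal sketch of an interceptor that mutates a message in place is shown below. The `RecordHeader`, `ConsumerMessage`, and `ConsumerInterceptor` declarations are local stand-ins, trimmed so the example runs on its own; `tagInterceptor` and `applyInterceptors` are hypothetical names, and the way the library itself invokes interceptors is not shown here.

```go
package main

import "fmt"

// Local stand-ins for the package's types (assumption: trimmed to what is needed).
type RecordHeader struct {
	Key, Value []byte
}

type ConsumerMessage struct {
	Headers []*RecordHeader
	Value   []byte
}

type ConsumerInterceptor interface {
	OnConsume(*ConsumerMessage)
}

// tagInterceptor is a hypothetical interceptor that appends a header.
// It receives the message itself, not a copy, so the change is visible
// to everything that sees the message afterwards.
type tagInterceptor struct{ tag string }

func (t tagInterceptor) OnConsume(msg *ConsumerMessage) {
	msg.Headers = append(msg.Headers, &RecordHeader{
		Key:   []byte("seen-by"),
		Value: []byte(t.tag),
	})
}

// applyInterceptors is a hypothetical helper that runs a chain in order.
func applyInterceptors(msg *ConsumerMessage, chain []ConsumerInterceptor) {
	for _, i := range chain {
		i.OnConsume(msg)
	}
}

func main() {
	msg := &ConsumerMessage{Value: []byte("payload")}
	applyInterceptors(msg, []ConsumerInterceptor{
		tagInterceptor{tag: "audit"},
		tagInterceptor{tag: "metrics"},
	})
	for _, h := range msg.Headers {
		fmt.Printf("%s=%s\n", h.Key, h.Value)
	}
}
```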

                                                                                                                                                                                                                                                type ConsumerMessage

                                                                                                                                                                                                                                                type ConsumerMessage struct {
                                                                                                                                                                                                                                                	Headers        []*RecordHeader // only set if kafka is version 0.11+
                                                                                                                                                                                                                                                	Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
                                                                                                                                                                                                                                                	BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp
                                                                                                                                                                                                                                                
                                                                                                                                                                                                                                                	Key, Value []byte
                                                                                                                                                                                                                                                	Topic      string
                                                                                                                                                                                                                                                	Partition  int32
                                                                                                                                                                                                                                                	Offset     int64
                                                                                                                                                                                                                                                }

                                                                                                                                                                                                                                                  ConsumerMessage encapsulates a Kafka message returned by the consumer.

                                                                                                                                                                                                                                                  type ConsumerMetadataRequest

                                                                                                                                                                                                                                                  type ConsumerMetadataRequest struct {
                                                                                                                                                                                                                                                  	ConsumerGroup string
                                                                                                                                                                                                                                                  }

                                                                                                                                                                                                                                                    ConsumerMetadataRequest is used for metadata requests

                                                                                                                                                                                                                                                    type ConsumerMetadataResponse

                                                                                                                                                                                                                                                    type ConsumerMetadataResponse struct {
                                                                                                                                                                                                                                                    	Err             KError
                                                                                                                                                                                                                                                    	Coordinator     *Broker
                                                                                                                                                                                                                                                    	CoordinatorID   int32  // deprecated: use Coordinator.ID()
                                                                                                                                                                                                                                                    	CoordinatorHost string // deprecated: use Coordinator.Addr()
                                                                                                                                                                                                                                                    	CoordinatorPort int32  // deprecated: use Coordinator.Addr()
                                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                                      ConsumerMetadataResponse holds the response for a consumer group meta data requests

                                                                                                                                                                                                                                                      type ControlRecord

                                                                                                                                                                                                                                                      type ControlRecord struct {
                                                                                                                                                                                                                                                      	Version          int16
                                                                                                                                                                                                                                                      	CoordinatorEpoch int32
                                                                                                                                                                                                                                                      	Type             ControlRecordType
                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                        Control records are returned as a record by fetchRequest However unlike "normal" records, they mean nothing application wise. They only serve internal logic for supporting transactions.

                                                                                                                                                                                                                                                        type ControlRecordType

                                                                                                                                                                                                                                                        type ControlRecordType int

                                                                                                                                                                                                                                                          ControlRecordType ...

                                                                                                                                                                                                                                                          const (
                                                                                                                                                                                                                                                          	// ControlRecordAbort is a control record for abort
                                                                                                                                                                                                                                                          	ControlRecordAbort ControlRecordType = iota
                                                                                                                                                                                                                                                          	// ControlRecordCommit is a control record for commit
                                                                                                                                                                                                                                                          	ControlRecordCommit
                                                                                                                                                                                                                                                          	// ControlRecordUnknown is a control record of unknown type
                                                                                                                                                                                                                                                          	ControlRecordUnknown
                                                                                                                                                                                                                                                          )
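As a minimal sketch of how the iota-based constants above map to numeric values, the following program uses a local stand-in type so that it compiles without the library. The underlying type `int` and the helper `describe` are assumptions made for this illustration; they are not part of the documented API.

```go
package main

import "fmt"

// ControlRecordType is a local stand-in mirroring the listing above.
// The underlying type int is an assumption made for this sketch.
type ControlRecordType int

const (
	// ControlRecordAbort takes the first iota value, 0.
	ControlRecordAbort ControlRecordType = iota
	// ControlRecordCommit takes the next value, 1.
	ControlRecordCommit
	// ControlRecordUnknown takes the next value, 2.
	ControlRecordUnknown
)

// describe is a hypothetical helper that names each control record type.
func describe(t ControlRecordType) string {
	switch t {
	case ControlRecordAbort:
		return "abort"
	case ControlRecordCommit:
		return "commit"
	default:
		return "unknown"
	}
}

func main() {
	all := []ControlRecordType{ControlRecordAbort, ControlRecordCommit, ControlRecordUnknown}
	for _, t := range all {
		fmt.Printf("%d => %s\n", int(t), describe(t))
	}
}
```

Falling through to "unknown" in the default case means any value outside the two named types is handled the same way as `ControlRecordUnknown`.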

                                                                                                                                                                                                                                                          type CoordinatorType

                                                                                                                                                                                                                                                          type CoordinatorType int8
                                                                                                                                                                                                                                                          const (
                                                                                                                                                                                                                                                          	CoordinatorGroup CoordinatorType = iota
                                                                                                                                                                                                                                                          	CoordinatorTransaction
                                                                                                                                                                                                                                                          )

                                                                                                                                                                                                                                                          type CreateAclsRequest

                                                                                                                                                                                                                                                          type CreateAclsRequest struct {
                                                                                                                                                                                                                                                          	Version      int16
                                                                                                                                                                                                                                                          	AclCreations []*AclCreation
                                                                                                                                                                                                                                                          }

                                                                                                                                                                                                                                                            CreateAclsRequest is an acl creation request

                                                                                                                                                                                                                                                            type CreateAclsResponse

                                                                                                                                                                                                                                                            type CreateAclsResponse struct {
                                                                                                                                                                                                                                                            	ThrottleTime         time.Duration
                                                                                                                                                                                                                                                            	AclCreationResponses []*AclCreationResponse
                                                                                                                                                                                                                                                            }

CreateAclsResponse is an acl creation response type

                                                                                                                                                                                                                                                              type CreatePartitionsRequest

                                                                                                                                                                                                                                                              type CreatePartitionsRequest struct {
                                                                                                                                                                                                                                                              	TopicPartitions map[string]*TopicPartition
                                                                                                                                                                                                                                                              	Timeout         time.Duration
                                                                                                                                                                                                                                                              	ValidateOnly    bool
                                                                                                                                                                                                                                                              }

                                                                                                                                                                                                                                                              type CreatePartitionsResponse

                                                                                                                                                                                                                                                              type CreatePartitionsResponse struct {
                                                                                                                                                                                                                                                              	ThrottleTime         time.Duration
                                                                                                                                                                                                                                                              	TopicPartitionErrors map[string]*TopicPartitionError
                                                                                                                                                                                                                                                              }

                                                                                                                                                                                                                                                              type CreateTopicsRequest

                                                                                                                                                                                                                                                              type CreateTopicsRequest struct {
                                                                                                                                                                                                                                                              	Version int16
                                                                                                                                                                                                                                                              
                                                                                                                                                                                                                                                              	TopicDetails map[string]*TopicDetail
                                                                                                                                                                                                                                                              	Timeout      time.Duration
                                                                                                                                                                                                                                                              	ValidateOnly bool
                                                                                                                                                                                                                                                              }

                                                                                                                                                                                                                                                              type CreateTopicsResponse

                                                                                                                                                                                                                                                              type CreateTopicsResponse struct {
                                                                                                                                                                                                                                                              	Version      int16
                                                                                                                                                                                                                                                              	ThrottleTime time.Duration
                                                                                                                                                                                                                                                              	TopicErrors  map[string]*TopicError
                                                                                                                                                                                                                                                              }

                                                                                                                                                                                                                                                              type DeleteAclsRequest

                                                                                                                                                                                                                                                              type DeleteAclsRequest struct {
                                                                                                                                                                                                                                                              	Version int
                                                                                                                                                                                                                                                              	Filters []*AclFilter
                                                                                                                                                                                                                                                              }

                                                                                                                                                                                                                                                                DeleteAclsRequest is a delete acl request

                                                                                                                                                                                                                                                                type DeleteAclsResponse

                                                                                                                                                                                                                                                                type DeleteAclsResponse struct {
                                                                                                                                                                                                                                                                	Version         int16
                                                                                                                                                                                                                                                                	ThrottleTime    time.Duration
                                                                                                                                                                                                                                                                	FilterResponses []*FilterResponse
                                                                                                                                                                                                                                                                }

                                                                                                                                                                                                                                                                  DeleteAclsResponse is a delete acl response

                                                                                                                                                                                                                                                                  type DeleteGroupsRequest

                                                                                                                                                                                                                                                                  type DeleteGroupsRequest struct {
                                                                                                                                                                                                                                                                  	Groups []string
                                                                                                                                                                                                                                                                  }

                                                                                                                                                                                                                                                                  func (*DeleteGroupsRequest) AddGroup

                                                                                                                                                                                                                                                                  func (r *DeleteGroupsRequest) AddGroup(group string)
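The listing gives only the signature of `AddGroup`. A plausible reading is that it appends a group name to `Groups`; the sketch below illustrates that pattern with a local stand-in type. The method body, the helper `buildDeleteGroups`, and the group names are assumptions for illustration, not the library's confirmed implementation.

```go
package main

import "fmt"

// DeleteGroupsRequest is a local stand-in with the same field as the listing above.
type DeleteGroupsRequest struct {
	Groups []string
}

// AddGroup appends one group name to the request.
// This body is an assumption; the listing shows only the signature.
func (r *DeleteGroupsRequest) AddGroup(group string) {
	r.Groups = append(r.Groups, group)
}

// buildDeleteGroups is a hypothetical helper that collects names into one request.
func buildDeleteGroups(names ...string) *DeleteGroupsRequest {
	req := &DeleteGroupsRequest{}
	for _, n := range names {
		req.AddGroup(n)
	}
	return req
}

func main() {
	// The group names here are made up for the example.
	req := buildDeleteGroups("billing-consumers", "audit-consumers")
	fmt.Println(len(req.Groups), req.Groups)
}
```

Because `AddGroup` has a pointer receiver, calling it on a zero-value request works without initialising `Groups` first; `append` handles a nil slice.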

                                                                                                                                                                                                                                                                  type DeleteGroupsResponse

                                                                                                                                                                                                                                                                  type DeleteGroupsResponse struct {
                                                                                                                                                                                                                                                                  	ThrottleTime    time.Duration
                                                                                                                                                                                                                                                                  	GroupErrorCodes map[string]KError
                                                                                                                                                                                                                                                                  }

                                                                                                                                                                                                                                                                  type DeleteRecordsRequest

                                                                                                                                                                                                                                                                  type DeleteRecordsRequest struct {
                                                                                                                                                                                                                                                                  	Topics  map[string]*DeleteRecordsRequestTopic
                                                                                                                                                                                                                                                                  	Timeout time.Duration
                                                                                                                                                                                                                                                                  }

                                                                                                                                                                                                                                                                  type DeleteRecordsRequestTopic

                                                                                                                                                                                                                                                                  type DeleteRecordsRequestTopic struct {
                                                                                                                                                                                                                                                                  	PartitionOffsets map[int32]int64 // partition => offset
                                                                                                                                                                                                                                                                  }

                                                                                                                                                                                                                                                                  type DeleteRecordsResponse

                                                                                                                                                                                                                                                                  type DeleteRecordsResponse struct {
                                                                                                                                                                                                                                                                  	Version      int16
                                                                                                                                                                                                                                                                  	ThrottleTime time.Duration
                                                                                                                                                                                                                                                                  	Topics       map[string]*DeleteRecordsResponseTopic
                                                                                                                                                                                                                                                                  }

                                                                                                                                                                                                                                                                  type DeleteRecordsResponsePartition

                                                                                                                                                                                                                                                                  type DeleteRecordsResponsePartition struct {
                                                                                                                                                                                                                                                                  	LowWatermark int64
                                                                                                                                                                                                                                                                  	Err          KError
                                                                                                                                                                                                                                                                  }

                                                                                                                                                                                                                                                                  type DeleteRecordsResponseTopic

                                                                                                                                                                                                                                                                  type DeleteRecordsResponseTopic struct {
                                                                                                                                                                                                                                                                  	Partitions map[int32]*DeleteRecordsResponsePartition
                                                                                                                                                                                                                                                                  }

                                                                                                                                                                                                                                                                  type DeleteTopicsRequest

                                                                                                                                                                                                                                                                  type DeleteTopicsRequest struct {
                                                                                                                                                                                                                                                                  	Version int16
                                                                                                                                                                                                                                                                  	Topics  []string
                                                                                                                                                                                                                                                                  	Timeout time.Duration
                                                                                                                                                                                                                                                                  }

                                                                                                                                                                                                                                                                  type DeleteTopicsResponse

                                                                                                                                                                                                                                                                  type DeleteTopicsResponse struct {
                                                                                                                                                                                                                                                                  	Version         int16
                                                                                                                                                                                                                                                                  	ThrottleTime    time.Duration
                                                                                                                                                                                                                                                                  	TopicErrorCodes map[string]KError
                                                                                                                                                                                                                                                                  }

                                                                                                                                                                                                                                                                  type DescribeAclsRequest

                                                                                                                                                                                                                                                                  type DescribeAclsRequest struct {
                                                                                                                                                                                                                                                                  	Version int
                                                                                                                                                                                                                                                                  	AclFilter
                                                                                                                                                                                                                                                                  }

DescribeAclsRequest is a describe acl request type

                                                                                                                                                                                                                                                                    type DescribeAclsResponse

                                                                                                                                                                                                                                                                    type DescribeAclsResponse struct {
                                                                                                                                                                                                                                                                    	Version      int16
                                                                                                                                                                                                                                                                    	ThrottleTime time.Duration
                                                                                                                                                                                                                                                                    	Err          KError
                                                                                                                                                                                                                                                                    	ErrMsg       *string
                                                                                                                                                                                                                                                                    	ResourceAcls []*ResourceAcls
                                                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                                                      DescribeAclsResponse is a describe acl response type

                                                                                                                                                                                                                                                                      type DescribeConfigsRequest

                                                                                                                                                                                                                                                                      type DescribeConfigsRequest struct {
                                                                                                                                                                                                                                                                      	Version         int16
                                                                                                                                                                                                                                                                      	Resources       []*ConfigResource
                                                                                                                                                                                                                                                                      	IncludeSynonyms bool
                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                      type DescribeConfigsResponse

                                                                                                                                                                                                                                                                      type DescribeConfigsResponse struct {
                                                                                                                                                                                                                                                                      	Version      int16
                                                                                                                                                                                                                                                                      	ThrottleTime time.Duration
                                                                                                                                                                                                                                                                      	Resources    []*ResourceResponse
                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                      type DescribeGroupsRequest

                                                                                                                                                                                                                                                                      type DescribeGroupsRequest struct {
                                                                                                                                                                                                                                                                      	Groups []string
                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                      func (*DescribeGroupsRequest) AddGroup

                                                                                                                                                                                                                                                                      func (r *DescribeGroupsRequest) AddGroup(group string)

                                                                                                                                                                                                                                                                      type DescribeGroupsResponse

                                                                                                                                                                                                                                                                      type DescribeGroupsResponse struct {
                                                                                                                                                                                                                                                                      	Groups []*GroupDescription
                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                      type DescribeLogDirsRequest

                                                                                                                                                                                                                                                                      type DescribeLogDirsRequest struct {
                                                                                                                                                                                                                                                                      	// Version 0 and 1 are equal
                                                                                                                                                                                                                                                                      	// The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
                                                                                                                                                                                                                                                                      	Version int16
                                                                                                                                                                                                                                                                      
                                                                                                                                                                                                                                                                      	// If this is an empty array, all topics will be queried
                                                                                                                                                                                                                                                                      	DescribeTopics []DescribeLogDirsRequestTopic
                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                        DescribeLogDirsRequest is a describe request to get partitions' log size

                                                                                                                                                                                                                                                                        type DescribeLogDirsRequestTopic

                                                                                                                                                                                                                                                                        type DescribeLogDirsRequestTopic struct {
                                                                                                                                                                                                                                                                        	Topic        string
                                                                                                                                                                                                                                                                        	PartitionIDs []int32
                                                                                                                                                                                                                                                                        }

                                                                                                                                                                                                                                                                          DescribeLogDirsRequestTopic is a describe request about the log dir of one or more partitions within a Topic

                                                                                                                                                                                                                                                                          type DescribeLogDirsResponse

                                                                                                                                                                                                                                                                          type DescribeLogDirsResponse struct {
                                                                                                                                                                                                                                                                          	ThrottleTime time.Duration
                                                                                                                                                                                                                                                                          
                                                                                                                                                                                                                                                                          	// Version 0 and 1 are equal
                                                                                                                                                                                                                                                                          	// The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
                                                                                                                                                                                                                                                                          	Version int16
                                                                                                                                                                                                                                                                          
                                                                                                                                                                                                                                                                          	LogDirs []DescribeLogDirsResponseDirMetadata
                                                                                                                                                                                                                                                                          }

                                                                                                                                                                                                                                                                          type DescribeLogDirsResponseDirMetadata

                                                                                                                                                                                                                                                                          type DescribeLogDirsResponseDirMetadata struct {
                                                                                                                                                                                                                                                                          	ErrorCode KError
                                                                                                                                                                                                                                                                          
                                                                                                                                                                                                                                                                          	// The absolute log directory path
                                                                                                                                                                                                                                                                          	Path   string
                                                                                                                                                                                                                                                                          	Topics []DescribeLogDirsResponseTopic
                                                                                                                                                                                                                                                                          }

                                                                                                                                                                                                                                                                          type DescribeLogDirsResponsePartition

                                                                                                                                                                                                                                                                          type DescribeLogDirsResponsePartition struct {
                                                                                                                                                                                                                                                                          	PartitionID int32
                                                                                                                                                                                                                                                                          
                                                                                                                                                                                                                                                                          	// The size of the log segments of the partition in bytes.
                                                                                                                                                                                                                                                                          	Size int64
                                                                                                                                                                                                                                                                          
                                                                                                                                                                                                                                                                          	// The lag of the log's LEO w.r.t. partition's HW (if it is the current log for the partition) or
                                                                                                                                                                                                                                                                          	// current replica's LEO (if it is the future log for the partition)
                                                                                                                                                                                                                                                                          	OffsetLag int64
                                                                                                                                                                                                                                                                          
                                                                                                                                                                                                                                                                          	// True if this log is created by AlterReplicaLogDirsRequest and will replace the current log of
                                                                                                                                                                                                                                                                          	// the replica in the future.
                                                                                                                                                                                                                                                                          	IsTemporary bool
                                                                                                                                                                                                                                                                          }

                                                                                                                                                                                                                                                                            DescribeLogDirsResponsePartition describes a partition's log directory

                                                                                                                                                                                                                                                                            type DescribeLogDirsResponseTopic

                                                                                                                                                                                                                                                                            type DescribeLogDirsResponseTopic struct {
                                                                                                                                                                                                                                                                            	Topic      string
                                                                                                                                                                                                                                                                            	Partitions []DescribeLogDirsResponsePartition
                                                                                                                                                                                                                                                                            }

                                                                                                                                                                                                                                                                              DescribeLogDirsResponseTopic contains a topic's partitions descriptions
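The DescribeLogDirs response types nest as log directory, then topic, then partition. As a rough illustration of walking that structure, the sketch below sums partition sizes per topic. To stay self-contained it declares local stand-in structs that copy the fields listed above (ErrorCode is left out) instead of importing sarama. The helper name topicSizes is hypothetical, and skipping IsTemporary logs is an assumption made here to avoid counting a replica twice while it is being moved.

```go
package main

import "fmt"

// Local stand-ins that copy the field layout documented above.
// They are NOT the sarama types; ErrorCode is omitted for brevity.
type DescribeLogDirsResponsePartition struct {
	PartitionID int32
	Size        int64
	OffsetLag   int64
	IsTemporary bool
}

type DescribeLogDirsResponseTopic struct {
	Topic      string
	Partitions []DescribeLogDirsResponsePartition
}

type DescribeLogDirsResponseDirMetadata struct {
	Path   string
	Topics []DescribeLogDirsResponseTopic
}

// topicSizes (hypothetical helper) adds up the on-disk size in bytes of each
// topic across all log directories. Temporary ("future") logs are skipped,
// which is an assumption of this sketch rather than documented behavior.
func topicSizes(dirs []DescribeLogDirsResponseDirMetadata) map[string]int64 {
	sizes := make(map[string]int64)
	for _, dir := range dirs {
		for _, topic := range dir.Topics {
			for _, p := range topic.Partitions {
				if p.IsTemporary {
					continue
				}
				sizes[topic.Topic] += p.Size
			}
		}
	}
	return sizes
}

func main() {
	dirs := []DescribeLogDirsResponseDirMetadata{
		{Path: "/data/kafka-1", Topics: []DescribeLogDirsResponseTopic{
			{Topic: "events", Partitions: []DescribeLogDirsResponsePartition{
				{PartitionID: 0, Size: 1024},
				{PartitionID: 1, Size: 512, IsTemporary: true},
			}},
		}},
		{Path: "/data/kafka-2", Topics: []DescribeLogDirsResponseTopic{
			{Topic: "events", Partitions: []DescribeLogDirsResponsePartition{
				{PartitionID: 1, Size: 2048},
			}},
		}},
	}
	for topic, size := range topicSizes(dirs) {
		fmt.Printf("%s: %d bytes\n", topic, size)
	}
}
```

With the real library, the same loop would run over the LogDirs field of a DescribeLogDirsResponse; checking each directory's ErrorCode before trusting its contents would be a sensible addition.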

                                                                                                                                                                                                                                                                              type DynamicConsistencyPartitioner

                                                                                                                                                                                                                                                                              type DynamicConsistencyPartitioner interface {
                                                                                                                                                                                                                                                                              	Partitioner
                                                                                                                                                                                                                                                                              
                                                                                                                                                                                                                                                                              	// MessageRequiresConsistency is similar to Partitioner.RequiresConsistency,
                                                                                                                                                                                                                                                                              	// but takes in the message being partitioned so that the partitioner can
                                                                                                                                                                                                                                                                              	// make a per-message determination.
                                                                                                                                                                                                                                                                              	MessageRequiresConsistency(message *ProducerMessage) bool
                                                                                                                                                                                                                                                                              }

                                                                                                                                                                                                                                                                                DynamicConsistencyPartitioner can optionally be implemented by Partitioners in order to allow more flexibility than is originally allowed by the RequiresConsistency method in the Partitioner interface. This allows partitioners to require consistency sometimes, but not all times. It's useful for, e.g., the HashPartitioner, which does not require consistency if the message key is nil.
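To make the per-message decision concrete, here is a minimal sketch of a partitioner that requires consistency only when a key is present. It does not import sarama: the ProducerMessage, Partitioner, and DynamicConsistencyPartitioner declarations are local stand-ins, and ProducerMessage.Key is simplified to a byte slice. The keyHashPartitioner type and its FNV-1a hashing are illustrative assumptions, not the library's HashPartitioner implementation.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ProducerMessage is a simplified stand-in; the Key is a plain byte slice
// here purely to keep the sketch short.
type ProducerMessage struct {
	Key []byte
}

// Partitioner and DynamicConsistencyPartitioner are local copies of the
// interface shapes so that this example compiles on its own.
type Partitioner interface {
	Partition(message *ProducerMessage, numPartitions int32) (int32, error)
	RequiresConsistency() bool
}

type DynamicConsistencyPartitioner interface {
	Partitioner
	MessageRequiresConsistency(message *ProducerMessage) bool
}

// keyHashPartitioner (hypothetical) hashes keyed messages to a fixed
// partition and spreads keyless messages round-robin.
type keyHashPartitioner struct {
	next int32
}

func (p *keyHashPartitioner) Partition(m *ProducerMessage, n int32) (int32, error) {
	if n <= 0 {
		return -1, fmt.Errorf("numPartitions must be positive, got %d", n)
	}
	if m.Key == nil {
		part := p.next % n
		p.next = (p.next + 1) % n
		return part, nil
	}
	h := fnv.New32a()
	_, _ = h.Write(m.Key) // Write on an FNV hash never returns an error
	return int32(h.Sum32() % uint32(n)), nil
}

// RequiresConsistency is the coarse, partitioner-wide answer.
func (p *keyHashPartitioner) RequiresConsistency() bool { return true }

// MessageRequiresConsistency refines that answer per message: only keyed
// messages need to keep landing on the same partition.
func (p *keyHashPartitioner) MessageRequiresConsistency(m *ProducerMessage) bool {
	return m.Key != nil
}

func main() {
	var p Partitioner = &keyHashPartitioner{}
	msgs := []*ProducerMessage{{Key: []byte("user-1")}, {Key: nil}}
	for _, msg := range msgs {
		// Prefer the per-message answer when the partitioner offers one.
		needs := p.RequiresConsistency()
		if d, ok := p.(DynamicConsistencyPartitioner); ok {
			needs = d.MessageRequiresConsistency(msg)
		}
		part, err := p.Partition(msg, 4)
		if err != nil {
			panic(err)
		}
		fmt.Printf("partition=%d requiresConsistency=%v\n", part, needs)
	}
}
```

The type assertion in main shows the "optional" part of the contract: callers fall back to RequiresConsistency when the extra method is not implemented.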

                                                                                                                                                                                                                                                                                type Encoder

                                                                                                                                                                                                                                                                                type Encoder interface {
                                                                                                                                                                                                                                                                                	Encode() ([]byte, error)
                                                                                                                                                                                                                                                                                	Length() int
                                                                                                                                                                                                                                                                                }

                                                                                                                                                                                                                                                                                  Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Kafka message. Length() is provided as an optimization, and must return the same as len() on the result of Encode().
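As an illustration of the Length() contract, the following sketch implements the interface for JSON payloads. The Encoder declaration is repeated locally so the example compiles without importing sarama, and jsonEncoder with its constructor newJSONEncoder are hypothetical names. Marshalling once and caching the bytes is one simple way, assumed here, to guarantee that Length() agrees with len() of the Encode() result.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Encoder repeats the interface shown above so this sketch is standalone.
type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

// jsonEncoder (hypothetical) marshals its value to JSON on first use and
// caches the result, so Encode and Length always describe the same bytes.
type jsonEncoder struct {
	value   interface{}
	encoded []byte
	err     error
	done    bool
}

func newJSONEncoder(v interface{}) *jsonEncoder {
	return &jsonEncoder{value: v}
}

// ensureEncoded performs the marshalling at most once.
func (j *jsonEncoder) ensureEncoded() {
	if !j.done {
		j.encoded, j.err = json.Marshal(j.value)
		j.done = true
	}
}

func (j *jsonEncoder) Encode() ([]byte, error) {
	j.ensureEncoded()
	return j.encoded, j.err
}

func (j *jsonEncoder) Length() int {
	j.ensureEncoded()
	return len(j.encoded)
}

func main() {
	var enc Encoder = newJSONEncoder(map[string]int{"id": 7})
	b, err := enc.Encode()
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b), enc.Length() == len(b))
}
```

This sketch is not safe for concurrent use; a version shared between goroutines could guard the cache with sync.Once.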

                                                                                                                                                                                                                                                                                  type EndTxnRequest

                                                                                                                                                                                                                                                                                  type EndTxnRequest struct {
                                                                                                                                                                                                                                                                                  	TransactionalID   string
                                                                                                                                                                                                                                                                                  	ProducerID        int64
                                                                                                                                                                                                                                                                                  	ProducerEpoch     int16
                                                                                                                                                                                                                                                                                  	TransactionResult bool
                                                                                                                                                                                                                                                                                  }

                                                                                                                                                                                                                                                                                  type EndTxnResponse

                                                                                                                                                                                                                                                                                  type EndTxnResponse struct {
                                                                                                                                                                                                                                                                                  	ThrottleTime time.Duration
                                                                                                                                                                                                                                                                                  	Err          KError
                                                                                                                                                                                                                                                                                  }

                                                                                                                                                                                                                                                                                  type ErrDeleteRecords

                                                                                                                                                                                                                                                                                  type ErrDeleteRecords struct {
                                                                                                                                                                                                                                                                                  	MultiError
                                                                                                                                                                                                                                                                                  }

    ErrDeleteRecords is the type of error returned when deleting the required records fails.

                                                                                                                                                                                                                                                                                    func (ErrDeleteRecords) Error

                                                                                                                                                                                                                                                                                    func (err ErrDeleteRecords) Error() string

                                                                                                                                                                                                                                                                                    type ErrReassignPartitions

                                                                                                                                                                                                                                                                                    type ErrReassignPartitions struct {
                                                                                                                                                                                                                                                                                    	MultiError
                                                                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                                                                    func (ErrReassignPartitions) Error

                                                                                                                                                                                                                                                                                    func (err ErrReassignPartitions) Error() string

                                                                                                                                                                                                                                                                                    type FetchRequest

                                                                                                                                                                                                                                                                                    type FetchRequest struct {
                                                                                                                                                                                                                                                                                    	MaxWaitTime  int32
                                                                                                                                                                                                                                                                                    	MinBytes     int32
                                                                                                                                                                                                                                                                                    	MaxBytes     int32
                                                                                                                                                                                                                                                                                    	Version      int16
                                                                                                                                                                                                                                                                                    	Isolation    IsolationLevel
                                                                                                                                                                                                                                                                                    	SessionID    int32
                                                                                                                                                                                                                                                                                    	SessionEpoch int32
                                                                                                                                                                                                                                                                                    
                                                                                                                                                                                                                                                                                    	RackID string
                                                                                                                                                                                                                                                                                    	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                    }

    FetchRequest (API key 1) fetches Kafka messages. Version 3 introduced the MaxBytes field; see https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that change. The corresponding KIP is at https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
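To illustrate what "Version 3 introduced the MaxBytes field" means on the wire, here is a simplified sketch of version-gated encoding. It is not sarama's encoder: fetchHeader is a local stand-in that carries only a few of the documented fields, the real request contains more fields than shown, and putInt32 is a hypothetical helper. The only behavior it demonstrates is that MaxBytes is written for versions 3 and later and skipped for earlier versions.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// fetchHeader is a local stand-in holding a subset of the documented
// FetchRequest fields.
type fetchHeader struct {
	MaxWaitTime int32
	MinBytes    int32
	MaxBytes    int32
	Version     int16
}

// putInt32 appends v to b in big-endian (network) byte order.
func putInt32(b []byte, v int32) []byte {
	var tmp [4]byte
	binary.BigEndian.PutUint32(tmp[:], uint32(v))
	return append(b, tmp[:]...)
}

// encode serializes the header. MaxBytes is included only from
// version 3 onward; older versions omit it entirely.
func (h fetchHeader) encode() []byte {
	var out []byte
	out = putInt32(out, h.MaxWaitTime)
	out = putInt32(out, h.MinBytes)
	if h.Version >= 3 {
		out = putInt32(out, h.MaxBytes)
	}
	return out
}

func main() {
	h := fetchHeader{MaxWaitTime: 500, MinBytes: 1, MaxBytes: 1 << 20}

	h.Version = 2
	fmt.Println(len(h.encode())) // prints 8

	h.Version = 3
	fmt.Println(len(h.encode())) // prints 12
}
```

In practice this means a request-level MaxBytes set on a FetchRequest with Version below 3 would have no effect; only the per-partition maxBytes passed to AddBlock would apply.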

                                                                                                                                                                                                                                                                                      func (*FetchRequest) AddBlock

                                                                                                                                                                                                                                                                                      func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32)

                                                                                                                                                                                                                                                                                      type FetchResponse

                                                                                                                                                                                                                                                                                      type FetchResponse struct {
                                                                                                                                                                                                                                                                                      	Blocks        map[string]map[int32]*FetchResponseBlock
                                                                                                                                                                                                                                                                                      	ThrottleTime  time.Duration
                                                                                                                                                                                                                                                                                      	ErrorCode     int16
                                                                                                                                                                                                                                                                                      	SessionID     int32
                                                                                                                                                                                                                                                                                      	Version       int16
                                                                                                                                                                                                                                                                                      	LogAppendTime bool
                                                                                                                                                                                                                                                                                      	Timestamp     time.Time
                                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                                      func (*FetchResponse) AddControlRecord

                                                                                                                                                                                                                                                                                      func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType)

                                                                                                                                                                                                                                                                                      func (*FetchResponse) AddControlRecordWithTimestamp

                                                                                                                                                                                                                                                                                      func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time)

                                                                                                                                                                                                                                                                                      func (*FetchResponse) AddError

                                                                                                                                                                                                                                                                                      func (r *FetchResponse) AddError(topic string, partition int32, err KError)

                                                                                                                                                                                                                                                                                      func (*FetchResponse) AddMessage

                                                                                                                                                                                                                                                                                      func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64)

                                                                                                                                                                                                                                                                                      func (*FetchResponse) AddMessageWithTimestamp

                                                                                                                                                                                                                                                                                      func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8)

                                                                                                                                                                                                                                                                                      func (*FetchResponse) AddRecord

                                                                                                                                                                                                                                                                                      func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64)

                                                                                                                                                                                                                                                                                      func (*FetchResponse) AddRecordBatch

                                                                                                                                                                                                                                                                                      func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool)

                                                                                                                                                                                                                                                                                      func (*FetchResponse) AddRecordBatchWithTimestamp

                                                                                                                                                                                                                                                                                      func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time)

    AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp, but instead of appending one record to a batch, it appends a new batch containing one record to the FetchResponse. Since transactions are handled at the batch level (the whole batch is either committed or aborted), use this to test transactions.
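The difference between the two styles of method can be sketched with local stand-in types. None of the names below (record, batch, partitionData, addRecord, addRecordBatch, shape) are sarama identifiers, and appending to the last existing batch is an assumption made to keep the sketch short. It only illustrates "append a record to a batch" versus "append a new single-record batch".

```go
package main

import "fmt"

// record and batch are simplified stand-ins for a Kafka record and
// record batch.
type record struct {
	Offset int64
	Value  string
}

type batch struct {
	ProducerID      int64
	IsTransactional bool
	Records         []record
}

// partitionData stands in for the per-partition part of a fetch response.
type partitionData struct {
	Batches []*batch
}

// addRecord appends one record to the last batch, creating a batch
// first if none exists yet.
func (p *partitionData) addRecord(offset int64, value string) {
	if len(p.Batches) == 0 {
		p.Batches = append(p.Batches, &batch{})
	}
	last := p.Batches[len(p.Batches)-1]
	last.Records = append(last.Records, record{Offset: offset, Value: value})
}

// addRecordBatch always appends a new batch containing exactly one
// record, so each record can carry its own transactional metadata.
func (p *partitionData) addRecordBatch(offset int64, value string, producerID int64, isTransactional bool) {
	p.Batches = append(p.Batches, &batch{
		ProducerID:      producerID,
		IsTransactional: isTransactional,
		Records:         []record{{Offset: offset, Value: value}},
	})
}

// shape returns the number of records in each batch, in order.
func shape(p *partitionData) []int {
	counts := make([]int, 0, len(p.Batches))
	for _, b := range p.Batches {
		counts = append(counts, len(b.Records))
	}
	return counts
}

func main() {
	var single partitionData
	single.addRecord(0, "a")
	single.addRecord(1, "b")
	fmt.Println(shape(&single)) // prints [2]

	var perBatch partitionData
	perBatch.addRecordBatch(0, "a", 42, true)
	perBatch.addRecordBatch(1, "b", 43, true)
	fmt.Println(shape(&perBatch)) // prints [1 1]
}
```

With one batch per record, a test can mark the batch from one producer as aborted and the batch from another as committed, which is not possible when both records share a batch.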

                                                                                                                                                                                                                                                                                        func (*FetchResponse) AddRecordWithTimestamp

                                                                                                                                                                                                                                                                                        func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time)

                                                                                                                                                                                                                                                                                        func (*FetchResponse) GetBlock

                                                                                                                                                                                                                                                                                        func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock

                                                                                                                                                                                                                                                                                        func (*FetchResponse) SetLastOffsetDelta

                                                                                                                                                                                                                                                                                        func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32)

                                                                                                                                                                                                                                                                                        func (*FetchResponse) SetLastStableOffset

                                                                                                                                                                                                                                                                                        func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64)

                                                                                                                                                                                                                                                                                        type FetchResponseBlock

                                                                                                                                                                                                                                                                                        type FetchResponseBlock struct {
                                                                                                                                                                                                                                                                                        	Err                  KError
                                                                                                                                                                                                                                                                                        	HighWaterMarkOffset  int64
                                                                                                                                                                                                                                                                                        	LastStableOffset     int64
                                                                                                                                                                                                                                                                                        	LogStartOffset       int64
                                                                                                                                                                                                                                                                                        	AbortedTransactions  []*AbortedTransaction
                                                                                                                                                                                                                                                                                        	PreferredReadReplica int32
                                                                                                                                                                                                                                                                                        	Records              *Records // deprecated: use FetchResponseBlock.RecordsSet
                                                                                                                                                                                                                                                                                        	RecordsSet           []*Records
                                                                                                                                                                                                                                                                                        	Partial              bool
                                                                                                                                                                                                                                                                                        }

                                                                                                                                                                                                                                                                                        type FilterResponse

                                                                                                                                                                                                                                                                                        type FilterResponse struct {
                                                                                                                                                                                                                                                                                        	Err          KError
                                                                                                                                                                                                                                                                                        	ErrMsg       *string
                                                                                                                                                                                                                                                                                        	MatchingAcls []*MatchingAcl
                                                                                                                                                                                                                                                                                        }

                                                                                                                                                                                                                                                                                          FilterResponse is a filter response type

                                                                                                                                                                                                                                                                                          type FindCoordinatorRequest

                                                                                                                                                                                                                                                                                          type FindCoordinatorRequest struct {
                                                                                                                                                                                                                                                                                          	Version         int16
                                                                                                                                                                                                                                                                                          	CoordinatorKey  string
                                                                                                                                                                                                                                                                                          	CoordinatorType CoordinatorType
                                                                                                                                                                                                                                                                                          }

                                                                                                                                                                                                                                                                                          type FindCoordinatorResponse

                                                                                                                                                                                                                                                                                          type FindCoordinatorResponse struct {
                                                                                                                                                                                                                                                                                          	Version      int16
                                                                                                                                                                                                                                                                                          	ThrottleTime time.Duration
                                                                                                                                                                                                                                                                                          	Err          KError
                                                                                                                                                                                                                                                                                          	ErrMsg       *string
                                                                                                                                                                                                                                                                                          	Coordinator  *Broker
                                                                                                                                                                                                                                                                                          }

                                                                                                                                                                                                                                                                                          type GSSAPIConfig

                                                                                                                                                                                                                                                                                          type GSSAPIConfig struct {
                                                                                                                                                                                                                                                                                          	AuthType           int
                                                                                                                                                                                                                                                                                          	KeyTabPath         string
                                                                                                                                                                                                                                                                                          	KerberosConfigPath string
                                                                                                                                                                                                                                                                                          	ServiceName        string
                                                                                                                                                                                                                                                                                          	Username           string
                                                                                                                                                                                                                                                                                          	Password           string
                                                                                                                                                                                                                                                                                          	Realm              string
                                                                                                                                                                                                                                                                                          	DisablePAFXFAST    bool
                                                                                                                                                                                                                                                                                          }

                                                                                                                                                                                                                                                                                          type GSSAPIKerberosAuth

                                                                                                                                                                                                                                                                                          type GSSAPIKerberosAuth struct {
                                                                                                                                                                                                                                                                                          	Config *GSSAPIConfig
                                                                                                                                                                                                                                                                                          
                                                                                                                                                                                                                                                                                          	NewKerberosClientFunc func(config *GSSAPIConfig) (KerberosClient, error)
                                                                                                                                                                                                                                                                                          	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                          }

                                                                                                                                                                                                                                                                                          func (*GSSAPIKerberosAuth) Authorize

                                                                                                                                                                                                                                                                                          func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error

Authorize performs the handshake used to authorize the connection to the given broker.

                                                                                                                                                                                                                                                                                            type GSSApiHandlerFunc

                                                                                                                                                                                                                                                                                            type GSSApiHandlerFunc func([]byte) []byte

                                                                                                                                                                                                                                                                                            type GroupDescription

                                                                                                                                                                                                                                                                                            type GroupDescription struct {
                                                                                                                                                                                                                                                                                            	Err          KError
                                                                                                                                                                                                                                                                                            	GroupId      string
                                                                                                                                                                                                                                                                                            	State        string
                                                                                                                                                                                                                                                                                            	ProtocolType string
                                                                                                                                                                                                                                                                                            	Protocol     string
                                                                                                                                                                                                                                                                                            	Members      map[string]*GroupMemberDescription
                                                                                                                                                                                                                                                                                            }

                                                                                                                                                                                                                                                                                            type GroupMemberDescription

                                                                                                                                                                                                                                                                                            type GroupMemberDescription struct {
                                                                                                                                                                                                                                                                                            	ClientId         string
                                                                                                                                                                                                                                                                                            	ClientHost       string
                                                                                                                                                                                                                                                                                            	MemberMetadata   []byte
                                                                                                                                                                                                                                                                                            	MemberAssignment []byte
                                                                                                                                                                                                                                                                                            }

                                                                                                                                                                                                                                                                                            func (*GroupMemberDescription) GetMemberAssignment

                                                                                                                                                                                                                                                                                            func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error)

                                                                                                                                                                                                                                                                                            func (*GroupMemberDescription) GetMemberMetadata

                                                                                                                                                                                                                                                                                            func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error)

                                                                                                                                                                                                                                                                                            type GroupProtocol

                                                                                                                                                                                                                                                                                            type GroupProtocol struct {
                                                                                                                                                                                                                                                                                            	Name     string
                                                                                                                                                                                                                                                                                            	Metadata []byte
                                                                                                                                                                                                                                                                                            }

                                                                                                                                                                                                                                                                                            type HashPartitionerOption

                                                                                                                                                                                                                                                                                            type HashPartitionerOption func(*hashPartitioner)

HashPartitionerOption lets you modify the default values of the partitioner.

                                                                                                                                                                                                                                                                                              func WithAbsFirst

                                                                                                                                                                                                                                                                                              func WithAbsFirst() HashPartitionerOption

WithAbsFirst makes the partitioner handle absolute values in the same way as the reference Java implementation.
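To make the "absolute value" distinction concrete, here is a minimal, self-contained sketch of two ways a 32-bit hash can be mapped to a non-negative partition index. It is an illustration under assumptions, not a copy of the library's source: the helper names `absFirst` and `modFirst` are hypothetical, and the idea that the two orderings differ as shown (clearing the sign bit before the modulus versus negating a negative remainder afterwards) is an assumption about what the option toggles.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// absFirst (hypothetical name) clears the sign bit of the hash first and
// only then takes the modulus. This is assumed to match the "abs first"
// behaviour the option describes.
func absFirst(sum uint32, numPartitions int32) int32 {
	return (int32(sum) & 0x7fffffff) % numPartitions
}

// modFirst (hypothetical name) takes the modulus of the signed hash first
// and then flips a negative remainder to positive.
func modFirst(sum uint32, numPartitions int32) int32 {
	p := int32(sum) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

func main() {
	// Hash an arbitrary example key with FNV-1a.
	h := fnv.New32a()
	h.Write([]byte("user-42"))
	sum := h.Sum32()
	fmt.Println(absFirst(sum, 5), modFirst(sum, 5))

	// A hash with the sign bit set can land on different partitions
	// depending on the order of the two steps.
	var allOnes uint32 = 0xFFFFFFFF
	fmt.Println(absFirst(allOnes, 5), modFirst(allOnes, 5)) // prints "2 1"
}
```

Both helpers always return a value in `[0, numPartitions)`, but they can disagree for hashes whose top bit is set, which is why a producer that must co-partition with Java clients may care about the choice.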

                                                                                                                                                                                                                                                                                                func WithCustomFallbackPartitioner

                                                                                                                                                                                                                                                                                                func WithCustomFallbackPartitioner(randomHP *hashPartitioner) HashPartitionerOption

WithCustomFallbackPartitioner lets you specify which HashPartitioner to use when the distribution key is empty.

                                                                                                                                                                                                                                                                                                  func WithCustomHashFunction

                                                                                                                                                                                                                                                                                                  func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption

WithCustomHashFunction lets you specify which hash function to use for partitioning.
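The option functions above follow Go's functional-options pattern: each returns a function that mutates the partitioner, and a constructor applies them over its defaults. The sketch below shows that pattern with stand-in types, since `hashPartitioner` is unexported. Everything in it (`partitioner`, `option`, `withHash`, `newPartitioner`, the FNV-1a default) is hypothetical and chosen for illustration; only the `func() hash.Hash32` parameter shape is taken from the signature of `WithCustomHashFunction`.

```go
package main

import (
	"fmt"
	"hash"
	"hash/crc32"
	"hash/fnv"
)

// partitioner is a hypothetical stand-in for the unexported hashPartitioner.
type partitioner struct {
	hasher hash.Hash32
}

// option mirrors the shape of HashPartitionerOption: a function that
// modifies a partitioner in place.
type option func(*partitioner)

// withHash is a hypothetical analogue of WithCustomHashFunction. It takes
// a constructor so every partitioner gets its own hash state.
func withHash(newHash func() hash.Hash32) option {
	return func(p *partitioner) { p.hasher = newHash() }
}

// newPartitioner sets a default hash (FNV-1a here, an assumption) and then
// applies the options in the order given.
func newPartitioner(opts ...option) *partitioner {
	p := &partitioner{hasher: fnv.New32a()}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

// partition hashes the key and maps it into [0, numPartitions).
func (p *partitioner) partition(key []byte, numPartitions int32) int32 {
	p.hasher.Reset()
	p.hasher.Write(key)
	return (int32(p.hasher.Sum32()) & 0x7fffffff) % numPartitions
}

func main() {
	def := newPartitioner()
	custom := newPartitioner(withHash(crc32.NewIEEE))

	key := []byte("order-1001")
	fmt.Println(def.partition(key, 8), custom.partition(key, 8))
}
```

Passing a constructor such as `crc32.NewIEEE` rather than a shared `hash.Hash32` value keeps each partitioner's hash state independent, which is presumably why the real option also accepts `func() hash.Hash32`.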

                                                                                                                                                                                                                                                                                                    type HeartbeatRequest

                                                                                                                                                                                                                                                                                                    type HeartbeatRequest struct {
                                                                                                                                                                                                                                                                                                    	GroupId      string
                                                                                                                                                                                                                                                                                                    	GenerationId int32
                                                                                                                                                                                                                                                                                                    	MemberId     string
                                                                                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                                                                                    type HeartbeatResponse

                                                                                                                                                                                                                                                                                                    type HeartbeatResponse struct {
                                                                                                                                                                                                                                                                                                    	Err KError
                                                                                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                                                                                    type InitProducerIDRequest

                                                                                                                                                                                                                                                                                                    type InitProducerIDRequest struct {
                                                                                                                                                                                                                                                                                                    	TransactionalID    *string
                                                                                                                                                                                                                                                                                                    	TransactionTimeout time.Duration
                                                                                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                                                                                    type InitProducerIDResponse

                                                                                                                                                                                                                                                                                                    type InitProducerIDResponse struct {
                                                                                                                                                                                                                                                                                                    	ThrottleTime  time.Duration
                                                                                                                                                                                                                                                                                                    	Err           KError
                                                                                                                                                                                                                                                                                                    	ProducerID    int64
                                                                                                                                                                                                                                                                                                    	ProducerEpoch int16
                                                                                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                                                                                    type IsolationLevel

                                                                                                                                                                                                                                                                                                    type IsolationLevel int8
                                                                                                                                                                                                                                                                                                    const (
                                                                                                                                                                                                                                                                                                    	ReadUncommitted IsolationLevel = iota
                                                                                                                                                                                                                                                                                                    	ReadCommitted
                                                                                                                                                                                                                                                                                                    )

                                                                                                                                                                                                                                                                                                    type JoinGroupRequest

                                                                                                                                                                                                                                                                                                    type JoinGroupRequest struct {
                                                                                                                                                                                                                                                                                                    	Version               int16
	GroupId               string
	SessionTimeout        int32
	RebalanceTimeout      int32
	MemberId              string
	ProtocolType          string
	GroupProtocols        map[string][]byte // deprecated; use OrderedGroupProtocols
	OrderedGroupProtocols []*GroupProtocol
}

func (*JoinGroupRequest) AddGroupProtocol

func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte)

func (*JoinGroupRequest) AddGroupProtocolMetadata

func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error
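
The deprecation note on GroupProtocols points to OrderedGroupProtocols. One likely reason is that Go does not define map iteration order, so a map cannot carry the order in which a member lists its protocols. The sketch below illustrates that idea with local stand-in types: `joinRequest`, its `AddGroupProtocol` method, and `protocolNames` are hypothetical and are not imported from sarama, and the protocol names "range" and "roundrobin" are only sample values.

```go
package main

import "fmt"

// GroupProtocol is a local stand-in for an ordered (name, metadata) pair.
// It is defined here for illustration and is not the sarama type.
type GroupProtocol struct {
	Name     string
	Metadata []byte
}

// joinRequest is a hypothetical, trimmed-down request that keeps only
// the ordered protocol list.
type joinRequest struct {
	OrderedGroupProtocols []*GroupProtocol
}

// AddGroupProtocol appends a protocol, so the slice preserves call order.
func (r *joinRequest) AddGroupProtocol(name string, metadata []byte) {
	r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, &GroupProtocol{
		Name:     name,
		Metadata: metadata,
	})
}

// protocolNames returns the protocol names in the order they were added.
func protocolNames(r *joinRequest) []string {
	names := make([]string, 0, len(r.OrderedGroupProtocols))
	for _, p := range r.OrderedGroupProtocols {
		names = append(names, p.Name)
	}
	return names
}

func main() {
	req := &joinRequest{}
	req.AddGroupProtocol("range", nil)
	req.AddGroupProtocol("roundrobin", nil)
	fmt.Println(protocolNames(req))
}
```

A slice of pointers keeps insertion order stable across runs, which a `map[string][]byte` cannot guarantee.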

type JoinGroupResponse

type JoinGroupResponse struct {
	Version       int16
	ThrottleTime  int32
	Err           KError
	GenerationId  int32
	GroupProtocol string
	LeaderId      string
	MemberId      string
	Members       map[string][]byte
}
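
In the Kafka group protocol, the member whose MemberId equals LeaderId is the group leader, and only the leader receives the member list. A caller would typically check Err first and then compare the two IDs. The sketch below shows that check on a local copy of a few fields; `joinGroupResponse` and `isLeader` are hypothetical names and the member IDs are made up for the example.

```go
package main

import "fmt"

// KError and joinGroupResponse are local stand-ins that copy a few fields
// from the listing above; they are not the sarama types.
type KError int16

const (
	ErrNoError         KError = 0
	ErrUnknownMemberId KError = 25
)

type joinGroupResponse struct {
	Err          KError
	GenerationId int32
	LeaderId     string
	MemberId     string
	Members      map[string][]byte
}

// isLeader reports whether the join succeeded and this member was
// elected leader of the group.
func isLeader(r *joinGroupResponse) bool {
	return r.Err == ErrNoError && r.LeaderId == r.MemberId
}

func main() {
	resp := &joinGroupResponse{
		GenerationId: 1,
		LeaderId:     "member-1",
		MemberId:     "member-1",
		Members:      map[string][]byte{"member-1": nil, "member-2": nil},
	}
	if isLeader(resp) {
		fmt.Printf("leader of generation %d with %d members\n",
			resp.GenerationId, len(resp.Members))
	}
}
```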

func (*JoinGroupResponse) GetMembers

type KError

type KError int16

    KError is the type of error that can be returned directly by the Kafka broker. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes

const (
	ErrNoError                            KError = 0
	ErrUnknown                            KError = -1
	ErrOffsetOutOfRange                   KError = 1
	ErrInvalidMessage                     KError = 2
	ErrUnknownTopicOrPartition            KError = 3
	ErrInvalidMessageSize                 KError = 4
	ErrLeaderNotAvailable                 KError = 5
	ErrNotLeaderForPartition              KError = 6
	ErrRequestTimedOut                    KError = 7
	ErrBrokerNotAvailable                 KError = 8
	ErrReplicaNotAvailable                KError = 9
	ErrMessageSizeTooLarge                KError = 10
	ErrStaleControllerEpochCode           KError = 11
	ErrOffsetMetadataTooLarge             KError = 12
	ErrNetworkException                   KError = 13
	ErrOffsetsLoadInProgress              KError = 14
	ErrConsumerCoordinatorNotAvailable    KError = 15
	ErrNotCoordinatorForConsumer          KError = 16
	ErrInvalidTopic                       KError = 17
	ErrMessageSetSizeTooLarge             KError = 18
	ErrNotEnoughReplicas                  KError = 19
	ErrNotEnoughReplicasAfterAppend       KError = 20
	ErrInvalidRequiredAcks                KError = 21
	ErrIllegalGeneration                  KError = 22
	ErrInconsistentGroupProtocol          KError = 23
	ErrInvalidGroupId                     KError = 24
	ErrUnknownMemberId                    KError = 25
	ErrInvalidSessionTimeout              KError = 26
	ErrRebalanceInProgress                KError = 27
	ErrInvalidCommitOffsetSize            KError = 28
	ErrTopicAuthorizationFailed           KError = 29
	ErrGroupAuthorizationFailed           KError = 30
	ErrClusterAuthorizationFailed         KError = 31
	ErrInvalidTimestamp                   KError = 32
	ErrUnsupportedSASLMechanism           KError = 33
	ErrIllegalSASLState                   KError = 34
	ErrUnsupportedVersion                 KError = 35
	ErrTopicAlreadyExists                 KError = 36
	ErrInvalidPartitions                  KError = 37
	ErrInvalidReplicationFactor           KError = 38
	ErrInvalidReplicaAssignment           KError = 39
	ErrInvalidConfig                      KError = 40
	ErrNotController                      KError = 41
	ErrInvalidRequest                     KError = 42
	ErrUnsupportedForMessageFormat        KError = 43
	ErrPolicyViolation                    KError = 44
	ErrOutOfOrderSequenceNumber           KError = 45
	ErrDuplicateSequenceNumber            KError = 46
	ErrInvalidProducerEpoch               KError = 47
	ErrInvalidTxnState                    KError = 48
	ErrInvalidProducerIDMapping           KError = 49
	ErrInvalidTransactionTimeout          KError = 50
	ErrConcurrentTransactions             KError = 51
	ErrTransactionCoordinatorFenced       KError = 52
	ErrTransactionalIDAuthorizationFailed KError = 53
	ErrSecurityDisabled                   KError = 54
	ErrOperationNotAttempted              KError = 55
	ErrKafkaStorageError                  KError = 56
	ErrLogDirNotFound                     KError = 57
	ErrSASLAuthenticationFailed           KError = 58
	ErrUnknownProducerID                  KError = 59
	ErrReassignmentInProgress             KError = 60
	ErrDelegationTokenAuthDisabled        KError = 61
	ErrDelegationTokenNotFound            KError = 62
	ErrDelegationTokenOwnerMismatch       KError = 63
	ErrDelegationTokenRequestNotAllowed   KError = 64
	ErrDelegationTokenAuthorizationFailed KError = 65
	ErrDelegationTokenExpired             KError = 66
	ErrInvalidPrincipalType               KError = 67
	ErrNonEmptyGroup                      KError = 68
	ErrGroupIDNotFound                    KError = 69
	ErrFetchSessionIDNotFound             KError = 70
	ErrInvalidFetchSessionEpoch           KError = 71
	ErrListenerNotFound                   KError = 72
	ErrTopicDeletionDisabled              KError = 73
	ErrFencedLeaderEpoch                  KError = 74
	ErrUnknownLeaderEpoch                 KError = 75
	ErrUnsupportedCompressionType         KError = 76
	ErrStaleBrokerEpoch                   KError = 77
                                                                                                                                                                                                                                                                                                      	ErrOffsetNotAvailable                 KError = 78
                                                                                                                                                                                                                                                                                                      	ErrMemberIdRequired                   KError = 79
                                                                                                                                                                                                                                                                                                      	ErrPreferredLeaderNotAvailable        KError = 80
                                                                                                                                                                                                                                                                                                      	ErrGroupMaxSizeReached                KError = 81
                                                                                                                                                                                                                                                                                                      	ErrFencedInstancedId                  KError = 82
                                                                                                                                                                                                                                                                                                      )

                                                                                                                                                                                                                                                                                                        Numeric error codes returned by the Kafka server.

                                                                                                                                                                                                                                                                                                        func (KError) Error

                                                                                                                                                                                                                                                                                                        func (err KError) Error() string

                                                                                                                                                                                                                                                                                                        type KafkaGSSAPIHandler

                                                                                                                                                                                                                                                                                                        type KafkaGSSAPIHandler struct {
                                                                                                                                                                                                                                                                                                        	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                        }

                                                                                                                                                                                                                                                                                                        func (*KafkaGSSAPIHandler) MockKafkaGSSAPI

                                                                                                                                                                                                                                                                                                        func (h *KafkaGSSAPIHandler) MockKafkaGSSAPI(buffer []byte) []byte

                                                                                                                                                                                                                                                                                                        type KafkaVersion

                                                                                                                                                                                                                                                                                                        type KafkaVersion struct {
                                                                                                                                                                                                                                                                                                        	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                        }

                                                                                                                                                                                                                                                                                                          KafkaVersion instances represent versions of the upstream Kafka broker.

                                                                                                                                                                                                                                                                                                          func ParseKafkaVersion

                                                                                                                                                                                                                                                                                                          func ParseKafkaVersion(s string) (KafkaVersion, error)

ParseKafkaVersion parses a string and returns the corresponding Kafka version, or an error if the string is not a valid version.

                                                                                                                                                                                                                                                                                                            func (KafkaVersion) IsAtLeast

                                                                                                                                                                                                                                                                                                            func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool

IsAtLeast returns true if and only if the version it is called on is greater than or equal to the version passed in:

                                                                                                                                                                                                                                                                                                              V1.IsAtLeast(V2) // false
                                                                                                                                                                                                                                                                                                              V2.IsAtLeast(V1) // true
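The parse-then-compare flow described above can be sketched without the library. This is an illustration only, not sarama's implementation: the `version` type, `parseVersion`, and `isAtLeast` are hypothetical names, and the sketch assumes a plain `major.minor.patch` string, whereas the real parser may accept other forms.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// version is a hypothetical stand-in for KafkaVersion, whose real
// fields are unexported. It holds major, minor, and patch.
type version [3]uint

// parseVersion parses "major.minor.patch" (an assumed format).
func parseVersion(s string) (version, error) {
	parts := strings.Split(s, ".")
	if len(parts) != 3 {
		return version{}, fmt.Errorf("invalid version %q: want major.minor.patch", s)
	}
	var v version
	for i, p := range parts {
		n, err := strconv.ParseUint(p, 10, 32)
		if err != nil {
			return version{}, fmt.Errorf("invalid version %q: %w", s, err)
		}
		v[i] = uint(n)
	}
	return v, nil
}

// isAtLeast reports whether v >= other, comparing components
// from most to least significant.
func (v version) isAtLeast(other version) bool {
	for i := range v {
		if v[i] > other[i] {
			return true
		}
		if v[i] < other[i] {
			return false
		}
	}
	return true // all components equal
}

func main() {
	v1, err := parseVersion("2.5.0")
	if err != nil {
		panic(err)
	}
	v2, err := parseVersion("2.7.0")
	if err != nil {
		panic(err)
	}
	fmt.Println(v1.isAtLeast(v2)) // false
	fmt.Println(v2.isAtLeast(v1)) // true
	fmt.Println(v1.isAtLeast(v1)) // true
}
```

A check of this kind is typically used to gate protocol features on the configured broker version, for example enabling a request field only when the version is at least the release that introduced it.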
                                                                                                                                                                                                                                                                                                              

                                                                                                                                                                                                                                                                                                              func (KafkaVersion) String

                                                                                                                                                                                                                                                                                                              func (v KafkaVersion) String() string

                                                                                                                                                                                                                                                                                                              type KerberosClient

                                                                                                                                                                                                                                                                                                              type KerberosClient interface {
                                                                                                                                                                                                                                                                                                              	Login() error
                                                                                                                                                                                                                                                                                                              	GetServiceTicket(spn string) (messages.Ticket, types.EncryptionKey, error)
                                                                                                                                                                                                                                                                                                              	Domain() string
                                                                                                                                                                                                                                                                                                              	CName() types.PrincipalName
                                                                                                                                                                                                                                                                                                              	Destroy()
                                                                                                                                                                                                                                                                                                              }

                                                                                                                                                                                                                                                                                                              func NewKerberosClient

                                                                                                                                                                                                                                                                                                              func NewKerberosClient(config *GSSAPIConfig) (KerberosClient, error)

NewKerberosClient creates a Kerberos client used to obtain TGT and TGS tokens. It uses a pure Go Kerberos 5 implementation (RFC 4120 and RFC 4121): the underlying gokrb5 library, a pure Go Kerberos client with some GSS-API capabilities.

                                                                                                                                                                                                                                                                                                                type KerberosGoKrb5Client

                                                                                                                                                                                                                                                                                                                type KerberosGoKrb5Client struct {
                                                                                                                                                                                                                                                                                                                	krb5client.Client
                                                                                                                                                                                                                                                                                                                }

func (*KerberosGoKrb5Client) CName

func (c *KerberosGoKrb5Client) CName() types.PrincipalName

                                                                                                                                                                                                                                                                                                                func (*KerberosGoKrb5Client) Domain

                                                                                                                                                                                                                                                                                                                func (c *KerberosGoKrb5Client) Domain() string

                                                                                                                                                                                                                                                                                                                type LeaveGroupRequest

                                                                                                                                                                                                                                                                                                                type LeaveGroupRequest struct {
                                                                                                                                                                                                                                                                                                                	GroupId  string
                                                                                                                                                                                                                                                                                                                	MemberId string
                                                                                                                                                                                                                                                                                                                }

                                                                                                                                                                                                                                                                                                                type LeaveGroupResponse

                                                                                                                                                                                                                                                                                                                type LeaveGroupResponse struct {
                                                                                                                                                                                                                                                                                                                	Err KError
                                                                                                                                                                                                                                                                                                                }

                                                                                                                                                                                                                                                                                                                type ListGroupsRequest

                                                                                                                                                                                                                                                                                                                type ListGroupsRequest struct {
                                                                                                                                                                                                                                                                                                                }

                                                                                                                                                                                                                                                                                                                type ListGroupsResponse

                                                                                                                                                                                                                                                                                                                type ListGroupsResponse struct {
                                                                                                                                                                                                                                                                                                                	Err    KError
                                                                                                                                                                                                                                                                                                                	Groups map[string]string
                                                                                                                                                                                                                                                                                                                }

                                                                                                                                                                                                                                                                                                                type ListPartitionReassignmentsRequest

                                                                                                                                                                                                                                                                                                                type ListPartitionReassignmentsRequest struct {
                                                                                                                                                                                                                                                                                                                	TimeoutMs int32
                                                                                                                                                                                                                                                                                                                
                                                                                                                                                                                                                                                                                                                	Version int16
                                                                                                                                                                                                                                                                                                                	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                }

                                                                                                                                                                                                                                                                                                                func (*ListPartitionReassignmentsRequest) AddBlock

                                                                                                                                                                                                                                                                                                                func (r *ListPartitionReassignmentsRequest) AddBlock(topic string, partitionIDs []int32)

                                                                                                                                                                                                                                                                                                                type ListPartitionReassignmentsResponse

                                                                                                                                                                                                                                                                                                                type ListPartitionReassignmentsResponse struct {
                                                                                                                                                                                                                                                                                                                	Version        int16
                                                                                                                                                                                                                                                                                                                	ThrottleTimeMs int32
                                                                                                                                                                                                                                                                                                                	ErrorCode      KError
                                                                                                                                                                                                                                                                                                                	ErrorMessage   *string
                                                                                                                                                                                                                                                                                                                	TopicStatus    map[string]map[int32]*PartitionReplicaReassignmentsStatus
                                                                                                                                                                                                                                                                                                                }

                                                                                                                                                                                                                                                                                                                func (*ListPartitionReassignmentsResponse) AddBlock

                                                                                                                                                                                                                                                                                                                func (r *ListPartitionReassignmentsResponse) AddBlock(topic string, partition int32, replicas, addingReplicas, removingReplicas []int32)

                                                                                                                                                                                                                                                                                                                type MatchingAcl

                                                                                                                                                                                                                                                                                                                type MatchingAcl struct {
                                                                                                                                                                                                                                                                                                                	Err    KError
                                                                                                                                                                                                                                                                                                                	ErrMsg *string
                                                                                                                                                                                                                                                                                                                	Resource
                                                                                                                                                                                                                                                                                                                	Acl
                                                                                                                                                                                                                                                                                                                }

                                                                                                                                                                                                                                                                                                                  MatchingAcl is a matching acl type

                                                                                                                                                                                                                                                                                                                  type Message

                                                                                                                                                                                                                                                                                                                  type Message struct {
                                                                                                                                                                                                                                                                                                                  	Codec            CompressionCodec // codec used to compress the message contents
                                                                                                                                                                                                                                                                                                                  	CompressionLevel int              // compression level
                                                                                                                                                                                                                                                                                                                  	LogAppendTime    bool             // the used timestamp is LogAppendTime
                                                                                                                                                                                                                                                                                                                  	Key              []byte           // the message key, may be nil
                                                                                                                                                                                                                                                                                                                  	Value            []byte           // the message contents
                                                                                                                                                                                                                                                                                                                  	Set              *MessageSet      // the message set a message might wrap
                                                                                                                                                                                                                                                                                                                  	Version          int8             // v1 requires Kafka 0.10
                                                                                                                                                                                                                                                                                                                  	Timestamp        time.Time        // the timestamp of the message (version 1+ only)
                                                                                                                                                                                                                                                                                                                  	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                  }

                                                                                                                                                                                                                                                                                                                    Message is a kafka message type

                                                                                                                                                                                                                                                                                                                    type MessageBlock

                                                                                                                                                                                                                                                                                                                    type MessageBlock struct {
                                                                                                                                                                                                                                                                                                                    	Offset int64
                                                                                                                                                                                                                                                                                                                    	Msg    *Message
                                                                                                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                                                                                                    func (*MessageBlock) Messages

                                                                                                                                                                                                                                                                                                                    func (msb *MessageBlock) Messages() []*MessageBlock

Messages is a convenience helper which returns all the messages that are wrapped in this block, or the block itself if it wraps none.
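The unwrapping behaviour can be sketched with local stand-in types. This is a minimal illustration, not Sarama's source: the types below copy only the fields needed from the declarations above, and the fallback to the block itself when nothing is wrapped is an assumption about how the helper behaves.

```go
package main

import "fmt"

// Local stand-ins mirroring only the fields used here; these are not the Sarama types.
type Message struct {
	Value []byte
	Set   *MessageSet // non-nil when this message wraps a set of messages
}

type MessageSet struct {
	Messages []*MessageBlock
}

type MessageBlock struct {
	Offset int64
	Msg    *Message
}

// Messages returns the wrapped blocks when the message carries a set,
// and otherwise a one-element slice holding the block itself (assumed behaviour).
func (msb *MessageBlock) Messages() []*MessageBlock {
	if msb.Msg != nil && msb.Msg.Set != nil {
		return msb.Msg.Set.Messages
	}
	return []*MessageBlock{msb}
}

func main() {
	plain := &MessageBlock{Offset: 7, Msg: &Message{Value: []byte("a")}}
	inner := []*MessageBlock{
		{Offset: 0, Msg: &Message{Value: []byte("b")}},
		{Offset: 1, Msg: &Message{Value: []byte("c")}},
	}
	wrapper := &MessageBlock{Offset: 1, Msg: &Message{Set: &MessageSet{Messages: inner}}}

	fmt.Println(len(plain.Messages()), len(wrapper.Messages())) // prints "1 2"
}
```

A caller can therefore range over Messages() without first checking whether a block is a wrapper.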

                                                                                                                                                                                                                                                                                                                      type MessageSet

                                                                                                                                                                                                                                                                                                                      type MessageSet struct {
                                                                                                                                                                                                                                                                                                                      	PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
                                                                                                                                                                                                                                                                                                                      	OverflowMessage        bool // whether the set on the wire contained an overflow message
                                                                                                                                                                                                                                                                                                                      	Messages               []*MessageBlock
                                                                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                                                                      type MetadataRequest

                                                                                                                                                                                                                                                                                                                      type MetadataRequest struct {
                                                                                                                                                                                                                                                                                                                      	Version                int16
                                                                                                                                                                                                                                                                                                                      	Topics                 []string
                                                                                                                                                                                                                                                                                                                      	AllowAutoTopicCreation bool
                                                                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                                                                      type MetadataResponse

                                                                                                                                                                                                                                                                                                                      type MetadataResponse struct {
                                                                                                                                                                                                                                                                                                                      	Version        int16
                                                                                                                                                                                                                                                                                                                      	ThrottleTimeMs int32
                                                                                                                                                                                                                                                                                                                      	Brokers        []*Broker
                                                                                                                                                                                                                                                                                                                      	ClusterID      *string
                                                                                                                                                                                                                                                                                                                      	ControllerID   int32
                                                                                                                                                                                                                                                                                                                      	Topics         []*TopicMetadata
                                                                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                                                                      func (*MetadataResponse) AddBroker

                                                                                                                                                                                                                                                                                                                      func (r *MetadataResponse) AddBroker(addr string, id int32)

                                                                                                                                                                                                                                                                                                                      func (*MetadataResponse) AddTopic

                                                                                                                                                                                                                                                                                                                      func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata

                                                                                                                                                                                                                                                                                                                      func (*MetadataResponse) AddTopicPartition

                                                                                                                                                                                                                                                                                                                      func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, offline []int32, err KError)

                                                                                                                                                                                                                                                                                                                      type MockAlterConfigsResponse

                                                                                                                                                                                                                                                                                                                      type MockAlterConfigsResponse struct {
                                                                                                                                                                                                                                                                                                                      	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                                                                      func NewMockAlterConfigsResponse

                                                                                                                                                                                                                                                                                                                      func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse

                                                                                                                                                                                                                                                                                                                      func (*MockAlterConfigsResponse) For

                                                                                                                                                                                                                                                                                                                      func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                      type MockAlterConfigsResponseWithErrorCode

                                                                                                                                                                                                                                                                                                                      type MockAlterConfigsResponseWithErrorCode struct {
                                                                                                                                                                                                                                                                                                                      	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                                                                      func (*MockAlterConfigsResponseWithErrorCode) For

                                                                                                                                                                                                                                                                                                                      func (mr *MockAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                      type MockAlterPartitionReassignmentsResponse

                                                                                                                                                                                                                                                                                                                      type MockAlterPartitionReassignmentsResponse struct {
                                                                                                                                                                                                                                                                                                                      	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                                                                      func (*MockAlterPartitionReassignmentsResponse) For

                                                                                                                                                                                                                                                                                                                      func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                      type MockBroker

                                                                                                                                                                                                                                                                                                                      type MockBroker struct {
                                                                                                                                                                                                                                                                                                                      	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                      }

MockBroker is a mock Kafka broker that is used in unit tests. It is exposed to facilitate testing of higher-level or specialized consumers and producers built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol, but rather provides a facility to do that. It takes care of the TCP transport, request unmarshaling, and response marshaling, and leaves it to the test writer to program MockBroker behaviour that is correct according to the Kafka API protocol.

MockBroker is implemented as a TCP server listening on a kernel-selected localhost port that can accept many connections. It reads Kafka requests from each connection and returns the responses programmed with the SetHandlerByMap function. If a MockBroker receives a request that it has no programmed response for, it returns nothing and the request times out.

Sarama provides a set of MockRequest builders to define the mappings used by MockBroker, but users can develop MockRequests of their own and use them along with or instead of the standard ones.

                                                                                                                                                                                                                                                                                                                        When running tests with MockBroker it is strongly recommended to specify a timeout to `go test` so that if the broker hangs waiting for a response, the test panics.

It is not necessary to prefix message length or correlation ID to your response bytes; the server does that automatically as a convenience.
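To make the prefixing concrete, the sketch below frames a response body the way a Kafka response is laid out on the wire: a 4-byte big-endian length, a 4-byte correlation ID, then the body. The helper name frameResponse is hypothetical and this is not MockBroker's own code; it only illustrates the bytes a test writer does not have to produce by hand.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// frameResponse is a hypothetical helper (not part of Sarama). It prepends
// the length and correlation ID that precede a Kafka response body.
func frameResponse(correlationID int32, body []byte) []byte {
	buf := make([]byte, 8+len(body))
	// The length field counts everything after itself: correlation ID plus body.
	binary.BigEndian.PutUint32(buf[0:4], uint32(4+len(body)))
	binary.BigEndian.PutUint32(buf[4:8], uint32(correlationID))
	copy(buf[8:], body)
	return buf
}

func main() {
	framed := frameResponse(42, []byte{0xCA, 0xFE})
	fmt.Printf("% x\n", framed) // prints "00 00 00 06 00 00 00 2a ca fe"
}
```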

                                                                                                                                                                                                                                                                                                                        func NewMockBroker

                                                                                                                                                                                                                                                                                                                        func NewMockBroker(t TestReporter, brokerID int32) *MockBroker

NewMockBroker launches a fake Kafka broker. It takes a TestReporter as provided by the test framework and the ID to assign to the broker. If an error occurs it is simply logged to the TestReporter and the broker exits.

                                                                                                                                                                                                                                                                                                                          func NewMockBrokerAddr

                                                                                                                                                                                                                                                                                                                          func NewMockBrokerAddr(t TestReporter, brokerID int32, addr string) *MockBroker

NewMockBrokerAddr behaves like NewMockBroker but listens on the address you give it rather than just some ephemeral port.

                                                                                                                                                                                                                                                                                                                            func NewMockBrokerListener

                                                                                                                                                                                                                                                                                                                            func NewMockBrokerListener(t TestReporter, brokerID int32, listener net.Listener) *MockBroker

NewMockBrokerListener behaves like NewMockBrokerAddr but accepts connections on the specified listener.
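The three constructors differ only in where the listening socket comes from: an ephemeral port, a caller-chosen address, or a listener the caller has already built. The sketch below illustrates the first two options using only the standard library. The helper names `listenEphemeral` and `listenAddr` are hypothetical and are not part of Sarama; this is not the library's implementation.

```go
package main

import (
	"fmt"
	"net"
)

// listenEphemeral (hypothetical helper) asks the OS for any free TCP port on
// the loopback interface, which is what "an ephemeral port" means in practice.
func listenEphemeral() (net.Listener, error) {
	return net.Listen("tcp", "127.0.0.1:0")
}

// listenAddr (hypothetical helper) binds to a caller-chosen "host:port"
// address instead of letting the OS pick the port.
func listenAddr(addr string) (net.Listener, error) {
	return net.Listen("tcp", addr)
}

func main() {
	ln, err := listenEphemeral()
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer ln.Close()
	// The OS-assigned port is only known after the listener is bound.
	fmt.Println("ephemeral listener bound to", ln.Addr())
}
```

A listener obtained either way could, in principle, be handed to a constructor that accepts a `net.Listener`, which is useful when a test needs to control the socket itself.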

                                                                                                                                                                                                                                                                                                                              func (*MockBroker) Addr

                                                                                                                                                                                                                                                                                                                              func (b *MockBroker) Addr() string

                                                                                                                                                                                                                                                                                                                                Addr returns the broker connection string in the form "<address>:<port>".

                                                                                                                                                                                                                                                                                                                                func (*MockBroker) BrokerID

                                                                                                                                                                                                                                                                                                                                func (b *MockBroker) BrokerID() int32

BrokerID returns the broker ID assigned to the broker.

                                                                                                                                                                                                                                                                                                                                  func (*MockBroker) Close

                                                                                                                                                                                                                                                                                                                                  func (b *MockBroker) Close()

Close terminates the broker, blocking until it stops its internal goroutines and releases all resources.

                                                                                                                                                                                                                                                                                                                                    func (*MockBroker) History

                                                                                                                                                                                                                                                                                                                                    func (b *MockBroker) History() []RequestResponse

                                                                                                                                                                                                                                                                                                                                      History returns a slice of RequestResponse pairs in the order they were processed by the broker. Note that in case of multiple connections to the broker the order expected by a test can be different from the order recorded in the history, unless some synchronization is implemented in the test.
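The ordering caveat follows from the fact that several connections may append to one shared history concurrently. A minimal sketch of such a recorder, assuming a mutex-guarded slice, is shown below. The `exchange` and `recorder` types are hypothetical stand-ins, not Sarama's `RequestResponse` or `MockBroker`.

```go
package main

import (
	"fmt"
	"sync"
)

// exchange is a hypothetical stand-in for a request/response pair.
type exchange struct {
	Request  string
	Response string
}

// recorder appends exchanges in the order they finish processing.
type recorder struct {
	mu      sync.Mutex
	history []exchange
}

// record stores one processed exchange; it is safe for concurrent callers.
func (r *recorder) record(req, res string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.history = append(r.history, exchange{Request: req, Response: res})
}

// History returns a copy, so callers cannot mutate the recorded slice.
func (r *recorder) History() []exchange {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make([]exchange, len(r.history))
	copy(out, r.history)
	return out
}

func main() {
	r := &recorder{}
	r.record("metadata request", "metadata response")
	r.record("fetch request", "fetch response")
	fmt.Println(len(r.History()), "exchanges recorded")
}
```

With a single connection the recorded order matches the order in which requests were sent. With several connections the interleaving depends on scheduling, so a test that asserts on order needs its own synchronization.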

                                                                                                                                                                                                                                                                                                                                      func (*MockBroker) Port

                                                                                                                                                                                                                                                                                                                                      func (b *MockBroker) Port() int32

                                                                                                                                                                                                                                                                                                                                        Port returns the TCP port number the broker is listening for requests on.
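Addr and Port expose the same listening endpoint in two forms: a `"<address>:<port>"` string and a numeric port. The sketch below shows how one can be derived from the other with the standard library. `splitAddr` is a hypothetical helper, not a Sarama function.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// splitAddr (hypothetical helper) splits "<address>:<port>" into its host
// part and a numeric port that fits in an int32.
func splitAddr(addr string) (string, int32, error) {
	host, portStr, err := net.SplitHostPort(addr)
	if err != nil {
		return "", 0, err
	}
	// bitSize 32 makes ParseInt reject values that overflow int32.
	port, err := strconv.ParseInt(portStr, 10, 32)
	if err != nil {
		return "", 0, err
	}
	return host, int32(port), nil
}

func main() {
	host, port, err := splitAddr("127.0.0.1:9092")
	if err != nil {
		fmt.Println("bad address:", err)
		return
	}
	fmt.Println("host:", host, "port:", port)
}
```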

                                                                                                                                                                                                                                                                                                                                        func (*MockBroker) Returns

                                                                                                                                                                                                                                                                                                                                        func (b *MockBroker) Returns(e encoderWithHeader)

                                                                                                                                                                                                                                                                                                                                        func (*MockBroker) SetGSSAPIHandler

                                                                                                                                                                                                                                                                                                                                        func (b *MockBroker) SetGSSAPIHandler(handler GSSApiHandlerFunc)

                                                                                                                                                                                                                                                                                                                                        func (*MockBroker) SetHandlerByMap

                                                                                                                                                                                                                                                                                                                                        func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse)

SetHandlerByMap defines a mapping of request types to MockResponses. When the broker receives a request, it looks up the request type in the map and uses the MockResponse it finds to generate an appropriate reply. If the request type is not in the map, nothing is sent.
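The lookup described above can be sketched as a map keyed by the request's type name. Everything in this example is hypothetical: the `responder` type, the two request structs, and the use of reflection to obtain the key are assumptions made for illustration, not Sarama's actual types or implementation.

```go
package main

import (
	"fmt"
	"reflect"
)

// Hypothetical request types standing in for real protocol requests.
type metadataRequest struct{}
type fetchRequest struct{}

// responder builds a reply for a request (stand-in for a mock response).
type responder func(req interface{}) string

// typeName returns the unqualified type name of req, following pointers.
func typeName(req interface{}) string {
	t := reflect.TypeOf(req)
	if t == nil {
		return ""
	}
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return t.Name()
}

// dispatch looks up the handler for req's type. ok is false when the type
// is not mapped, in which case the caller sends nothing.
func dispatch(handlers map[string]responder, req interface{}) (reply string, ok bool) {
	h, found := handlers[typeName(req)]
	if !found {
		return "", false
	}
	return h(req), true
}

func main() {
	handlers := map[string]responder{
		"metadataRequest": func(interface{}) string { return "metadata reply" },
	}
	if reply, ok := dispatch(handlers, &metadataRequest{}); ok {
		fmt.Println("sent:", reply)
	}
	if _, ok := dispatch(handlers, &fetchRequest{}); !ok {
		fmt.Println("fetchRequest is unmapped; nothing sent")
	}
}
```

Keying on the type name keeps test setup declarative: a test lists only the request types it expects, and any other request goes unanswered.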

                                                                                                                                                                                                                                                                                                                                          func (*MockBroker) SetLatency

                                                                                                                                                                                                                                                                                                                                          func (b *MockBroker) SetLatency(latency time.Duration)

SetLatency makes the broker pause for the specified period before every reply.

                                                                                                                                                                                                                                                                                                                                            func (*MockBroker) SetNotifier

                                                                                                                                                                                                                                                                                                                                            func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc)

SetNotifier sets a function that is invoked whenever a request has been processed successfully. The function is given the number of bytes read and written.

                                                                                                                                                                                                                                                                                                                                              type MockConsumerMetadataResponse

                                                                                                                                                                                                                                                                                                                                              type MockConsumerMetadataResponse struct {
                                                                                                                                                                                                                                                                                                                                              	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                              }

                                                                                                                                                                                                                                                                                                                                                MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.

                                                                                                                                                                                                                                                                                                                                                func NewMockConsumerMetadataResponse

                                                                                                                                                                                                                                                                                                                                                func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse

                                                                                                                                                                                                                                                                                                                                                func (*MockConsumerMetadataResponse) For

                                                                                                                                                                                                                                                                                                                                                func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                                                func (*MockConsumerMetadataResponse) SetCoordinator

                                                                                                                                                                                                                                                                                                                                                func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse

                                                                                                                                                                                                                                                                                                                                                func (*MockConsumerMetadataResponse) SetError

                                                                                                                                                                                                                                                                                                                                                type MockCreateAclsResponse

                                                                                                                                                                                                                                                                                                                                                type MockCreateAclsResponse struct {
                                                                                                                                                                                                                                                                                                                                                	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                }

                                                                                                                                                                                                                                                                                                                                                func NewMockCreateAclsResponse

                                                                                                                                                                                                                                                                                                                                                func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse

                                                                                                                                                                                                                                                                                                                                                func (*MockCreateAclsResponse) For

                                                                                                                                                                                                                                                                                                                                                func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                                                type MockCreatePartitionsResponse

                                                                                                                                                                                                                                                                                                                                                type MockCreatePartitionsResponse struct {
                                                                                                                                                                                                                                                                                                                                                	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                }

                                                                                                                                                                                                                                                                                                                                                func NewMockCreatePartitionsResponse

                                                                                                                                                                                                                                                                                                                                                func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse

                                                                                                                                                                                                                                                                                                                                                func (*MockCreatePartitionsResponse) For

                                                                                                                                                                                                                                                                                                                                                func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                                                type MockCreateTopicsResponse

                                                                                                                                                                                                                                                                                                                                                type MockCreateTopicsResponse struct {
                                                                                                                                                                                                                                                                                                                                                	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                }

                                                                                                                                                                                                                                                                                                                                                func NewMockCreateTopicsResponse

                                                                                                                                                                                                                                                                                                                                                func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse

                                                                                                                                                                                                                                                                                                                                                func (*MockCreateTopicsResponse) For

                                                                                                                                                                                                                                                                                                                                                func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                                                type MockDeleteAclsResponse

                                                                                                                                                                                                                                                                                                                                                type MockDeleteAclsResponse struct {
                                                                                                                                                                                                                                                                                                                                                	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                }

                                                                                                                                                                                                                                                                                                                                                func NewMockDeleteAclsResponse

                                                                                                                                                                                                                                                                                                                                                func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse

                                                                                                                                                                                                                                                                                                                                                func (*MockDeleteAclsResponse) For

                                                                                                                                                                                                                                                                                                                                                func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                                                type MockDeleteGroupsResponse

                                                                                                                                                                                                                                                                                                                                                type MockDeleteGroupsResponse struct {
                                                                                                                                                                                                                                                                                                                                                	// contains filtered or unexported fields
}

func NewMockDeleteGroupsRequest

func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse

func (*MockDeleteGroupsResponse) For

func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockDeleteGroupsResponse) SetDeletedGroups

func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse

type MockDeleteRecordsResponse

type MockDeleteRecordsResponse struct {
	// contains filtered or unexported fields
}

func NewMockDeleteRecordsResponse

func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse

func (*MockDeleteRecordsResponse) For

func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoderWithHeader

type MockDeleteTopicsResponse

type MockDeleteTopicsResponse struct {
	// contains filtered or unexported fields
}

func NewMockDeleteTopicsResponse

func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse

func (*MockDeleteTopicsResponse) For

func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader

type MockDescribeConfigsResponse

type MockDescribeConfigsResponse struct {
	// contains filtered or unexported fields
}

func NewMockDescribeConfigsResponse

func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse

func (*MockDescribeConfigsResponse) For

func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader

type MockDescribeConfigsResponseWithErrorCode

type MockDescribeConfigsResponseWithErrorCode struct {
	// contains filtered or unexported fields
}

func (*MockDescribeConfigsResponseWithErrorCode) For

func (mr *MockDescribeConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader

type MockDescribeGroupsResponse

type MockDescribeGroupsResponse struct {
	// contains filtered or unexported fields
}

func NewMockDescribeGroupsResponse

func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse

func (*MockDescribeGroupsResponse) AddGroupDescription

func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse

func (*MockDescribeGroupsResponse) For

func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader

type MockDescribeLogDirsResponse

type MockDescribeLogDirsResponse struct {
	// contains filtered or unexported fields
}

func NewMockDescribeLogDirsResponse

func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse

func (*MockDescribeLogDirsResponse) For

func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockDescribeLogDirsResponse) SetLogDirs

func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse

type MockFetchResponse

type MockFetchResponse struct {
	// contains filtered or unexported fields
}

MockFetchResponse is a `FetchResponse` builder.

func NewMockFetchResponse

func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse

func (*MockFetchResponse) For

func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockFetchResponse) SetHighWaterMark

func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse

func (*MockFetchResponse) SetMessage

func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse

func (*MockFetchResponse) SetVersion

func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse
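
The setters listed above each return the receiver (`*MockFetchResponse`), which suggests they are meant to be chained in a builder style. The following is a minimal, self-contained sketch of that pattern only. It uses a hypothetical `fetchBuilder` type with plain string messages; it is not Sarama's implementation, and every name in it apart from the three method names is an assumption made for illustration.

```go
package main

import "fmt"

// fetchBuilder is a hypothetical stand-in for a mock-response builder.
// It is NOT Sarama's MockFetchResponse; it only illustrates setters that
// return the receiver so that calls can be chained.
type fetchBuilder struct {
	version        int16
	highWaterMarks map[string]map[int32]int64            // topic -> partition -> offset
	messages       map[string]map[int32]map[int64]string // topic -> partition -> offset -> payload
}

// newFetchBuilder returns a builder with its maps initialised.
func newFetchBuilder() *fetchBuilder {
	return &fetchBuilder{
		highWaterMarks: make(map[string]map[int32]int64),
		messages:       make(map[string]map[int32]map[int64]string),
	}
}

// SetVersion records the response version and returns the builder.
func (b *fetchBuilder) SetVersion(version int16) *fetchBuilder {
	b.version = version
	return b
}

// SetHighWaterMark records the high water mark for one topic partition.
func (b *fetchBuilder) SetHighWaterMark(topic string, partition int32, offset int64) *fetchBuilder {
	if b.highWaterMarks[topic] == nil {
		b.highWaterMarks[topic] = make(map[int32]int64)
	}
	b.highWaterMarks[topic][partition] = offset
	return b
}

// SetMessage records a payload at the given topic, partition, and offset.
func (b *fetchBuilder) SetMessage(topic string, partition int32, offset int64, msg string) *fetchBuilder {
	if b.messages[topic] == nil {
		b.messages[topic] = make(map[int32]map[int64]string)
	}
	if b.messages[topic][partition] == nil {
		b.messages[topic][partition] = make(map[int64]string)
	}
	b.messages[topic][partition][offset] = msg
	return b
}

func main() {
	// Because each setter returns the builder, configuration reads as one expression.
	b := newFetchBuilder().
		SetVersion(4).
		SetMessage("my_topic", 0, 0, "hello").
		SetHighWaterMark("my_topic", 0, 1)

	fmt.Println(b.version, b.messages["my_topic"][0][0], b.highWaterMarks["my_topic"][0])
}
```

Returning the receiver from each setter keeps test setup compact, at the cost of the setters being unable to report errors through a return value.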

type MockFindCoordinatorResponse

type MockFindCoordinatorResponse struct {
	// contains filtered or unexported fields
}

MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.

func NewMockFindCoordinatorResponse

func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse

func (*MockFindCoordinatorResponse) For

func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockFindCoordinatorResponse) SetCoordinator

func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse

func (*MockFindCoordinatorResponse) SetError

func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse

type MockHeartbeatResponse

type MockHeartbeatResponse struct {
	Err KError
	// contains filtered or unexported fields
}

func NewMockHeartbeatResponse

func NewMockHeartbeatResponse(t TestReporter) *MockHeartbeatResponse

func (*MockHeartbeatResponse) For

func (m *MockHeartbeatResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockHeartbeatResponse) SetError

                                                                                                                                                                                                                                                                                                                                                    type MockJoinGroupResponse

                                                                                                                                                                                                                                                                                                                                                    type MockJoinGroupResponse struct {
                                                                                                                                                                                                                                                                                                                                                    	ThrottleTime  int32
                                                                                                                                                                                                                                                                                                                                                    	Err           KError
                                                                                                                                                                                                                                                                                                                                                    	GenerationId  int32
                                                                                                                                                                                                                                                                                                                                                    	GroupProtocol string
                                                                                                                                                                                                                                                                                                                                                    	LeaderId      string
                                                                                                                                                                                                                                                                                                                                                    	MemberId      string
                                                                                                                                                                                                                                                                                                                                                    	Members       map[string][]byte
                                                                                                                                                                                                                                                                                                                                                    	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                                                                                                                                    func NewMockJoinGroupResponse

                                                                                                                                                                                                                                                                                                                                                    func NewMockJoinGroupResponse(t TestReporter) *MockJoinGroupResponse

                                                                                                                                                                                                                                                                                                                                                    func (*MockJoinGroupResponse) For

                                                                                                                                                                                                                                                                                                                                                    func (m *MockJoinGroupResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                                                    func (*MockJoinGroupResponse) SetError

                                                                                                                                                                                                                                                                                                                                                    func (*MockJoinGroupResponse) SetGenerationId

                                                                                                                                                                                                                                                                                                                                                    func (m *MockJoinGroupResponse) SetGenerationId(id int32) *MockJoinGroupResponse

                                                                                                                                                                                                                                                                                                                                                    func (*MockJoinGroupResponse) SetGroupProtocol

                                                                                                                                                                                                                                                                                                                                                    func (m *MockJoinGroupResponse) SetGroupProtocol(proto string) *MockJoinGroupResponse

                                                                                                                                                                                                                                                                                                                                                    func (*MockJoinGroupResponse) SetLeaderId

                                                                                                                                                                                                                                                                                                                                                    func (*MockJoinGroupResponse) SetMember

                                                                                                                                                                                                                                                                                                                                                    func (*MockJoinGroupResponse) SetMemberId

                                                                                                                                                                                                                                                                                                                                                    func (*MockJoinGroupResponse) SetThrottleTime

                                                                                                                                                                                                                                                                                                                                                    func (m *MockJoinGroupResponse) SetThrottleTime(t int32) *MockJoinGroupResponse

                                                                                                                                                                                                                                                                                                                                                    type MockKerberosClient

                                                                                                                                                                                                                                                                                                                                                    type MockKerberosClient struct {
                                                                                                                                                                                                                                                                                                                                                    	ASRep messages.ASRep
                                                                                                                                                                                                                                                                                                                                                    	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                                                                                                                                    func (*MockKerberosClient) CName

                                                                                                                                                                                                                                                                                                                                                    func (*MockKerberosClient) Destroy

                                                                                                                                                                                                                                                                                                                                                    func (c *MockKerberosClient) Destroy()

                                                                                                                                                                                                                                                                                                                                                    func (*MockKerberosClient) Domain

                                                                                                                                                                                                                                                                                                                                                    func (c *MockKerberosClient) Domain() string

                                                                                                                                                                                                                                                                                                                                                    func (*MockKerberosClient) GetServiceTicket

                                                                                                                                                                                                                                                                                                                                                    func (c *MockKerberosClient) GetServiceTicket(spn string) (messages.Ticket, types.EncryptionKey, error)

                                                                                                                                                                                                                                                                                                                                                    func (*MockKerberosClient) Login

                                                                                                                                                                                                                                                                                                                                                    func (c *MockKerberosClient) Login() error

                                                                                                                                                                                                                                                                                                                                                    type MockLeaveGroupResponse

                                                                                                                                                                                                                                                                                                                                                    type MockLeaveGroupResponse struct {
                                                                                                                                                                                                                                                                                                                                                    	Err KError
                                                                                                                                                                                                                                                                                                                                                    	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                                                                                                                                    func NewMockLeaveGroupResponse

                                                                                                                                                                                                                                                                                                                                                    func NewMockLeaveGroupResponse(t TestReporter) *MockLeaveGroupResponse

                                                                                                                                                                                                                                                                                                                                                    func (*MockLeaveGroupResponse) For

                                                                                                                                                                                                                                                                                                                                                    func (m *MockLeaveGroupResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                                                    func (*MockLeaveGroupResponse) SetError

                                                                                                                                                                                                                                                                                                                                                    type MockListAclsResponse

                                                                                                                                                                                                                                                                                                                                                    type MockListAclsResponse struct {
                                                                                                                                                                                                                                                                                                                                                    	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                                                                                                                                    func NewMockListAclsResponse

                                                                                                                                                                                                                                                                                                                                                    func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse

                                                                                                                                                                                                                                                                                                                                                    func (*MockListAclsResponse) For

                                                                                                                                                                                                                                                                                                                                                    func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                                                    type MockListGroupsResponse

                                                                                                                                                                                                                                                                                                                                                    type MockListGroupsResponse struct {
                                                                                                                                                                                                                                                                                                                                                    	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                                                                                                                                    func NewMockListGroupsResponse

                                                                                                                                                                                                                                                                                                                                                    func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse

                                                                                                                                                                                                                                                                                                                                                    func (*MockListGroupsResponse) AddGroup

                                                                                                                                                                                                                                                                                                                                                    func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse

                                                                                                                                                                                                                                                                                                                                                    func (*MockListGroupsResponse) For

                                                                                                                                                                                                                                                                                                                                                    func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                                                    type MockListPartitionReassignmentsResponse

                                                                                                                                                                                                                                                                                                                                                    type MockListPartitionReassignmentsResponse struct {
                                                                                                                                                                                                                                                                                                                                                    	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                                                                                                                                    func (*MockListPartitionReassignmentsResponse) For

                                                                                                                                                                                                                                                                                                                                                    func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                                                    type MockMetadataResponse

                                                                                                                                                                                                                                                                                                                                                    type MockMetadataResponse struct {
                                                                                                                                                                                                                                                                                                                                                    	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                                                                                                                                      MockMetadataResponse is a `MetadataResponse` builder.
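The builder style mentioned here, where each setter returns the receiver so that calls can be chained, can be illustrated with a small stand-alone sketch. The `metadataBuilder` type and `newMetadataBuilder` constructor below are hypothetical stand-ins written only for illustration. They are not part of sarama, and they mirror only the shape of the `SetBroker` signature listed in this index, not its actual behaviour.

```go
package main

import "fmt"

// metadataBuilder is a hypothetical stand-in for a response builder such as
// MockMetadataResponse. It is NOT a sarama type.
type metadataBuilder struct {
	brokers map[string]int32 // broker address -> broker ID
}

// newMetadataBuilder returns an empty builder (hypothetical helper).
func newMetadataBuilder() *metadataBuilder {
	return &metadataBuilder{brokers: make(map[string]int32)}
}

// SetBroker records a broker and returns the receiver so that calls can be
// chained, following the same signature shape as MockMetadataResponse.SetBroker.
func (b *metadataBuilder) SetBroker(addr string, brokerID int32) *metadataBuilder {
	b.brokers[addr] = brokerID
	return b
}

func main() {
	// Chain the setters to describe a two-broker cluster in one expression.
	b := newMetadataBuilder().
		SetBroker("localhost:9092", 1).
		SetBroker("localhost:9093", 2)

	fmt.Println(len(b.brokers))
}
```

Returning the receiver from every setter is what lets a test describe a whole canned response in a single expression. The other `Mock...Response` types in this index expose setters with the same return-the-receiver shape.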

                                                                                                                                                                                                                                                                                                                                                      func NewMockMetadataResponse

                                                                                                                                                                                                                                                                                                                                                      func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse

                                                                                                                                                                                                                                                                                                                                                      func (*MockMetadataResponse) For

                                                                                                                                                                                                                                                                                                                                                      func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                                                      func (*MockMetadataResponse) SetBroker

                                                                                                                                                                                                                                                                                                                                                      func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse

                                                                                                                                                                                                                                                                                                                                                      func (*MockMetadataResponse) SetController

                                                                                                                                                                                                                                                                                                                                                      func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse

                                                                                                                                                                                                                                                                                                                                                      func (*MockMetadataResponse) SetLeader

                                                                                                                                                                                                                                                                                                                                                      func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse

                                                                                                                                                                                                                                                                                                                                                      type MockOffsetCommitResponse

                                                                                                                                                                                                                                                                                                                                                      type MockOffsetCommitResponse struct {
                                                                                                                                                                                                                                                                                                                                                      	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                      }

MockOffsetCommitResponse is an `OffsetCommitResponse` builder.

                                                                                                                                                                                                                                                                                                                                                        func NewMockOffsetCommitResponse

                                                                                                                                                                                                                                                                                                                                                        func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse

                                                                                                                                                                                                                                                                                                                                                        func (*MockOffsetCommitResponse) For

                                                                                                                                                                                                                                                                                                                                                        func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                                                        func (*MockOffsetCommitResponse) SetError

                                                                                                                                                                                                                                                                                                                                                        func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse

                                                                                                                                                                                                                                                                                                                                                        type MockOffsetFetchResponse

                                                                                                                                                                                                                                                                                                                                                        type MockOffsetFetchResponse struct {
                                                                                                                                                                                                                                                                                                                                                        	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                        }

MockOffsetFetchResponse is an `OffsetFetchResponse` builder.

                                                                                                                                                                                                                                                                                                                                                          func NewMockOffsetFetchResponse

                                                                                                                                                                                                                                                                                                                                                          func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse

                                                                                                                                                                                                                                                                                                                                                          func (*MockOffsetFetchResponse) For

                                                                                                                                                                                                                                                                                                                                                          func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                                                          func (*MockOffsetFetchResponse) SetError

                                                                                                                                                                                                                                                                                                                                                          func (*MockOffsetFetchResponse) SetOffset

                                                                                                                                                                                                                                                                                                                                                          func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse

                                                                                                                                                                                                                                                                                                                                                          type MockOffsetResponse

                                                                                                                                                                                                                                                                                                                                                          type MockOffsetResponse struct {
                                                                                                                                                                                                                                                                                                                                                          	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                          }

                                                                                                                                                                                                                                                                                                                                                            MockOffsetResponse is an `OffsetResponse` builder.

                                                                                                                                                                                                                                                                                                                                                            func NewMockOffsetResponse

                                                                                                                                                                                                                                                                                                                                                            func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse

                                                                                                                                                                                                                                                                                                                                                            func (*MockOffsetResponse) For

                                                                                                                                                                                                                                                                                                                                                            func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                                                            func (*MockOffsetResponse) SetOffset

                                                                                                                                                                                                                                                                                                                                                            func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse

                                                                                                                                                                                                                                                                                                                                                            func (*MockOffsetResponse) SetVersion

                                                                                                                                                                                                                                                                                                                                                            func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse

                                                                                                                                                                                                                                                                                                                                                            type MockProduceResponse

                                                                                                                                                                                                                                                                                                                                                            type MockProduceResponse struct {
                                                                                                                                                                                                                                                                                                                                                            	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                            }

                                                                                                                                                                                                                                                                                                                                                              MockProduceResponse is a `ProduceResponse` builder.

                                                                                                                                                                                                                                                                                                                                                              func NewMockProduceResponse

                                                                                                                                                                                                                                                                                                                                                              func NewMockProduceResponse(t TestReporter) *MockProduceResponse

                                                                                                                                                                                                                                                                                                                                                              func (*MockProduceResponse) For

                                                                                                                                                                                                                                                                                                                                                              func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                                                              func (*MockProduceResponse) SetError

                                                                                                                                                                                                                                                                                                                                                              func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse

                                                                                                                                                                                                                                                                                                                                                              func (*MockProduceResponse) SetVersion

                                                                                                                                                                                                                                                                                                                                                              func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse

                                                                                                                                                                                                                                                                                                                                                              type MockResponse

                                                                                                                                                                                                                                                                                                                                                              type MockResponse interface {
                                                                                                                                                                                                                                                                                                                                                              	For(reqBody versionedDecoder) (res encoderWithHeader)
                                                                                                                                                                                                                                                                                                                                                              }

MockResponse is a response builder interface. It defines a single method, For, which generates a response based on a request body. MockResponses are used to program the behavior of MockBroker in tests.
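The builder pattern behind MockResponse can be sketched in isolation. The example below is a minimal, standalone illustration, not sarama's implementation. The parameter and result types of For (`versionedDecoder`, `encoderWithHeader`) are unexported, so the sketch substitutes hypothetical local types (`request`, `response`, `mockResponse`, `mockOffsetBuilder`) that exist only for this example. It shows the two ideas the Mock*Response types share: chainable setters that program the builder, and a single For method that derives the response from the incoming request.

```go
package main

import "fmt"

// request is a hypothetical stand-in for a decoded request body.
type request struct {
	Topic     string
	Partition int32
}

// response is a hypothetical stand-in for an encodable response.
type response struct {
	Topic     string
	Partition int32
	Offset    int64
	Found     bool
}

// mockResponse mirrors the shape of the MockResponse interface:
// one method that generates a response from a request body.
type mockResponse interface {
	For(req request) response
}

// mockOffsetBuilder is a hypothetical builder, loosely modelled on the
// Mock*Response types in this listing. It is an assumption made for
// illustration and is not part of sarama.
type mockOffsetBuilder struct {
	offsets map[string]map[int32]int64
}

func newMockOffsetBuilder() *mockOffsetBuilder {
	return &mockOffsetBuilder{offsets: make(map[string]map[int32]int64)}
}

// SetOffset records an offset for a topic/partition pair and returns the
// builder so that calls can be chained.
func (b *mockOffsetBuilder) SetOffset(topic string, partition int32, offset int64) *mockOffsetBuilder {
	if b.offsets[topic] == nil {
		b.offsets[topic] = make(map[int32]int64)
	}
	b.offsets[topic][partition] = offset
	return b
}

// For looks up the programmed offset for the topic/partition named in the
// request. Found is false when nothing was programmed for that pair.
func (b *mockOffsetBuilder) For(req request) response {
	offset, ok := b.offsets[req.Topic][req.Partition]
	return response{Topic: req.Topic, Partition: req.Partition, Offset: offset, Found: ok}
}

func main() {
	// Program the builder up front, then let the request decide the answer.
	var r mockResponse = newMockOffsetBuilder().
		SetOffset("my_topic", 0, 1234).
		SetOffset("my_topic", 1, 5678)

	fmt.Println(r.For(request{Topic: "my_topic", Partition: 1}).Offset)
	fmt.Println(r.For(request{Topic: "other", Partition: 0}).Found)
}
```

Returning the builder from each setter is what allows a whole response to be described in one expression. In sarama itself, builders of this kind are handed to a MockBroker rather than called directly.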

                                                                                                                                                                                                                                                                                                                                                                type MockSaslAuthenticateResponse

                                                                                                                                                                                                                                                                                                                                                                type MockSaslAuthenticateResponse struct {
                                                                                                                                                                                                                                                                                                                                                                	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                                }

                                                                                                                                                                                                                                                                                                                                                                func NewMockSaslAuthenticateResponse

                                                                                                                                                                                                                                                                                                                                                                func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse

                                                                                                                                                                                                                                                                                                                                                                func (*MockSaslAuthenticateResponse) For

                                                                                                                                                                                                                                                                                                                                                                func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                                                                func (*MockSaslAuthenticateResponse) SetAuthBytes

                                                                                                                                                                                                                                                                                                                                                                func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse

                                                                                                                                                                                                                                                                                                                                                                func (*MockSaslAuthenticateResponse) SetError

                                                                                                                                                                                                                                                                                                                                                                type MockSaslHandshakeResponse

                                                                                                                                                                                                                                                                                                                                                                type MockSaslHandshakeResponse struct {
                                                                                                                                                                                                                                                                                                                                                                	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                                }

                                                                                                                                                                                                                                                                                                                                                                func NewMockSaslHandshakeResponse

                                                                                                                                                                                                                                                                                                                                                                func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse

                                                                                                                                                                                                                                                                                                                                                                func (*MockSaslHandshakeResponse) For

                                                                                                                                                                                                                                                                                                                                                                func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                                                                func (*MockSaslHandshakeResponse) SetEnabledMechanisms

                                                                                                                                                                                                                                                                                                                                                                func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse

                                                                                                                                                                                                                                                                                                                                                                func (*MockSaslHandshakeResponse) SetError

                                                                                                                                                                                                                                                                                                                                                                type MockSequence

                                                                                                                                                                                                                                                                                                                                                                type MockSequence struct {
                                                                                                                                                                                                                                                                                                                                                                	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                                }

MockSequence is a mock response builder that is created from a sequence of concrete responses. Each time a `MockBroker` calls its `For` method, the next response in the sequence is returned. Once the end of the sequence is reached, the last element is returned for every subsequent call.
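The replay behaviour described above (advance through the responses, then keep returning the last one) can be illustrated with a small stand-alone sketch. It does not use sarama and is not the library's implementation: the `responseSequence` type, its `next` method, and the string responses are hypothetical names chosen for illustration only.

```go
package main

import "fmt"

// responseSequence is a hypothetical stand-in used to illustrate the
// "repeat the last element" behaviour; it is NOT sarama's MockSequence.
type responseSequence struct {
	responses []string
}

// newResponseSequence builds a sequence from one or more responses.
// An empty sequence has nothing to return, so this sketch rejects it.
func newResponseSequence(responses ...string) *responseSequence {
	if len(responses) == 0 {
		panic("responseSequence needs at least one response")
	}
	return &responseSequence{responses: responses}
}

// next returns the first unconsumed response and advances the sequence.
// When only one response remains, it is returned on every later call.
func (s *responseSequence) next() string {
	res := s.responses[0]
	if len(s.responses) > 1 {
		s.responses = s.responses[1:]
	}
	return res
}

func main() {
	seq := newResponseSequence("first", "second", "last")
	for i := 0; i < 5; i++ {
		// Prints first, second, last, last, last (one per line).
		fmt.Println(seq.next())
	}
}
```

In a test, this pattern is convenient when a broker should answer the first few requests differently (for example, an error followed by a success) and then settle on a steady-state response.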

                                                                                                                                                                                                                                                                                                                                                                  func NewMockSequence

                                                                                                                                                                                                                                                                                                                                                                  func NewMockSequence(responses ...interface{}) *MockSequence

                                                                                                                                                                                                                                                                                                                                                                  func (*MockSequence) For

                                                                                                                                                                                                                                                                                                                                                                  func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader)

                                                                                                                                                                                                                                                                                                                                                                  type MockSyncGroupResponse

                                                                                                                                                                                                                                                                                                                                                                  type MockSyncGroupResponse struct {
                                                                                                                                                                                                                                                                                                                                                                  	Err              KError
                                                                                                                                                                                                                                                                                                                                                                  	MemberAssignment []byte
                                                                                                                                                                                                                                                                                                                                                                  	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                                  }

                                                                                                                                                                                                                                                                                                                                                                  func NewMockSyncGroupResponse

                                                                                                                                                                                                                                                                                                                                                                  func NewMockSyncGroupResponse(t TestReporter) *MockSyncGroupResponse

                                                                                                                                                                                                                                                                                                                                                                  func (*MockSyncGroupResponse) For

                                                                                                                                                                                                                                                                                                                                                                  func (m *MockSyncGroupResponse) For(reqBody versionedDecoder) encoderWithHeader

                                                                                                                                                                                                                                                                                                                                                                  func (*MockSyncGroupResponse) SetError

                                                                                                                                                                                                                                                                                                                                                                  func (*MockSyncGroupResponse) SetMemberAssignment

                                                                                                                                                                                                                                                                                                                                                                  func (m *MockSyncGroupResponse) SetMemberAssignment(assignment *ConsumerGroupMemberAssignment) *MockSyncGroupResponse

                                                                                                                                                                                                                                                                                                                                                                  type MockWrapper

                                                                                                                                                                                                                                                                                                                                                                  type MockWrapper struct {
                                                                                                                                                                                                                                                                                                                                                                  	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                                  }

                                                                                                                                                                                                                                                                                                                                                                    MockWrapper is a mock response builder that returns a particular concrete response regardless of the actual request passed to the `For` method.

                                                                                                                                                                                                                                                                                                                                                                    func NewMockWrapper

                                                                                                                                                                                                                                                                                                                                                                    func NewMockWrapper(res encoderWithHeader) *MockWrapper

                                                                                                                                                                                                                                                                                                                                                                    func (*MockWrapper) For

                                                                                                                                                                                                                                                                                                                                                                    func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader)

                                                                                                                                                                                                                                                                                                                                                                    type MultiError

                                                                                                                                                                                                                                                                                                                                                                    type MultiError struct {
                                                                                                                                                                                                                                                                                                                                                                    	Errors *[]error
                                                                                                                                                                                                                                                                                                                                                                    }

MultiError wraps multiple errors in a single error value.

                                                                                                                                                                                                                                                                                                                                                                      func (MultiError) Error

                                                                                                                                                                                                                                                                                                                                                                      func (mErr MultiError) Error() string

                                                                                                                                                                                                                                                                                                                                                                      func (MultiError) PrettyError

                                                                                                                                                                                                                                                                                                                                                                      func (mErr MultiError) PrettyError() string

                                                                                                                                                                                                                                                                                                                                                                      type OffsetCommitRequest

                                                                                                                                                                                                                                                                                                                                                                      type OffsetCommitRequest struct {
                                                                                                                                                                                                                                                                                                                                                                      	ConsumerGroup           string
                                                                                                                                                                                                                                                                                                                                                                      	ConsumerGroupGeneration int32  // v1 or later
                                                                                                                                                                                                                                                                                                                                                                      	ConsumerID              string // v1 or later
                                                                                                                                                                                                                                                                                                                                                                      	RetentionTime           int64  // v2 or later
                                                                                                                                                                                                                                                                                                                                                                      
                                                                                                                                                                                                                                                                                                                                                                      	// Version can be:
                                                                                                                                                                                                                                                                                                                                                                      	// - 0 (kafka 0.8.1 and later)
                                                                                                                                                                                                                                                                                                                                                                      	// - 1 (kafka 0.8.2 and later)
                                                                                                                                                                                                                                                                                                                                                                      	// - 2 (kafka 0.9.0 and later)
                                                                                                                                                                                                                                                                                                                                                                      	// - 3 (kafka 0.11.0 and later)
                                                                                                                                                                                                                                                                                                                                                                      	// - 4 (kafka 2.0.0 and later)
                                                                                                                                                                                                                                                                                                                                                                      	Version int16
                                                                                                                                                                                                                                                                                                                                                                      	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                                                                                                                      func (*OffsetCommitRequest) AddBlock

                                                                                                                                                                                                                                                                                                                                                                      func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string)

                                                                                                                                                                                                                                                                                                                                                                      func (*OffsetCommitRequest) Offset

                                                                                                                                                                                                                                                                                                                                                                      func (r *OffsetCommitRequest) Offset(topic string, partitionID int32) (int64, string, error)

                                                                                                                                                                                                                                                                                                                                                                      type OffsetCommitResponse

                                                                                                                                                                                                                                                                                                                                                                      type OffsetCommitResponse struct {
                                                                                                                                                                                                                                                                                                                                                                      	Version        int16
                                                                                                                                                                                                                                                                                                                                                                      	ThrottleTimeMs int32
                                                                                                                                                                                                                                                                                                                                                                      	Errors         map[string]map[int32]KError
                                                                                                                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                                                                                                                      func (*OffsetCommitResponse) AddError

                                                                                                                                                                                                                                                                                                                                                                      func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError)
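The `Errors` field is a two-level map keyed first by topic and then by partition. As a rough sketch of how a helper with the shape of `AddError` might populate such a map, the stand-alone example below initialises each level lazily. The `kerror` and `commitResponse` types and the `addError` method are hypothetical stand-ins, not sarama's code, and the numeric error values are arbitrary.

```go
package main

import "fmt"

// kerror is a hypothetical stand-in for sarama's KError type.
type kerror int16

// commitResponse mirrors only the shape of the Errors field shown above;
// it is an illustration, not sarama's OffsetCommitResponse.
type commitResponse struct {
	Errors map[string]map[int32]kerror
}

// addError records err for the given topic and partition, creating the
// outer and inner maps on first use so that the zero value is usable.
func (r *commitResponse) addError(topic string, partition int32, err kerror) {
	if r.Errors == nil {
		r.Errors = make(map[string]map[int32]kerror)
	}
	partitions := r.Errors[topic]
	if partitions == nil {
		partitions = make(map[int32]kerror)
		r.Errors[topic] = partitions
	}
	partitions[partition] = err
}

func main() {
	var resp commitResponse
	resp.addError("events", 0, 0) // arbitrary value standing in for "no error"
	resp.addError("events", 1, 3) // arbitrary non-zero error value

	// Prints "2 3": two partitions recorded, partition 1 holds value 3.
	fmt.Println(len(resp.Errors["events"]), resp.Errors["events"][1])
}
```

Lazy initialisation keeps the zero value of the struct usable, at the cost of a nil check on every insert.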

                                                                                                                                                                                                                                                                                                                                                                      type OffsetFetchRequest

                                                                                                                                                                                                                                                                                                                                                                      type OffsetFetchRequest struct {
                                                                                                                                                                                                                                                                                                                                                                      	Version       int16
                                                                                                                                                                                                                                                                                                                                                                      	ConsumerGroup string
                                                                                                                                                                                                                                                                                                                                                                      	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                                                                                                                      func (*OffsetFetchRequest) AddPartition

                                                                                                                                                                                                                                                                                                                                                                      func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32)

                                                                                                                                                                                                                                                                                                                                                                      func (*OffsetFetchRequest) ZeroPartitions

                                                                                                                                                                                                                                                                                                                                                                      func (r *OffsetFetchRequest) ZeroPartitions()

                                                                                                                                                                                                                                                                                                                                                                      type OffsetFetchResponse

                                                                                                                                                                                                                                                                                                                                                                      type OffsetFetchResponse struct {
                                                                                                                                                                                                                                                                                                                                                                      	Version        int16
                                                                                                                                                                                                                                                                                                                                                                      	ThrottleTimeMs int32
                                                                                                                                                                                                                                                                                                                                                                      	Blocks         map[string]map[int32]*OffsetFetchResponseBlock
                                                                                                                                                                                                                                                                                                                                                                      	Err            KError
                                                                                                                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                                                                                                                      func (*OffsetFetchResponse) AddBlock

                                                                                                                                                                                                                                                                                                                                                                      func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock)

                                                                                                                                                                                                                                                                                                                                                                      func (*OffsetFetchResponse) GetBlock

                                                                                                                                                                                                                                                                                                                                                                      func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock

                                                                                                                                                                                                                                                                                                                                                                      type OffsetFetchResponseBlock

                                                                                                                                                                                                                                                                                                                                                                      type OffsetFetchResponseBlock struct {
                                                                                                                                                                                                                                                                                                                                                                      	Offset      int64
                                                                                                                                                                                                                                                                                                                                                                      	LeaderEpoch int32
                                                                                                                                                                                                                                                                                                                                                                      	Metadata    string
                                                                                                                                                                                                                                                                                                                                                                      	Err         KError
                                                                                                                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                                                                                                                      type OffsetManager

                                                                                                                                                                                                                                                                                                                                                                      type OffsetManager interface {
                                                                                                                                                                                                                                                                                                                                                                      	// ManagePartition creates a PartitionOffsetManager on the given topic/partition.
                                                                                                                                                                                                                                                                                                                                                                      	// It will return an error if this OffsetManager is already managing the given
                                                                                                                                                                                                                                                                                                                                                                      	// topic/partition.
                                                                                                                                                                                                                                                                                                                                                                      	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)
                                                                                                                                                                                                                                                                                                                                                                      
                                                                                                                                                                                                                                                                                                                                                                      	// Close stops the OffsetManager from managing offsets. It is required to call
                                                                                                                                                                                                                                                                                                                                                                      	// this function before an OffsetManager object passes out of scope, as it
                                                                                                                                                                                                                                                                                                                                                                      	// will otherwise leak memory. You must call this after all the
                                                                                                                                                                                                                                                                                                                                                                      	// PartitionOffsetManagers are closed.
                                                                                                                                                                                                                                                                                                                                                                      	Close() error
                                                                                                                                                                                                                                                                                                                                                                      
                                                                                                                                                                                                                                                                                                                                                                      	// Commit commits the offsets. This method can be used if AutoCommit.Enable is
                                                                                                                                                                                                                                                                                                                                                                      	// set to false.
                                                                                                                                                                                                                                                                                                                                                                      	Commit()
                                                                                                                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                                                                                                                        OffsetManager uses Kafka to store and fetch consumed partition offsets.

                                                                                                                                                                                                                                                                                                                                                                        func NewOffsetManagerFromClient

                                                                                                                                                                                                                                                                                                                                                                        func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error)

NewOffsetManagerFromClient creates a new OffsetManager for the given consumer group from the given client. Closing the OffsetManager does not close the client: it is still necessary to call Close() on the underlying client when finished with the offset manager.
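The contract described by the OffsetManager interface (one manager per topic/partition, offsets marked first and committed later, partition managers closed before the OffsetManager) can be illustrated with a small in-memory stand-in. This is only a sketch: `fakeOffsetManager`, `fakePartitionManager`, `tp`, and `errAlreadyManaged` are hypothetical names written for this example. They mirror a subset of the documented method set, do not talk to Kafka, and are not Sarama's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// tp identifies a topic/partition pair (hypothetical helper type).
type tp struct {
	topic     string
	partition int32
}

// errAlreadyManaged models the error ManagePartition is documented to return
// when the topic/partition is already being managed.
var errAlreadyManaged = errors.New("topic/partition is already managed")

// fakeOffsetManager is an in-memory stand-in for an OffsetManager.
type fakeOffsetManager struct {
	managed   map[tp]bool  // partitions that currently have a manager
	marked    map[tp]int64 // offsets marked but not yet committed
	committed map[tp]int64 // offsets "stored" by Commit
}

// fakePartitionManager is a stand-in for a PartitionOffsetManager.
type fakePartitionManager struct {
	parent *fakeOffsetManager
	key    tp
}

func newFakeOffsetManager() *fakeOffsetManager {
	return &fakeOffsetManager{
		managed:   make(map[tp]bool),
		marked:    make(map[tp]int64),
		committed: make(map[tp]int64),
	}
}

// ManagePartition refuses to hand out a second manager for the same partition.
func (m *fakeOffsetManager) ManagePartition(topic string, partition int32) (*fakePartitionManager, error) {
	key := tp{topic: topic, partition: partition}
	if m.managed[key] {
		return nil, errAlreadyManaged
	}
	m.managed[key] = true
	return &fakePartitionManager{parent: m, key: key}, nil
}

// Commit copies every marked offset into the committed set.
func (m *fakeOffsetManager) Commit() {
	for key, offset := range m.marked {
		m.committed[key] = offset
	}
}

// Close is a no-op here; a real manager releases its resources.
func (m *fakeOffsetManager) Close() error { return nil }

// MarkOffset records an offset locally; nothing is stored until Commit runs.
func (p *fakePartitionManager) MarkOffset(offset int64) {
	p.parent.marked[p.key] = offset
}

// Close releases the partition so that it can be managed again.
func (p *fakePartitionManager) Close() error {
	delete(p.parent.managed, p.key)
	return nil
}

func main() {
	om := newFakeOffsetManager()

	pom, err := om.ManagePartition("events", 0)
	if err != nil {
		fmt.Println("unexpected error:", err)
		return
	}

	// A second request for the same topic/partition is rejected.
	if _, err := om.ManagePartition("events", 0); err != nil {
		fmt.Println("second ManagePartition:", err)
	}

	pom.MarkOffset(42)
	om.Commit()
	fmt.Println("committed offset:", om.committed[tp{topic: "events", partition: 0}])

	// Close the partition manager first, then the offset manager.
	_ = pom.Close()
	_ = om.Close()
}
```

With the real library the same ordering applies, followed by a final Close() on the client that was passed to NewOffsetManagerFromClient.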

                                                                                                                                                                                                                                                                                                                                                                          type OffsetRequest

                                                                                                                                                                                                                                                                                                                                                                          type OffsetRequest struct {
                                                                                                                                                                                                                                                                                                                                                                          	Version int16
                                                                                                                                                                                                                                                                                                                                                                          	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                                          }

                                                                                                                                                                                                                                                                                                                                                                          func (*OffsetRequest) AddBlock

                                                                                                                                                                                                                                                                                                                                                                          func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32)

                                                                                                                                                                                                                                                                                                                                                                          func (*OffsetRequest) ReplicaID

                                                                                                                                                                                                                                                                                                                                                                          func (r *OffsetRequest) ReplicaID() int32

                                                                                                                                                                                                                                                                                                                                                                          func (*OffsetRequest) SetReplicaID

                                                                                                                                                                                                                                                                                                                                                                          func (r *OffsetRequest) SetReplicaID(id int32)

                                                                                                                                                                                                                                                                                                                                                                          type OffsetResponse

                                                                                                                                                                                                                                                                                                                                                                          type OffsetResponse struct {
                                                                                                                                                                                                                                                                                                                                                                          	Version int16
                                                                                                                                                                                                                                                                                                                                                                          	Blocks  map[string]map[int32]*OffsetResponseBlock
                                                                                                                                                                                                                                                                                                                                                                          }

                                                                                                                                                                                                                                                                                                                                                                          func (*OffsetResponse) AddTopicPartition

                                                                                                                                                                                                                                                                                                                                                                          func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64)

                                                                                                                                                                                                                                                                                                                                                                          func (*OffsetResponse) GetBlock

                                                                                                                                                                                                                                                                                                                                                                          func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock

                                                                                                                                                                                                                                                                                                                                                                          type OffsetResponseBlock

                                                                                                                                                                                                                                                                                                                                                                          type OffsetResponseBlock struct {
                                                                                                                                                                                                                                                                                                                                                                          	Err       KError
                                                                                                                                                                                                                                                                                                                                                                          	Offsets   []int64 // Version 0
                                                                                                                                                                                                                                                                                                                                                                          	Offset    int64   // Version 1
                                                                                                                                                                                                                                                                                                                                                                          	Timestamp int64   // Version 1
                                                                                                                                                                                                                                                                                                                                                                          }

                                                                                                                                                                                                                                                                                                                                                                          type PacketDecodingError

                                                                                                                                                                                                                                                                                                                                                                          type PacketDecodingError struct {
                                                                                                                                                                                                                                                                                                                                                                          	Info string
                                                                                                                                                                                                                                                                                                                                                                          }

PacketDecodingError is returned when decoding the Kafka broker's response fails for any reason other than truncated data. The cause can be a bad CRC, a bad length field, or any other invalid value.

                                                                                                                                                                                                                                                                                                                                                                            func (PacketDecodingError) Error

                                                                                                                                                                                                                                                                                                                                                                            func (err PacketDecodingError) Error() string

                                                                                                                                                                                                                                                                                                                                                                            type PacketEncodingError

                                                                                                                                                                                                                                                                                                                                                                            type PacketEncodingError struct {
                                                                                                                                                                                                                                                                                                                                                                            	Info string
                                                                                                                                                                                                                                                                                                                                                                            }

PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string longer than 32767 (2^15 - 1) bytes, since Kafka's encoding rules prefix such strings with a signed 16-bit length and therefore do not permit anything longer.
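The string-length limit mentioned above follows from the length prefix being a signed 16-bit integer. A minimal sketch of that encoding rule, using only the standard library, is shown below. The names `encodingError` and `encodeString` are hypothetical and local to this example; they imitate the shape of PacketEncodingError but are not Sarama's own code.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
	"strings"
)

// encodingError is a hypothetical local analogue of PacketEncodingError.
type encodingError struct {
	Info string
}

func (e encodingError) Error() string {
	return "encoding failed: " + e.Info
}

// encodeString writes s as a big-endian 16-bit length followed by the raw
// bytes of s. Lengths above math.MaxInt16 cannot be represented in a signed
// 16-bit prefix, so they are rejected with an encodingError.
func encodeString(s string) ([]byte, error) {
	if len(s) > math.MaxInt16 {
		return nil, encodingError{Info: "string too long"}
	}
	buf := make([]byte, 2+len(s))
	binary.BigEndian.PutUint16(buf, uint16(len(s)))
	copy(buf[2:], s)
	return buf, nil
}

func main() {
	// A short string encodes to 2 length bytes plus its content.
	encoded, err := encodeString("topic")
	fmt.Println(len(encoded), err)

	// One byte past the limit is rejected.
	_, err = encodeString(strings.Repeat("x", math.MaxInt16+1))
	fmt.Println(err)
}
```

Callers that accept user-supplied topic names, group IDs, or metadata strings may want to check lengths up front rather than wait for the encoder to fail.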

                                                                                                                                                                                                                                                                                                                                                                              func (PacketEncodingError) Error

                                                                                                                                                                                                                                                                                                                                                                              func (err PacketEncodingError) Error() string

                                                                                                                                                                                                                                                                                                                                                                              type PartitionConsumer

                                                                                                                                                                                                                                                                                                                                                                              type PartitionConsumer interface {
	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
	// should continue to service the 'Messages' and 'Errors' channels until they are empty. You must call this
	// function or Close before a consumer object passes out of scope, as it will otherwise leak memory. You must
	// also call it before calling Close on the underlying client.
                                                                                                                                                                                                                                                                                                                                                                              	AsyncClose()
                                                                                                                                                                                                                                                                                                                                                                              
                                                                                                                                                                                                                                                                                                                                                                              	// Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain
                                                                                                                                                                                                                                                                                                                                                                              	// the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service
                                                                                                                                                                                                                                                                                                                                                                              	// the Messages channel when this function is called, you will be competing with Close for messages; consider
                                                                                                                                                                                                                                                                                                                                                                              	// calling AsyncClose, instead. It is required to call this function (or AsyncClose) before a consumer object passes
                                                                                                                                                                                                                                                                                                                                                                              	// out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
                                                                                                                                                                                                                                                                                                                                                                              	Close() error
                                                                                                                                                                                                                                                                                                                                                                              
                                                                                                                                                                                                                                                                                                                                                                              	// Messages returns the read channel for the messages that are returned by
                                                                                                                                                                                                                                                                                                                                                                              	// the broker.
                                                                                                                                                                                                                                                                                                                                                                              	Messages() <-chan *ConsumerMessage
                                                                                                                                                                                                                                                                                                                                                                              
                                                                                                                                                                                                                                                                                                                                                                              	// Errors returns a read channel of errors that occurred during consuming, if
                                                                                                                                                                                                                                                                                                                                                                              	// enabled. By default, errors are logged and not returned over this channel.
                                                                                                                                                                                                                                                                                                                                                                              	// If you want to implement any custom error handling, set your config's
                                                                                                                                                                                                                                                                                                                                                                              	// Consumer.Return.Errors setting to true, and read from this channel.
                                                                                                                                                                                                                                                                                                                                                                              	Errors() <-chan *ConsumerError
                                                                                                                                                                                                                                                                                                                                                                              
                                                                                                                                                                                                                                                                                                                                                                              	// HighWaterMarkOffset returns the high water mark offset of the partition,
                                                                                                                                                                                                                                                                                                                                                                              	// i.e. the offset that will be used for the next message that will be produced.
                                                                                                                                                                                                                                                                                                                                                                              	// You can use this to determine how far behind the processing is.
                                                                                                                                                                                                                                                                                                                                                                              	HighWaterMarkOffset() int64
                                                                                                                                                                                                                                                                                                                                                                              }

                                                                                                                                                                                                                                                                                                                                                                                PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out of scope.

The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported as out of range by the brokers. In this case you should decide what you want to do (try a different offset, notify a human, etc.) and handle it appropriately. For all other error cases, it will just keep retrying. By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set your config's Consumer.Return.Errors to true and read from the Errors channel, using either a select statement or a separate goroutine. The Consumer examples show implementations of these different approaches.

To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the consumer teardown and return immediately. Continue to loop, servicing the Messages channel, until the teardown that AsyncClose initiated closes the channel (thus terminating the for/range loop). If you have already stopped reading Messages, call Close instead; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will also drain the Messages channel, harvest all errors, and return them once cleanup has completed.
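The AsyncClose shutdown pattern described above can be sketched without a broker. The example below is only an illustration under stated assumptions: `message`, `partitionConsumer`, `fakeConsumer`, and `consumeUntil` are hypothetical names defined here, and `fakeConsumer` is an in-memory stand-in, not Sarama's implementation. It shows the caller requesting shutdown from inside the loop and then continuing to range until the Messages channel is closed.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a stand-in for *sarama.ConsumerMessage.
type message struct {
	Offset int64
	Value  string
}

// partitionConsumer lists only the two methods this sketch needs.
type partitionConsumer interface {
	AsyncClose()
	Messages() <-chan *message
}

// fakeConsumer is a hypothetical in-memory implementation that emits
// messages with increasing offsets until it is asked to shut down.
type fakeConsumer struct {
	messages chan *message
	closing  chan struct{}
	once     sync.Once
}

func newFakeConsumer() *fakeConsumer {
	c := &fakeConsumer{
		messages: make(chan *message),
		closing:  make(chan struct{}),
	}
	go func() {
		// Closing Messages is what finally ends the caller's for/range loop.
		defer close(c.messages)
		for offset := int64(0); ; offset++ {
			select {
			case c.messages <- &message{Offset: offset, Value: fmt.Sprintf("msg-%d", offset)}:
			case <-c.closing:
				return
			}
		}
	}()
	return c
}

// AsyncClose signals the background goroutine and returns immediately.
func (c *fakeConsumer) AsyncClose() { c.once.Do(func() { close(c.closing) }) }

func (c *fakeConsumer) Messages() <-chan *message { return c.messages }

// consumeUntil requests shutdown after `limit` messages but keeps servicing
// the Messages channel until it is closed. A few more messages may still
// arrive after AsyncClose, so the count is at least `limit`.
func consumeUntil(pc partitionConsumer, limit int) (processed int, lastOffset int64) {
	for msg := range pc.Messages() {
		processed++
		lastOffset = msg.Offset
		if processed == limit {
			pc.AsyncClose() // returns immediately; do not break out of the loop
		}
	}
	return processed, lastOffset
}

func main() {
	n, last := consumeUntil(newFakeConsumer(), 3)
	fmt.Println("processed at least 3 messages:", n >= 3)
	fmt.Println("last offset matches count:", last == int64(n-1))
}
```

The loop deliberately avoids `break` after calling AsyncClose: leaving early would stop servicing the channel while the teardown is still in progress, which is the situation the text above warns about.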

                                                                                                                                                                                                                                                                                                                                                                                type PartitionError

                                                                                                                                                                                                                                                                                                                                                                                type PartitionError struct {
                                                                                                                                                                                                                                                                                                                                                                                	Partition int32
                                                                                                                                                                                                                                                                                                                                                                                	Err       KError
                                                                                                                                                                                                                                                                                                                                                                                }

PartitionError pairs a partition ID (Partition) with the Kafka error code (Err) reported for that partition.

                                                                                                                                                                                                                                                                                                                                                                                  type PartitionMetadata

                                                                                                                                                                                                                                                                                                                                                                                  type PartitionMetadata struct {
                                                                                                                                                                                                                                                                                                                                                                                  	Err             KError
                                                                                                                                                                                                                                                                                                                                                                                  	ID              int32
                                                                                                                                                                                                                                                                                                                                                                                  	Leader          int32
                                                                                                                                                                                                                                                                                                                                                                                  	Replicas        []int32
                                                                                                                                                                                                                                                                                                                                                                                  	Isr             []int32
                                                                                                                                                                                                                                                                                                                                                                                  	OfflineReplicas []int32
                                                                                                                                                                                                                                                                                                                                                                                  }

                                                                                                                                                                                                                                                                                                                                                                                  type PartitionOffsetManager

                                                                                                                                                                                                                                                                                                                                                                                  type PartitionOffsetManager interface {
                                                                                                                                                                                                                                                                                                                                                                                  	// NextOffset returns the next offset that should be consumed for the managed
                                                                                                                                                                                                                                                                                                                                                                                  	// partition, accompanied by metadata which can be used to reconstruct the state
                                                                                                                                                                                                                                                                                                                                                                                  	// of the partition consumer when it resumes. NextOffset() will return
                                                                                                                                                                                                                                                                                                                                                                                  	// `config.Consumer.Offsets.Initial` and an empty metadata string if no offset
                                                                                                                                                                                                                                                                                                                                                                                  	// was committed for this partition yet.
                                                                                                                                                                                                                                                                                                                                                                                  	NextOffset() (int64, string)
                                                                                                                                                                                                                                                                                                                                                                                  
                                                                                                                                                                                                                                                                                                                                                                                  	// MarkOffset marks the provided offset, alongside a metadata string
                                                                                                                                                                                                                                                                                                                                                                                  	// that represents the state of the partition consumer at that point in time. The
                                                                                                                                                                                                                                                                                                                                                                                  	// metadata string can be used by another consumer to restore that state, so it
                                                                                                                                                                                                                                                                                                                                                                                  	// can resume consumption.
                                                                                                                                                                                                                                                                                                                                                                                  	//
                                                                                                                                                                                                                                                                                                                                                                                  	// To follow upstream conventions, you are expected to mark the offset of the
                                                                                                                                                                                                                                                                                                                                                                                  	// next message to read, not the last message read. Thus, when calling `MarkOffset`
                                                                                                                                                                                                                                                                                                                                                                                  	// you should typically add one to the offset of the last consumed message.
                                                                                                                                                                                                                                                                                                                                                                                  	//
                                                                                                                                                                                                                                                                                                                                                                                  	// Note: calling MarkOffset does not necessarily commit the offset to the backend
                                                                                                                                                                                                                                                                                                                                                                                  	// store immediately for efficiency reasons, and it may never be committed if
                                                                                                                                                                                                                                                                                                                                                                                  	// your application crashes. This means that you may end up processing the same
                                                                                                                                                                                                                                                                                                                                                                                  	// message twice, and your processing should ideally be idempotent.
                                                                                                                                                                                                                                                                                                                                                                                  	MarkOffset(offset int64, metadata string)
                                                                                                                                                                                                                                                                                                                                                                                  
	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time.
	// ResetOffset acts as a counterpart to MarkOffset: it allows resetting an
	// offset to an earlier or smaller value, whereas MarkOffset only allows
	// incrementing the offset. See MarkOffset for more details.
                                                                                                                                                                                                                                                                                                                                                                                  	ResetOffset(offset int64, metadata string)
                                                                                                                                                                                                                                                                                                                                                                                  
                                                                                                                                                                                                                                                                                                                                                                                  	// Errors returns a read channel of errors that occur during offset management, if
                                                                                                                                                                                                                                                                                                                                                                                  	// enabled. By default, errors are logged and not returned over this channel. If
                                                                                                                                                                                                                                                                                                                                                                                  	// you want to implement any custom error handling, set your config's
                                                                                                                                                                                                                                                                                                                                                                                  	// Consumer.Return.Errors setting to true, and read from this channel.
                                                                                                                                                                                                                                                                                                                                                                                  	Errors() <-chan *ConsumerError
                                                                                                                                                                                                                                                                                                                                                                                  
	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will
	// return immediately, after which you should wait until the 'errors' channel has
	// been drained and closed. It is required to call this function (or Close) before
	// a PartitionOffsetManager object passes out of scope, as it will otherwise leak
	// memory. You must call this before calling Close on the underlying client.
                                                                                                                                                                                                                                                                                                                                                                                  	AsyncClose()
                                                                                                                                                                                                                                                                                                                                                                                  
                                                                                                                                                                                                                                                                                                                                                                                  	// Close stops the PartitionOffsetManager from managing offsets. It is required to
                                                                                                                                                                                                                                                                                                                                                                                  	// call this function (or AsyncClose) before a PartitionOffsetManager object
                                                                                                                                                                                                                                                                                                                                                                                  	// passes out of scope, as it will otherwise leak memory. You must call this
                                                                                                                                                                                                                                                                                                                                                                                  	// before calling Close on the underlying client.
                                                                                                                                                                                                                                                                                                                                                                                  	Close() error
                                                                                                                                                                                                                                                                                                                                                                                  }

PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close() on a partition offset manager to avoid leaks; it will not be garbage-collected automatically when it passes out of scope.
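The difference between MarkOffset and ResetOffset described in the comments above can be illustrated with a minimal, standard-library-only sketch. The `offsetState` type and its methods are hypothetical stand-ins, not Sarama types, and the rule that a reset to a larger offset is ignored is an assumption drawn from the wording "earlier or smaller value".

```go
package main

import "fmt"

// offsetState is a hypothetical stand-in for the offset and metadata that a
// partition offset manager tracks. It is not part of Sarama.
type offsetState struct {
	offset   int64
	metadata string
}

// markOffset only moves the stored offset forward, mirroring the documented
// rule that MarkOffset only allows incrementing the offset.
func (s *offsetState) markOffset(offset int64, metadata string) {
	if offset > s.offset {
		s.offset, s.metadata = offset, metadata
	}
}

// resetOffset accepts an earlier (or equal) offset. Ignoring a larger offset
// is an assumption made for this sketch.
func (s *offsetState) resetOffset(offset int64, metadata string) {
	if offset <= s.offset {
		s.offset, s.metadata = offset, metadata
	}
}

func main() {
	s := &offsetState{}
	s.markOffset(10, "processed batch 1")
	s.markOffset(5, "stale mark") // ignored: would move the offset backwards
	fmt.Println(s.offset, s.metadata)

	s.resetOffset(5, "replay requested") // accepted: moves the offset backwards
	fmt.Println(s.offset, s.metadata)
}
```

Because a marked offset may never be committed if the application crashes, the same message can be delivered again after a restart, which is why the comments recommend idempotent processing.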

                                                                                                                                                                                                                                                                                                                                                                                    type PartitionOffsetMetadata

                                                                                                                                                                                                                                                                                                                                                                                    type PartitionOffsetMetadata struct {
                                                                                                                                                                                                                                                                                                                                                                                    	Partition int32
                                                                                                                                                                                                                                                                                                                                                                                    	Offset    int64
                                                                                                                                                                                                                                                                                                                                                                                    	Metadata  *string
                                                                                                                                                                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                                                                                                                                                                    type PartitionReplicaReassignmentsStatus

                                                                                                                                                                                                                                                                                                                                                                                    type PartitionReplicaReassignmentsStatus struct {
                                                                                                                                                                                                                                                                                                                                                                                    	Replicas         []int32
                                                                                                                                                                                                                                                                                                                                                                                    	AddingReplicas   []int32
                                                                                                                                                                                                                                                                                                                                                                                    	RemovingReplicas []int32
                                                                                                                                                                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                                                                                                                                                                    type Partitioner

                                                                                                                                                                                                                                                                                                                                                                                    type Partitioner interface {
                                                                                                                                                                                                                                                                                                                                                                                    	// Partition takes a message and partition count and chooses a partition
                                                                                                                                                                                                                                                                                                                                                                                    	Partition(message *ProducerMessage, numPartitions int32) (int32, error)
                                                                                                                                                                                                                                                                                                                                                                                    
                                                                                                                                                                                                                                                                                                                                                                                    	// RequiresConsistency indicates to the user of the partitioner whether the
                                                                                                                                                                                                                                                                                                                                                                                    	// mapping of key->partition is consistent or not. Specifically, if a
                                                                                                                                                                                                                                                                                                                                                                                    	// partitioner requires consistency then it must be allowed to choose from all
                                                                                                                                                                                                                                                                                                                                                                                    	// partitions (even ones known to be unavailable), and its choice must be
                                                                                                                                                                                                                                                                                                                                                                                    	// respected by the caller. The obvious example is the HashPartitioner.
                                                                                                                                                                                                                                                                                                                                                                                    	RequiresConsistency() bool
                                                                                                                                                                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                                                                                                                                                                      Partitioner is anything that, given a Kafka message and a number of partitions indexed [0...numPartitions-1], decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided as simple default implementations.
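As a rough illustration of what implementing this interface involves, the sketch below defines a custom partitioner using only the standard library. `ProducerMessage` here is a simplified local stand-in (the real Sarama type has more fields and its Key is an Encoder, not a byte slice), the `Partitioner` interface is repeated so the sketch compiles on its own, and `byteSumPartitioner` is a hypothetical name invented for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// ProducerMessage is a simplified stand-in for this sketch only.
type ProducerMessage struct {
	Key []byte
}

// Partitioner repeats the interface documented above.
type Partitioner interface {
	Partition(message *ProducerMessage, numPartitions int32) (int32, error)
	RequiresConsistency() bool
}

// byteSumPartitioner is a hypothetical partitioner that maps a key to a
// partition by summing its bytes modulo the partition count.
type byteSumPartitioner struct{}

func (byteSumPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
	if numPartitions <= 0 {
		return -1, errors.New("numPartitions must be positive")
	}
	var sum int64
	for _, b := range message.Key {
		sum = (sum + int64(b)) % int64(numPartitions)
	}
	return int32(sum), nil
}

// RequiresConsistency reports true because the same key must always map to
// the same partition, even if that partition is currently unavailable.
func (byteSumPartitioner) RequiresConsistency() bool { return true }

func main() {
	var p Partitioner = byteSumPartitioner{}
	partition, err := p.Partition(&ProducerMessage{Key: []byte("user-42")}, 8)
	fmt.Println(partition, err)
}
```

A partitioner that picks partitions without regard to the key (such as a random one) would return false from RequiresConsistency, which lets the caller restrict its choice to available partitions.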

                                                                                                                                                                                                                                                                                                                                                                                      Example (Manual)

                                                                                                                                                                                                                                                                                                                                                                                        This example shows how to assign partitions to your messages manually.

                                                                                                                                                                                                                                                                                                                                                                                        Example (Per_topic)

                                                                                                                                                                                                                                                                                                                                                                                          This example shows how to set a different partitioner depending on the topic.

                                                                                                                                                                                                                                                                                                                                                                                          Example (Random)

                                                                                                                                                                                                                                                                                                                                                                                            By default, Sarama uses the message's key to consistently assign a partition to a message using hashing. If no key is set, a random partition will be chosen. This example shows how you can partition messages randomly, even when a key is set, by overriding Config.Producer.Partitioner.


                                                                                                                                                                                                                                                                                                                                                                                            func NewHashPartitioner

                                                                                                                                                                                                                                                                                                                                                                                            func NewHashPartitioner(topic string) Partitioner

NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used, modulo the number of partitions. This ensures that messages with the same key always end up on the same partition.

                                                                                                                                                                                                                                                                                                                                                                                              func NewManualPartitioner

                                                                                                                                                                                                                                                                                                                                                                                              func NewManualPartitioner(topic string) Partitioner

                                                                                                                                                                                                                                                                                                                                                                                                NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided ProducerMessage's Partition field as the partition to produce to.

                                                                                                                                                                                                                                                                                                                                                                                                func NewRandomPartitioner

                                                                                                                                                                                                                                                                                                                                                                                                func NewRandomPartitioner(topic string) Partitioner

                                                                                                                                                                                                                                                                                                                                                                                                  NewRandomPartitioner returns a Partitioner which chooses a random partition each time.

                                                                                                                                                                                                                                                                                                                                                                                                  func NewReferenceHashPartitioner

                                                                                                                                                                                                                                                                                                                                                                                                  func NewReferenceHashPartitioner(topic string) Partitioner

NewReferenceHashPartitioner is like NewHashPartitioner except that it handles absolute values in the same way as the reference Java implementation. NewHashPartitioner was intended to do the same, but it contained a mistake, and users now depend on both behaviours. This duplication is expected to go away at the next major version bump.
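To make the "absolute values" remark concrete, here is a minimal sketch of two ways to map a 32-bit hash onto a non-negative partition index. The function names `legacyPartition` and `referencePartition` are hypothetical, and the assumption that they correspond to the two partitioners is an illustration of the kind of difference involved, not a copy of the library's code.

```go
package main

import "fmt"

// legacyPartition (hypothetical name) takes the signed remainder first and
// then negates it if it came out negative. This is assumed to resemble the
// older behaviour.
func legacyPartition(hash uint32, numPartitions int32) int32 {
	p := int32(hash) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

// referencePartition (hypothetical name) clears the sign bit first and then
// takes the remainder. This is assumed to resemble the Java client.
func referencePartition(hash uint32, numPartitions int32) int32 {
	return (int32(hash) & 0x7fffffff) % numPartitions
}

func main() {
	const numPartitions = 4
	for _, h := range []uint32{0x00000007, 0xffffffff} {
		fmt.Printf("hash=%#08x legacy=%d reference=%d\n",
			h, legacyPartition(h, numPartitions), referencePartition(h, numPartitions))
	}
}
```

The two strategies agree whenever the hash is non-negative as a signed 32-bit integer, and can disagree otherwise: for the hash 0xffffffff with four partitions, the first yields 1 and the second yields 3. That is why producers that must agree on key placement should use the same strategy.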

                                                                                                                                                                                                                                                                                                                                                                                                    func NewRoundRobinPartitioner

                                                                                                                                                                                                                                                                                                                                                                                                    func NewRoundRobinPartitioner(topic string) Partitioner

                                                                                                                                                                                                                                                                                                                                                                                                      NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time.

                                                                                                                                                                                                                                                                                                                                                                                                      type PartitionerConstructor

                                                                                                                                                                                                                                                                                                                                                                                                      type PartitionerConstructor func(topic string) Partitioner

                                                                                                                                                                                                                                                                                                                                                                                                        PartitionerConstructor is the type for a function capable of constructing new Partitioners.

                                                                                                                                                                                                                                                                                                                                                                                                        func NewCustomHashPartitioner

                                                                                                                                                                                                                                                                                                                                                                                                        func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor

NewCustomHashPartitioner is a wrapper around NewHashPartitioner that allows the use of a custom hasher. The argument is a function that returns a new instance implementing the hash.Hash32 interface. This ensures that each partition dispatcher gets its own hasher, avoiding the concurrency issues that sharing a single instance would cause.

                                                                                                                                                                                                                                                                                                                                                                                                          func NewCustomPartitioner

                                                                                                                                                                                                                                                                                                                                                                                                          func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor

NewCustomPartitioner creates a default Partitioner but lets you specify the behavior of each component via options.

                                                                                                                                                                                                                                                                                                                                                                                                            type ProduceRequest

                                                                                                                                                                                                                                                                                                                                                                                                            type ProduceRequest struct {
                                                                                                                                                                                                                                                                                                                                                                                                            	TransactionalID *string
                                                                                                                                                                                                                                                                                                                                                                                                            	RequiredAcks    RequiredAcks
                                                                                                                                                                                                                                                                                                                                                                                                            	Timeout         int32
                                                                                                                                                                                                                                                                                                                                                                                                            	Version         int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10, v3 requires Kafka 0.11
                                                                                                                                                                                                                                                                                                                                                                                                            	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                                                                            }

                                                                                                                                                                                                                                                                                                                                                                                                            func (*ProduceRequest) AddBatch

                                                                                                                                                                                                                                                                                                                                                                                                            func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch)

                                                                                                                                                                                                                                                                                                                                                                                                            func (*ProduceRequest) AddMessage

                                                                                                                                                                                                                                                                                                                                                                                                            func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message)

                                                                                                                                                                                                                                                                                                                                                                                                            func (*ProduceRequest) AddSet

                                                                                                                                                                                                                                                                                                                                                                                                            func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet)

                                                                                                                                                                                                                                                                                                                                                                                                            type ProduceResponse

                                                                                                                                                                                                                                                                                                                                                                                                            type ProduceResponse struct {
                                                                                                                                                                                                                                                                                                                                                                                                            	Blocks       map[string]map[int32]*ProduceResponseBlock // v0, responses
                                                                                                                                                                                                                                                                                                                                                                                                            	Version      int16
                                                                                                                                                                                                                                                                                                                                                                                                            	ThrottleTime time.Duration // v1, throttle_time_ms
                                                                                                                                                                                                                                                                                                                                                                                                            }

                                                                                                                                                                                                                                                                                                                                                                                                            func (*ProduceResponse) AddTopicPartition

                                                                                                                                                                                                                                                                                                                                                                                                            func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError)

                                                                                                                                                                                                                                                                                                                                                                                                            func (*ProduceResponse) GetBlock

                                                                                                                                                                                                                                                                                                                                                                                                            func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock

                                                                                                                                                                                                                                                                                                                                                                                                            type ProduceResponseBlock

                                                                                                                                                                                                                                                                                                                                                                                                            type ProduceResponseBlock struct {
                                                                                                                                                                                                                                                                                                                                                                                                            	Err         KError    // v0, error_code
                                                                                                                                                                                                                                                                                                                                                                                                            	Offset      int64     // v0, base_offset
                                                                                                                                                                                                                                                                                                                                                                                                            	Timestamp   time.Time // v2, log_append_time, and the broker is configured with `LogAppendTime`
                                                                                                                                                                                                                                                                                                                                                                                                            	StartOffset int64     // v5, log_start_offset
                                                                                                                                                                                                                                                                                                                                                                                                            }

ProduceResponseBlock corresponds to partition_responses in the protocol.

                                                                                                                                                                                                                                                                                                                                                                                                              type ProducerError

                                                                                                                                                                                                                                                                                                                                                                                                              type ProducerError struct {
                                                                                                                                                                                                                                                                                                                                                                                                              	Msg *ProducerMessage
                                                                                                                                                                                                                                                                                                                                                                                                              	Err error
                                                                                                                                                                                                                                                                                                                                                                                                              }

                                                                                                                                                                                                                                                                                                                                                                                                                ProducerError is the type of error generated when the producer fails to deliver a message. It contains the original ProducerMessage as well as the actual error value.

                                                                                                                                                                                                                                                                                                                                                                                                                func (ProducerError) Error

                                                                                                                                                                                                                                                                                                                                                                                                                func (pe ProducerError) Error() string

                                                                                                                                                                                                                                                                                                                                                                                                                func (ProducerError) Unwrap

                                                                                                                                                                                                                                                                                                                                                                                                                func (pe ProducerError) Unwrap() error

                                                                                                                                                                                                                                                                                                                                                                                                                type ProducerErrors

                                                                                                                                                                                                                                                                                                                                                                                                                type ProducerErrors []*ProducerError

ProducerErrors is a type that wraps a batch of ProducerError values and implements the error interface. It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel when closing a producer.

                                                                                                                                                                                                                                                                                                                                                                                                                  func (ProducerErrors) Error

                                                                                                                                                                                                                                                                                                                                                                                                                  func (pe ProducerErrors) Error() string

                                                                                                                                                                                                                                                                                                                                                                                                                  type ProducerInterceptor

                                                                                                                                                                                                                                                                                                                                                                                                                  type ProducerInterceptor interface {
                                                                                                                                                                                                                                                                                                                                                                                                                  
                                                                                                                                                                                                                                                                                                                                                                                                                  	// OnSend is called when the producer message is intercepted. Please avoid
                                                                                                                                                                                                                                                                                                                                                                                                                  	// modifying the message until it's safe to do so, as this is _not_ a copy
                                                                                                                                                                                                                                                                                                                                                                                                                  	// of the message.
                                                                                                                                                                                                                                                                                                                                                                                                                  	OnSend(*ProducerMessage)
                                                                                                                                                                                                                                                                                                                                                                                                                  }

                                                                                                                                                                                                                                                                                                                                                                                                                    ProducerInterceptor allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors#KIP42:AddProducerandConsumerInterceptors-Motivation
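As a rough illustration of the interface above, the sketch below implements OnSend to stamp every outgoing message with a header. To stay self-contained it declares small local stand-ins for ProducerMessage, RecordHeader, and ProducerInterceptor instead of importing the package; the two-field shape of RecordHeader, the originInterceptor type, and the "origin" header key are assumptions made for the example. The loop in main only imitates the producer calling each interceptor in turn.

```go
package main

import "fmt"

// Local stand-ins that mirror the declarations documented above, so the
// sketch compiles on its own. In real code these come from the sarama package.
// The Key/Value shape of RecordHeader is an assumption of this example.
type RecordHeader struct {
	Key   []byte
	Value []byte
}

type ProducerMessage struct {
	Topic   string
	Headers []RecordHeader
}

type ProducerInterceptor interface {
	OnSend(*ProducerMessage)
}

// originInterceptor is a hypothetical interceptor that tags each message
// with the name of the service that produced it.
type originInterceptor struct {
	origin string
}

// OnSend receives the original message, not a copy, so the appended header
// is visible to everything that handles the message afterwards.
func (i *originInterceptor) OnSend(msg *ProducerMessage) {
	msg.Headers = append(msg.Headers, RecordHeader{
		Key:   []byte("origin"),
		Value: []byte(i.origin),
	})
}

func main() {
	chain := []ProducerInterceptor{&originInterceptor{origin: "billing-service"}}
	msg := &ProducerMessage{Topic: "events"}

	// Imitates the producer invoking each registered interceptor in order.
	for _, interceptor := range chain {
		interceptor.OnSend(msg)
	}

	for _, h := range msg.Headers {
		fmt.Printf("%s=%s\n", h.Key, h.Value)
	}
}
```

Because OnSend works on the message itself rather than a copy, an interceptor like this should limit itself to cheap, idempotent changes such as adding headers.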

                                                                                                                                                                                                                                                                                                                                                                                                                    type ProducerMessage

                                                                                                                                                                                                                                                                                                                                                                                                                    type ProducerMessage struct {
                                                                                                                                                                                                                                                                                                                                                                                                                    	Topic string // The Kafka topic for this message.
                                                                                                                                                                                                                                                                                                                                                                                                                    	// The partitioning key for this message. Pre-existing Encoders include
                                                                                                                                                                                                                                                                                                                                                                                                                    	// StringEncoder and ByteEncoder.
                                                                                                                                                                                                                                                                                                                                                                                                                    	Key Encoder
                                                                                                                                                                                                                                                                                                                                                                                                                    	// The actual message to store in Kafka. Pre-existing Encoders include
                                                                                                                                                                                                                                                                                                                                                                                                                    	// StringEncoder and ByteEncoder.
                                                                                                                                                                                                                                                                                                                                                                                                                    	Value Encoder
                                                                                                                                                                                                                                                                                                                                                                                                                    
                                                                                                                                                                                                                                                                                                                                                                                                                    	// The headers are key-value pairs that are transparently passed
                                                                                                                                                                                                                                                                                                                                                                                                                    	// by Kafka between producers and consumers.
                                                                                                                                                                                                                                                                                                                                                                                                                    	Headers []RecordHeader
                                                                                                                                                                                                                                                                                                                                                                                                                    
                                                                                                                                                                                                                                                                                                                                                                                                                    	// This field is used to hold arbitrary data you wish to include so it
                                                                                                                                                                                                                                                                                                                                                                                                                    	// will be available when receiving on the Successes and Errors channels.
                                                                                                                                                                                                                                                                                                                                                                                                                    	// Sarama completely ignores this field and is only to be used for
                                                                                                                                                                                                                                                                                                                                                                                                                    	// pass-through data.
                                                                                                                                                                                                                                                                                                                                                                                                                    	Metadata interface{}
                                                                                                                                                                                                                                                                                                                                                                                                                    
                                                                                                                                                                                                                                                                                                                                                                                                                    	// Offset is the offset of the message stored on the broker. This is only
                                                                                                                                                                                                                                                                                                                                                                                                                    	// guaranteed to be defined if the message was successfully delivered and
                                                                                                                                                                                                                                                                                                                                                                                                                    	// RequiredAcks is not NoResponse.
                                                                                                                                                                                                                                                                                                                                                                                                                    	Offset int64
                                                                                                                                                                                                                                                                                                                                                                                                                    	// Partition is the partition that the message was sent to. This is only
                                                                                                                                                                                                                                                                                                                                                                                                                    	// guaranteed to be defined if the message was successfully delivered.
                                                                                                                                                                                                                                                                                                                                                                                                                    	Partition int32
                                                                                                                                                                                                                                                                                                                                                                                                                    	// Timestamp can vary in behaviour depending on broker configuration, being
                                                                                                                                                                                                                                                                                                                                                                                                                    	// in either one of the CreateTime or LogAppendTime modes (default CreateTime),
                                                                                                                                                                                                                                                                                                                                                                                                                    	// and requiring version at least 0.10.0.
                                                                                                                                                                                                                                                                                                                                                                                                                    	//
                                                                                                                                                                                                                                                                                                                                                                                                                    	// When configured to CreateTime, the timestamp is specified by the producer
                                                                                                                                                                                                                                                                                                                                                                                                                    	// either by explicitly setting this field, or when the message is added
                                                                                                                                                                                                                                                                                                                                                                                                                    	// to a produce set.
                                                                                                                                                                                                                                                                                                                                                                                                                    	//
                                                                                                                                                                                                                                                                                                                                                                                                                    	// When configured to LogAppendTime, the timestamp assigned to the message
                                                                                                                                                                                                                                                                                                                                                                                                                    	// by the broker. This is only guaranteed to be defined if the message was
                                                                                                                                                                                                                                                                                                                                                                                                                    	// successfully delivered and RequiredAcks is not NoResponse.
                                                                                                                                                                                                                                                                                                                                                                                                                    	Timestamp time.Time
                                                                                                                                                                                                                                                                                                                                                                                                                    	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                                                                                    }

                                                                                                                                                                                                                                                                                                                                                                                                                      ProducerMessage is the collection of elements passed to the Producer in order to send a message.
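The following sketch shows one way the fields above might be filled in. It is self-contained, so Encoder, StringEncoder, and ProducerMessage are local stand-ins for the package types (the Encode/Length method set is an assumption of this example), and newOrderMessage, the "orders" topic, and the integer request ID are hypothetical. It sets only the fields a caller provides; Offset, Partition, and, in LogAppendTime mode, Timestamp are filled in on delivery.

```go
package main

import (
	"fmt"
	"time"
)

// Local stand-ins mirroring the documented types so the sketch runs without
// the library. The Encoder method set is an assumption of this example.
type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

type StringEncoder string

func (s StringEncoder) Encode() ([]byte, error) { return []byte(s), nil }
func (s StringEncoder) Length() int             { return len(s) }

type ProducerMessage struct {
	Topic     string
	Key       Encoder
	Value     Encoder
	Metadata  interface{}
	Offset    int64
	Partition int32
	Timestamp time.Time
}

// newOrderMessage is a hypothetical helper. The key drives partitioning,
// the value is the payload, and Metadata carries a caller-side request ID
// that the library passes through untouched.
func newOrderMessage(orderID, payload string, requestID int) *ProducerMessage {
	return &ProducerMessage{
		Topic:    "orders",
		Key:      StringEncoder(orderID),
		Value:    StringEncoder(payload),
		Metadata: requestID,
	}
}

func main() {
	msg := newOrderMessage("order-42", `{"total": 10}`, 7)

	value, err := msg.Value.Encode()
	if err != nil {
		panic(err)
	}

	// When the message comes back on a Successes or Errors channel, the
	// Metadata field lets the caller correlate it with the original request.
	if id, ok := msg.Metadata.(int); ok {
		fmt.Printf("request %d: topic=%s key-bytes=%d value=%s\n",
			id, msg.Topic, msg.Key.Length(), value)
	}
}
```

Leaving Timestamp at its zero value lets the producer assign it in CreateTime mode, as described in the field comment.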

                                                                                                                                                                                                                                                                                                                                                                                                                      type Record

                                                                                                                                                                                                                                                                                                                                                                                                                      type Record struct {
                                                                                                                                                                                                                                                                                                                                                                                                                      	Headers []*RecordHeader
                                                                                                                                                                                                                                                                                                                                                                                                                      
                                                                                                                                                                                                                                                                                                                                                                                                                      	Attributes     int8
                                                                                                                                                                                                                                                                                                                                                                                                                      	TimestampDelta time.Duration
                                                                                                                                                                                                                                                                                                                                                                                                                      	OffsetDelta    int64
                                                                                                                                                                                                                                                                                                                                                                                                                      	Key            []byte
                                                                                                                                                                                                                                                                                                                                                                                                                      	Value          []byte
                                                                                                                                                                                                                                                                                                                                                                                                                      	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                                                                                                                                                                        Record is kafka record type

                                                                                                                                                                                                                                                                                                                                                                                                                        type RecordBatch

                                                                                                                                                                                                                                                                                                                                                                                                                        type RecordBatch struct {
                                                                                                                                                                                                                                                                                                                                                                                                                        	FirstOffset           int64
                                                                                                                                                                                                                                                                                                                                                                                                                        	PartitionLeaderEpoch  int32
                                                                                                                                                                                                                                                                                                                                                                                                                        	Version               int8
                                                                                                                                                                                                                                                                                                                                                                                                                        	Codec                 CompressionCodec
                                                                                                                                                                                                                                                                                                                                                                                                                        	CompressionLevel      int
                                                                                                                                                                                                                                                                                                                                                                                                                        	Control               bool
                                                                                                                                                                                                                                                                                                                                                                                                                        	LogAppendTime         bool
                                                                                                                                                                                                                                                                                                                                                                                                                        	LastOffsetDelta       int32
                                                                                                                                                                                                                                                                                                                                                                                                                        	FirstTimestamp        time.Time
                                                                                                                                                                                                                                                                                                                                                                                                                        	MaxTimestamp          time.Time
                                                                                                                                                                                                                                                                                                                                                                                                                        	ProducerID            int64
                                                                                                                                                                                                                                                                                                                                                                                                                        	ProducerEpoch         int16
                                                                                                                                                                                                                                                                                                                                                                                                                        	FirstSequence         int32
                                                                                                                                                                                                                                                                                                                                                                                                                        	Records               []*Record
                                                                                                                                                                                                                                                                                                                                                                                                                        	PartialTrailingRecord bool
                                                                                                                                                                                                                                                                                                                                                                                                                        	IsTransactional       bool
                                                                                                                                                                                                                                                                                                                                                                                                                        	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                                                                                        }

                                                                                                                                                                                                                                                                                                                                                                                                                        func (*RecordBatch) LastOffset

                                                                                                                                                                                                                                                                                                                                                                                                                        func (b *RecordBatch) LastOffset() int64

                                                                                                                                                                                                                                                                                                                                                                                                                        type RecordHeader

                                                                                                                                                                                                                                                                                                                                                                                                                        type RecordHeader struct {
                                                                                                                                                                                                                                                                                                                                                                                                                        	Key   []byte
                                                                                                                                                                                                                                                                                                                                                                                                                        	Value []byte
                                                                                                                                                                                                                                                                                                                                                                                                                        }

RecordHeader stores the key and value of a record header.

                                                                                                                                                                                                                                                                                                                                                                                                                          type Records

                                                                                                                                                                                                                                                                                                                                                                                                                          type Records struct {
                                                                                                                                                                                                                                                                                                                                                                                                                          	MsgSet      *MessageSet
                                                                                                                                                                                                                                                                                                                                                                                                                          	RecordBatch *RecordBatch
                                                                                                                                                                                                                                                                                                                                                                                                                          	// contains filtered or unexported fields
                                                                                                                                                                                                                                                                                                                                                                                                                          }

                                                                                                                                                                                                                                                                                                                                                                                                                            Records implements a union type containing either a RecordBatch or a legacy MessageSet.

                                                                                                                                                                                                                                                                                                                                                                                                                            type RequestNotifierFunc

                                                                                                                                                                                                                                                                                                                                                                                                                            type RequestNotifierFunc func(bytesRead, bytesWritten int)

RequestNotifierFunc is invoked when a mock broker processes a request successfully and provides the number of bytes read and written.
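As a minimal sketch of how such a callback might be used, the example below accumulates byte counts across requests. The `RequestNotifierFunc` type is redeclared locally so the snippet runs without the library; `byteTotals` and its `notifier` method are hypothetical names introduced only for illustration. A mutex is included on the assumption that the callback may be invoked from a goroutine other than the one reading the totals.

```go
package main

import (
	"fmt"
	"sync"
)

// RequestNotifierFunc mirrors the documented signature locally so this
// sketch compiles on its own; real code would use the library's type.
type RequestNotifierFunc func(bytesRead, bytesWritten int)

// byteTotals is a hypothetical accumulator for the values passed to the notifier.
type byteTotals struct {
	mu       sync.Mutex
	requests int
	read     int
	written  int
}

// notifier returns a callback that adds each request's byte counts to the totals.
func (t *byteTotals) notifier() RequestNotifierFunc {
	return func(bytesRead, bytesWritten int) {
		t.mu.Lock()
		defer t.mu.Unlock()
		t.requests++
		t.read += bytesRead
		t.written += bytesWritten
	}
}

func main() {
	var totals byteTotals
	notify := totals.notifier()

	// Simulate two successfully processed requests.
	notify(120, 48)
	notify(64, 16)

	totals.mu.Lock()
	fmt.Println(totals.requests, totals.read, totals.written)
	totals.mu.Unlock()
}
```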

                                                                                                                                                                                                                                                                                                                                                                                                                              type RequestResponse

                                                                                                                                                                                                                                                                                                                                                                                                                              type RequestResponse struct {
                                                                                                                                                                                                                                                                                                                                                                                                                              	Request  protocolBody
                                                                                                                                                                                                                                                                                                                                                                                                                              	Response encoder
                                                                                                                                                                                                                                                                                                                                                                                                                              }

                                                                                                                                                                                                                                                                                                                                                                                                                                RequestResponse represents a Request/Response pair processed by MockBroker.

                                                                                                                                                                                                                                                                                                                                                                                                                                type RequiredAcks

                                                                                                                                                                                                                                                                                                                                                                                                                                type RequiredAcks int16

RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid. On broker versions prior to 0.8.2.0, any other positive int16 is also valid (the broker will wait for that many acknowledgements), but in 0.8.2.0 and later this will raise an exception (it has been replaced by setting the `min.insync.replicas` value in the broker's configuration).

                                                                                                                                                                                                                                                                                                                                                                                                                                  const (
                                                                                                                                                                                                                                                                                                                                                                                                                                  	// NoResponse doesn't send any response, the TCP ACK is all you get.
                                                                                                                                                                                                                                                                                                                                                                                                                                  	NoResponse RequiredAcks = 0
                                                                                                                                                                                                                                                                                                                                                                                                                                  	// WaitForLocal waits for only the local commit to succeed before responding.
                                                                                                                                                                                                                                                                                                                                                                                                                                  	WaitForLocal RequiredAcks = 1
                                                                                                                                                                                                                                                                                                                                                                                                                                  	// WaitForAll waits for all in-sync replicas to commit before responding.
                                                                                                                                                                                                                                                                                                                                                                                                                                  	// The minimum number of in-sync replicas is configured on the broker via
                                                                                                                                                                                                                                                                                                                                                                                                                                  	// the `min.insync.replicas` configuration key.
                                                                                                                                                                                                                                                                                                                                                                                                                                  	WaitForAll RequiredAcks = -1
                                                                                                                                                                                                                                                                                                                                                                                                                                  )
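The trade-off between the three constants can be sketched as follows. The type and constants are redeclared locally so the snippet runs without the library, and `describeAcks` is a hypothetical helper that only restates the documentation above. In real code the value would be assigned to the `Producer.RequiredAcks` configuration field mentioned in the package overview.

```go
package main

import "fmt"

// RequiredAcks and its constants mirror the documented declarations so the
// sketch is self-contained; real code would use the library's definitions.
type RequiredAcks int16

const (
	NoResponse   RequiredAcks = 0
	WaitForLocal RequiredAcks = 1
	WaitForAll   RequiredAcks = -1
)

// describeAcks is a hypothetical helper summarising what each setting
// guarantees, based only on the documentation text.
func describeAcks(a RequiredAcks) string {
	switch a {
	case NoResponse:
		return "no broker response; delivery is not confirmed"
	case WaitForLocal:
		return "local commit succeeded; replicas may not have the message yet"
	case WaitForAll:
		return "all in-sync replicas have committed"
	default:
		return "not a defined constant; raises an exception on brokers 0.8.2.0 and later"
	}
}

func main() {
	// The last value (2) is deliberately not one of the defined constants.
	for _, a := range []RequiredAcks{NoResponse, WaitForLocal, WaitForAll, 2} {
		fmt.Printf("%d: %s\n", a, describeAcks(a))
	}
}
```

Stronger settings trade latency for durability: `WaitForAll` is the only one of the three under which an acknowledged message has reached every in-sync replica.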

                                                                                                                                                                                                                                                                                                                                                                                                                                  type Resource

                                                                                                                                                                                                                                                                                                                                                                                                                                  type Resource struct {
                                                                                                                                                                                                                                                                                                                                                                                                                                  	ResourceType        AclResourceType
                                                                                                                                                                                                                                                                                                                                                                                                                                  	ResourceName        string
                                                                                                                                                                                                                                                                                                                                                                                                                                  	ResourcePatternType AclResourcePatternType
                                                                                                                                                                                                                                                                                                                                                                                                                                  }

Resource holds information about an ACL resource: its type, name, and pattern type.

                                                                                                                                                                                                                                                                                                                                                                                                                                    type ResourceAcls

                                                                                                                                                                                                                                                                                                                                                                                                                                    type ResourceAcls struct {
                                                                                                                                                                                                                                                                                                                                                                                                                                    	Resource
                                                                                                                                                                                                                                                                                                                                                                                                                                    	Acls []*Acl
                                                                                                                                                                                                                                                                                                                                                                                                                                    }

ResourceAcls pairs a Resource with the ACLs defined on it.

                                                                                                                                                                                                                                                                                                                                                                                                                                      type ResourceResponse

                                                                                                                                                                                                                                                                                                                                                                                                                                      type ResourceResponse struct {
                                                                                                                                                                                                                                                                                                                                                                                                                                      	ErrorCode int16
                                                                                                                                                                                                                                                                                                                                                                                                                                      	ErrorMsg  string
                                                                                                                                                                                                                                                                                                                                                                                                                                      	Type      ConfigResourceType
                                                                                                                                                                                                                                                                                                                                                                                                                                      	Name      string
                                                                                                                                                                                                                                                                                                                                                                                                                                      	Configs   []*ConfigEntry
                                                                                                                                                                                                                                                                                                                                                                                                                                      }

                                                                                                                                                                                                                                                                                                                                                                                                                                      type SASLMechanism

                                                                                                                                                                                                                                                                                                                                                                                                                                      type SASLMechanism string

SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker.

                                                                                                                                                                                                                                                                                                                                                                                                                                        type SCRAMClient

                                                                                                                                                                                                                                                                                                                                                                                                                                        type SCRAMClient interface {
	// Begin prepares the client for the SCRAM exchange
	// with the server with a user name and a password.
	Begin(userName, password, authzID string) error
	// Step steps the client through the SCRAM exchange. It is
	// called repeatedly until it errors or `Done` returns true.
	Step(challenge string) (response string, err error)
	// Done should return true when the SCRAM conversation
	// is over.
	Done() bool
}

SCRAMClient is an interface to a SCRAM client implementation.
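The call order described in the interface comments (Begin once, then Step until it errors or Done returns true) can be sketched as a small loop. This is only an illustration under stated assumptions: `scramClient` is a local copy of the method set, while `toyClient`, `runExchange`, and the `exchange` callback are hypothetical names invented for this example. The toy client performs no real SCRAM cryptography, and this is not how sarama itself drives the exchange.

```go
package main

import (
	"errors"
	"fmt"
)

// scramClient mirrors the method set of the SCRAMClient interface so the
// sketch compiles without importing sarama.
type scramClient interface {
	Begin(userName, password, authzID string) error
	Step(challenge string) (response string, err error)
	Done() bool
}

// toyClient is a hypothetical stand-in, NOT a real SCRAM implementation:
// it finishes after a fixed number of steps and does no cryptography.
type toyClient struct {
	user      string
	remaining int
}

func (c *toyClient) Begin(userName, password, authzID string) error {
	if userName == "" {
		return errors.New("empty user name")
	}
	c.user = userName
	c.remaining = 2
	return nil
}

func (c *toyClient) Step(challenge string) (string, error) {
	if c.remaining == 0 {
		return "", errors.New("conversation already finished")
	}
	c.remaining--
	return fmt.Sprintf("%s:reply-to(%s)", c.user, challenge), nil
}

func (c *toyClient) Done() bool { return c.remaining == 0 }

// runExchange calls Begin once, then Step until it errors or Done reports
// true. exchange is a hypothetical stand-in for the network round trip that
// sends a response to the server and returns the next challenge.
func runExchange(c scramClient, user, pass string, exchange func(msg string) string) (int, error) {
	if err := c.Begin(user, pass, ""); err != nil {
		return 0, err
	}
	steps := 0
	challenge := ""
	for !c.Done() {
		resp, err := c.Step(challenge)
		if err != nil {
			return steps, err
		}
		steps++
		challenge = exchange(resp)
	}
	return steps, nil
}

func main() {
	server := func(msg string) string { return "challenge-for-" + msg }
	steps, err := runExchange(&toyClient{}, "alice", "secret", server)
	fmt.Println(steps, err)
}
```

A real implementation would typically wrap an existing SCRAM library behind these three methods rather than hand-rolling the protocol.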

type SaslAuthenticateRequest

type SaslAuthenticateRequest struct {
	SaslAuthBytes []byte
}

type SaslAuthenticateResponse

type SaslAuthenticateResponse struct {
	Err           KError
	ErrorMessage  *string
	SaslAuthBytes []byte
}

type SaslHandshakeRequest

type SaslHandshakeRequest struct {
	Mechanism string
	Version   int16
}

type SaslHandshakeResponse

type SaslHandshakeResponse struct {
	Err               KError
	EnabledMechanisms []string
}

type StdLogger

type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

StdLogger is used to log error messages.
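The three methods match those of the standard library's `*log.Logger`, so a logger built with `log.New` can be used wherever this interface is expected. The sketch below checks that at compile time and logs one line through the interface. `stdLogger` is a local copy of the method set, and `captureLog` is a hypothetical helper written for this example, not part of sarama.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// stdLogger mirrors the method set of the StdLogger interface so this
// sketch compiles without importing sarama.
type stdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

// Compile-time check that *log.Logger provides all three methods.
var _ stdLogger = (*log.Logger)(nil)

// captureLog is a hypothetical helper: it logs one formatted message through
// the interface into an in-memory buffer and returns what was written.
func captureLog(prefix, format string, args ...interface{}) string {
	var buf bytes.Buffer
	var logger stdLogger = log.New(&buf, prefix, 0) // flags 0: no timestamp
	logger.Printf(format, args...)
	return buf.String()
}

func main() {
	fmt.Print(captureLog("[sarama] ", "connected to broker %s", "localhost:9092"))
}
```

In an application, the same kind of value would normally be assigned to the package-level logger variable instead of a buffer, for example a `log.New(os.Stdout, "[sarama] ", log.LstdFlags)` logger.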

type StickyAssignorUserData

type StickyAssignorUserData interface {
	// contains filtered or unexported methods
}

type StickyAssignorUserDataV0

type StickyAssignorUserDataV0 struct {
	Topics map[string][]int32
	// contains filtered or unexported fields
}

StickyAssignorUserDataV0 holds topic partition information for an assignment.

type StickyAssignorUserDataV1

type StickyAssignorUserDataV1 struct {
	Topics     map[string][]int32
	Generation int32
	// contains filtered or unexported fields
}

StickyAssignorUserDataV1 holds topic partition information for an assignment.

type StringEncoder

type StringEncoder string

StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.

func (StringEncoder) Encode

func (s StringEncoder) Encode() ([]byte, error)

func (StringEncoder) Length

func (s StringEncoder) Length() int
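To illustrate how a string type can satisfy a two-method encoder interface, here is a minimal sketch. It assumes the interface consists of exactly the `Encode` and `Length` methods listed above; `encoder` and `stringEncoder` are local stand-ins so the example runs without importing sarama, and `payloadSize` is a hypothetical helper. The method bodies are an assumption about the obvious implementation, not a copy of the library's source.

```go
package main

import "fmt"

// encoder is a local stand-in with the two methods listed for StringEncoder.
type encoder interface {
	Encode() ([]byte, error)
	Length() int
}

// stringEncoder is a local stand-in for StringEncoder.
type stringEncoder string

// Encode returns the string's bytes; this conversion cannot fail.
func (s stringEncoder) Encode() ([]byte, error) { return []byte(s), nil }

// Length returns the size in bytes, which can exceed the number of
// characters for non-ASCII text.
func (s stringEncoder) Length() int { return len(s) }

// payloadSize is a hypothetical helper that encodes a key and a value and
// returns the total number of bytes produced.
func payloadSize(key, value encoder) (int, error) {
	k, err := key.Encode()
	if err != nil {
		return 0, err
	}
	v, err := value.Encode()
	if err != nil {
		return 0, err
	}
	return len(k) + len(v), nil
}

func main() {
	// "h\u00e9llo" has 5 characters but 6 bytes in UTF-8.
	n, err := payloadSize(stringEncoder("user-42"), stringEncoder("h\u00e9llo"))
	fmt.Println(n, err)
}
```

With the real library, the equivalent values would be passed as the Key or Value of a ProducerMessage, as the description above states.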

type SyncGroupRequest

type SyncGroupRequest struct {
	GroupId          string
	GenerationId     int32
	MemberId         string
	GroupAssignments map[string][]byte
}

func (*SyncGroupRequest) AddGroupAssignment

func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte)

func (*SyncGroupRequest) AddGroupAssignmentMember

func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error

type SyncGroupResponse

type SyncGroupResponse struct {
	Err              KError
	MemberAssignment []byte
}

func (*SyncGroupResponse) GetMemberAssignment

func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error)

                                                                                                                                                                                                                                                                                                                                                                                                                                                  type SyncProducer

                                                                                                                                                                                                                                                                                                                                                                                                                                                  type SyncProducer interface {
                                                                                                                                                                                                                                                                                                                                                                                                                                                  
                                                                                                                                                                                                                                                                                                                                                                                                                                                  	// SendMessage produces a given message, and returns only when it either has
                                                                                                                                                                                                                                                                                                                                                                                                                                                  	// succeeded or failed to produce. It will return the partition and the offset
                                                                                                                                                                                                                                                                                                                                                                                                                                                  	// of the produced message, or an error if the message failed to produce.
                                                                                                                                                                                                                                                                                                                                                                                                                                                  	SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
                                                                                                                                                                                                                                                                                                                                                                                                                                                  
                                                                                                                                                                                                                                                                                                                                                                                                                                                  	// SendMessages produces a given set of messages, and returns only when all
                                                                                                                                                                                                                                                                                                                                                                                                                                                  	// messages in the set have either succeeded or failed. Note that messages
                                                                                                                                                                                                                                                                                                                                                                                                                                                  	// can succeed and fail individually; if some succeed and some fail,
                                                                                                                                                                                                                                                                                                                                                                                                                                                  	// SendMessages will return an error.
                                                                                                                                                                                                                                                                                                                                                                                                                                                  	SendMessages(msgs []*ProducerMessage) error
                                                                                                                                                                                                                                                                                                                                                                                                                                                  
                                                                                                                                                                                                                                                                                                                                                                                                                                                  	// Close shuts down the producer and waits for any buffered messages to be
                                                                                                                                                                                                                                                                                                                                                                                                                                                  	// flushed. You must call this function before a producer object passes out of
                                                                                                                                                                                                                                                                                                                                                                                                                                                  	// scope, as it may otherwise leak memory. You must call this before calling
                                                                                                                                                                                                                                                                                                                                                                                                                                                  	// Close on the underlying client.
                                                                                                                                                                                                                                                                                                                                                                                                                                                  	Close() error
                                                                                                                                                                                                                                                                                                                                                                                                                                                  }

SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer to avoid leaks; it may not be garbage-collected automatically when it passes out of scope.

The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual durability guarantee provided when a message is acknowledged depends on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.

                                                                                                                                                                                                                                                                                                                                                                                                                                                    For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to be set to true in its configuration.
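The blocking call pattern described above can be sketched without a broker. This is a minimal illustration, not sarama's implementation: `ProducerMessage` is a reduced local stand-in, `memoryProducer` and `publish` are hypothetical names, and the fake always reports partition 0. With the real library, the configuration passed to NewSyncProducer would additionally need `Producer.Return.Successes` set to true, as noted above.

```go
package main

import (
	"errors"
	"fmt"
)

// ProducerMessage is a local stand-in for sarama's type (assumption: only the
// two fields this sketch needs).
type ProducerMessage struct {
	Topic string
	Value string
}

// SyncProducer mirrors the interface documented above.
type SyncProducer interface {
	SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
	SendMessages(msgs []*ProducerMessage) error
	Close() error
}

// memoryProducer is a hypothetical in-memory fake; it is not part of sarama.
type memoryProducer struct {
	next   int64
	closed bool
}

var errClosed = errors.New("producer closed")

// SendMessage "acknowledges" immediately and hands out increasing offsets.
func (p *memoryProducer) SendMessage(msg *ProducerMessage) (int32, int64, error) {
	if p.closed {
		return -1, -1, errClosed
	}
	offset := p.next
	p.next++
	return 0, offset, nil
}

// SendMessages stops at the first failure in this fake.
func (p *memoryProducer) SendMessages(msgs []*ProducerMessage) error {
	for _, m := range msgs {
		if _, _, err := p.SendMessage(m); err != nil {
			return err
		}
	}
	return nil
}

func (p *memoryProducer) Close() error {
	p.closed = true
	return nil
}

// publish sends one message and reports where it was stored.
func publish(p SyncProducer, topic, value string) (string, error) {
	partition, offset, err := p.SendMessage(&ProducerMessage{Topic: topic, Value: value})
	if err != nil {
		return "", fmt.Errorf("send to %s failed: %w", topic, err)
	}
	return fmt.Sprintf("%s/%d@%d", topic, partition, offset), nil
}

func main() {
	var p SyncProducer = &memoryProducer{}
	defer p.Close() // always close the producer to avoid leaks

	for _, v := range []string{"a", "b"} {
		where, err := publish(p, "my_topic", v)
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Println(where)
	}
}
```

Because `publish` depends only on the interface, the same function would accept a real producer in place of the fake.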

                                                                                                                                                                                                                                                                                                                                                                                                                                                    Example

                                                                                                                                                                                                                                                                                                                                                                                                                                                      This example shows the basic usage pattern of the SyncProducer.


                                                                                                                                                                                                                                                                                                                                                                                                                                                      func NewSyncProducer

                                                                                                                                                                                                                                                                                                                                                                                                                                                      func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error)

                                                                                                                                                                                                                                                                                                                                                                                                                                                        NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.

                                                                                                                                                                                                                                                                                                                                                                                                                                                        func NewSyncProducerFromClient

                                                                                                                                                                                                                                                                                                                                                                                                                                                        func NewSyncProducerFromClient(client Client) (SyncProducer, error)

                                                                                                                                                                                                                                                                                                                                                                                                                                                          NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still necessary to call Close() on the underlying client when shutting down this producer.
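Since the producer must be closed before its underlying client, one convenient way to get the order right is to rely on Go's last-in, first-out execution of deferred calls. The sketch below uses a hypothetical `closer` type in place of a real client and producer and only records the order in which Close is invoked.

```go
package main

import "fmt"

// closer is a hypothetical stand-in for a client or producer; it only records
// the order in which Close is called.
type closer struct {
	name string
	log  *[]string
}

func (c closer) Close() error {
	*c.log = append(*c.log, c.name)
	return nil
}

// run defers the client's Close first and the producer's second. Deferred
// calls run in reverse order, so the producer is closed before the client.
func run(log *[]string) {
	client := closer{name: "client", log: log}
	defer client.Close()

	producer := closer{name: "producer", log: log}
	defer producer.Close()

	// Messages would be sent here.
}

// closeOrder returns the names in the order their Close methods ran.
func closeOrder() []string {
	var log []string
	run(&log)
	return log
}

func main() {
	fmt.Println(closeOrder()) // prints "[producer client]"
}
```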

                                                                                                                                                                                                                                                                                                                                                                                                                                                          type TestReporter

                                                                                                                                                                                                                                                                                                                                                                                                                                                          type TestReporter interface {
                                                                                                                                                                                                                                                                                                                                                                                                                                                          	Error(...interface{})
                                                                                                                                                                                                                                                                                                                                                                                                                                                          	Errorf(string, ...interface{})
                                                                                                                                                                                                                                                                                                                                                                                                                                                          	Fatal(...interface{})
                                                                                                                                                                                                                                                                                                                                                                                                                                                          	Fatalf(string, ...interface{})
                                                                                                                                                                                                                                                                                                                                                                                                                                                          }

TestReporter has methods matching Go's testing.T to avoid importing `testing` in the main part of the library.
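Any type with these four methods satisfies the interface, so a `*testing.T` can be passed wherever a TestReporter is expected, and so can a custom reporter. As a rough sketch, the hypothetical `recordingReporter` below collects messages instead of failing a test; unlike testing.T, its Fatal methods do not stop execution. The `checkOffset` helper is likewise an assumption made for illustration.

```go
package main

import "fmt"

// TestReporter mirrors the interface documented above.
type TestReporter interface {
	Error(...interface{})
	Errorf(string, ...interface{})
	Fatal(...interface{})
	Fatalf(string, ...interface{})
}

// recordingReporter is a hypothetical implementation that stores every
// reported message in memory.
type recordingReporter struct {
	messages []string
}

func (r *recordingReporter) Error(args ...interface{}) {
	r.messages = append(r.messages, fmt.Sprint(args...))
}

func (r *recordingReporter) Errorf(format string, args ...interface{}) {
	r.messages = append(r.messages, fmt.Sprintf(format, args...))
}

// Fatal and Fatalf only record here; they do not abort the caller.
func (r *recordingReporter) Fatal(args ...interface{}) { r.Error(args...) }

func (r *recordingReporter) Fatalf(format string, args ...interface{}) {
	r.Errorf(format, args...)
}

// checkOffset is a hypothetical helper that reports a mismatch through the
// interface, so it needs no import of the testing package.
func checkOffset(t TestReporter, got, want int64) {
	if got != want {
		t.Errorf("offset = %d, want %d", got, want)
	}
}

func main() {
	r := &recordingReporter{}
	checkOffset(r, 3, 3) // matches, nothing recorded
	checkOffset(r, 4, 5) // mismatch, one message recorded
	fmt.Println(r.messages)
}
```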

                                                                                                                                                                                                                                                                                                                                                                                                                                                            type Timestamp

                                                                                                                                                                                                                                                                                                                                                                                                                                                            type Timestamp struct {
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	*time.Time
                                                                                                                                                                                                                                                                                                                                                                                                                                                            }

                                                                                                                                                                                                                                                                                                                                                                                                                                                            type TopicDetail

                                                                                                                                                                                                                                                                                                                                                                                                                                                            type TopicDetail struct {
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	NumPartitions     int32
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	ReplicationFactor int16
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	ReplicaAssignment map[int32][]int32
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	ConfigEntries     map[string]*string
                                                                                                                                                                                                                                                                                                                                                                                                                                                            }
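Because ConfigEntries maps to `*string` rather than `string`, string literals cannot be assigned directly. A small helper can bridge the gap; the sketch below redeclares the struct locally, and `strPtr`, `newTopicDetail`, and the sample `retention.ms` value are assumptions chosen for illustration.

```go
package main

import "fmt"

// TopicDetail mirrors the struct documented above.
type TopicDetail struct {
	NumPartitions     int32
	ReplicationFactor int16
	ReplicaAssignment map[int32][]int32
	ConfigEntries     map[string]*string
}

// strPtr returns a pointer to a copy of s.
func strPtr(s string) *string { return &s }

// newTopicDetail is a hypothetical constructor that converts plain string
// settings into the pointer-valued map the struct expects.
func newTopicDetail(partitions int32, replication int16, config map[string]string) *TopicDetail {
	entries := make(map[string]*string, len(config))
	for k, v := range config {
		entries[k] = strPtr(v)
	}
	return &TopicDetail{
		NumPartitions:     partitions,
		ReplicationFactor: replication,
		ConfigEntries:     entries,
	}
}

func main() {
	d := newTopicDetail(3, 2, map[string]string{"retention.ms": "86400000"})
	fmt.Println(d.NumPartitions, d.ReplicationFactor, *d.ConfigEntries["retention.ms"])
}
```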

                                                                                                                                                                                                                                                                                                                                                                                                                                                            type TopicError

                                                                                                                                                                                                                                                                                                                                                                                                                                                            type TopicError struct {
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	Err    KError
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	ErrMsg *string
                                                                                                                                                                                                                                                                                                                                                                                                                                                            }

                                                                                                                                                                                                                                                                                                                                                                                                                                                            func (*TopicError) Error

                                                                                                                                                                                                                                                                                                                                                                                                                                                            func (t *TopicError) Error() string

                                                                                                                                                                                                                                                                                                                                                                                                                                                            type TopicMetadata

                                                                                                                                                                                                                                                                                                                                                                                                                                                            type TopicMetadata struct {
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	Err        KError
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	Name       string
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	IsInternal bool // Only valid for Version >= 1
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	Partitions []*PartitionMetadata
                                                                                                                                                                                                                                                                                                                                                                                                                                                            }
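As a rough illustration of how the fields above might be read together, the sketch below walks a list of topic metadata entries, skips internal topics, and separates entries that carry an error. The lowercase types are local stand-ins written for this example, not Sarama's own; treating a zero `Err` as "no error" is an assumption, and the `summarize` helper is hypothetical.

```go
package main

import "fmt"

// kError is a local stand-in for sarama's KError. It is assumed here to be
// a numeric error code where 0 means "no error".
type kError int16

// partitionMetadata and topicMetadata are simplified local mirrors of the
// structs listed above; only the fields used below are included.
type partitionMetadata struct {
	ID int32
}

type topicMetadata struct {
	Err        kError
	Name       string
	IsInternal bool
	Partitions []*partitionMetadata
}

// summarize (a hypothetical helper) returns the partition count of every
// healthy, non-internal topic, plus the names of topics that carry an error.
func summarize(topics []*topicMetadata) (map[string]int, []string) {
	counts := make(map[string]int)
	var failed []string
	for _, t := range topics {
		if t.Err != 0 {
			failed = append(failed, t.Name)
			continue
		}
		if t.IsInternal {
			continue
		}
		counts[t.Name] = len(t.Partitions)
	}
	return counts, failed
}

func main() {
	topics := []*topicMetadata{
		{Name: "orders", Partitions: []*partitionMetadata{{ID: 0}, {ID: 1}}},
		{Name: "__consumer_offsets", IsInternal: true},
		{Name: "missing", Err: 3}, // 3 is an arbitrary non-zero code for this example
	}
	counts, failed := summarize(topics)
	fmt.Println(counts, failed)
}
```

Checking `Err` before anything else reflects the assumption that the other fields of a failed entry may not be meaningful.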

                                                                                                                                                                                                                                                                                                                                                                                                                                                            type TopicPartition

                                                                                                                                                                                                                                                                                                                                                                                                                                                            type TopicPartition struct {
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	Count      int32
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	Assignment [][]int32
                                                                                                                                                                                                                                                                                                                                                                                                                                                            }
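The struct above looks like the per-topic payload of a partition-count increase, as in Kafka's CreatePartitions request. Under that reading, which is an assumption here, `Count` is the new total number of partitions and `Assignment` optionally lists the replica broker IDs for each added partition. The sketch below checks a value against those assumed rules using a local stand-in type and a hypothetical `validate` helper.

```go
package main

import (
	"errors"
	"fmt"
)

// topicPartition is a local stand-in mirroring the fields listed above.
type topicPartition struct {
	Count      int32
	Assignment [][]int32
}

// validate (hypothetical) checks the assumed rules: Count must exceed the
// current partition count, and a non-nil Assignment must contain exactly
// one non-empty replica list per added partition.
func validate(tp topicPartition, current int32) error {
	if tp.Count <= current {
		return errors.New("count must exceed the current partition count")
	}
	if tp.Assignment == nil {
		return nil // assumed: the broker picks replicas itself
	}
	added := int(tp.Count - current)
	if len(tp.Assignment) != added {
		return fmt.Errorf("expected %d assignments, got %d", added, len(tp.Assignment))
	}
	for i, replicas := range tp.Assignment {
		if len(replicas) == 0 {
			return fmt.Errorf("assignment %d has no replicas", i)
		}
	}
	return nil
}

func main() {
	// Grow a topic from 2 to 4 partitions, placing each new partition
	// on two brokers.
	tp := topicPartition{Count: 4, Assignment: [][]int32{{1, 2}, {2, 3}}}
	fmt.Println(validate(tp, 2))
}
```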

                                                                                                                                                                                                                                                                                                                                                                                                                                                            type TopicPartitionError

                                                                                                                                                                                                                                                                                                                                                                                                                                                            type TopicPartitionError struct {
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	Err    KError
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	ErrMsg *string
                                                                                                                                                                                                                                                                                                                                                                                                                                                            }

                                                                                                                                                                                                                                                                                                                                                                                                                                                            func (*TopicPartitionError) Error

                                                                                                                                                                                                                                                                                                                                                                                                                                                            func (t *TopicPartitionError) Error() string
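Because the type has an `Error() string` method on its pointer receiver, a `*TopicPartitionError` can be used wherever a Go `error` is expected. The sketch below shows one plausible way such a method could combine an error code with an optional message. The types are local stand-ins, and the message format is an assumption rather than Sarama's documented output.

```go
package main

import "fmt"

// kError is a local stand-in for sarama's KError; the text it produces
// is made up for this example.
type kError int16

func (e kError) Error() string {
	return fmt.Sprintf("kafka error code %d", int16(e))
}

// topicPartitionError mirrors the fields listed above.
type topicPartitionError struct {
	Err    kError
	ErrMsg *string // nil when no extra message was supplied
}

// Error appends the optional message to the base error text.
func (t *topicPartitionError) Error() string {
	text := t.Err.Error()
	if t.ErrMsg != nil {
		text = fmt.Sprintf("%s - %s", text, *t.ErrMsg)
	}
	return text
}

func main() {
	msg := "replication factor too large"
	var err error = &topicPartitionError{Err: 38, ErrMsg: &msg}
	fmt.Println(err)

	err = &topicPartitionError{Err: 38}
	fmt.Println(err)
}
```

Keeping `ErrMsg` as a pointer lets the code tell "no message" apart from an empty message, which is why the nil check comes before dereferencing.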

                                                                                                                                                                                                                                                                                                                                                                                                                                                            type TxnOffsetCommitRequest

                                                                                                                                                                                                                                                                                                                                                                                                                                                            type TxnOffsetCommitRequest struct {
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	TransactionalID string
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	GroupID         string
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	ProducerID      int64
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	ProducerEpoch   int16
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	Topics          map[string][]*PartitionOffsetMetadata
                                                                                                                                                                                                                                                                                                                                                                                                                                                            }
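The `Topics` field groups per-partition offsets by topic name. A minimal sketch of building such a map from processed messages follows. The `partitionOffsetMetadata` fields are an assumption about the shape of Sarama's `PartitionOffsetMetadata`, and the `consumed` type and `buildTopics` helper are hypothetical. The offset stored is the highest processed offset plus one, following the usual Kafka convention that a committed offset names the next message to read.

```go
package main

import "fmt"

// partitionOffsetMetadata is a local stand-in; its fields are assumed.
type partitionOffsetMetadata struct {
	Partition int32
	Offset    int64
	Metadata  *string
}

// consumed is a hypothetical record of one processed message.
type consumed struct {
	Topic     string
	Partition int32
	Offset    int64
}

// buildTopics (hypothetical) keeps the highest processed offset per
// topic and partition and stores offset+1 as the position to commit.
func buildTopics(msgs []consumed) map[string][]*partitionOffsetMetadata {
	type key struct {
		topic     string
		partition int32
	}
	next := make(map[key]int64)
	for _, m := range msgs {
		k := key{m.Topic, m.Partition}
		if o := m.Offset + 1; o > next[k] {
			next[k] = o
		}
	}
	topics := make(map[string][]*partitionOffsetMetadata)
	for k, o := range next {
		topics[k.topic] = append(topics[k.topic], &partitionOffsetMetadata{
			Partition: k.partition,
			Offset:    o,
		})
	}
	return topics
}

func main() {
	topics := buildTopics([]consumed{
		{Topic: "orders", Partition: 0, Offset: 41},
		{Topic: "orders", Partition: 0, Offset: 42},
		{Topic: "orders", Partition: 1, Offset: 7},
	})
	for name, parts := range topics {
		for _, p := range parts {
			fmt.Println(name, p.Partition, p.Offset)
		}
	}
}
```

Map iteration order in Go is not fixed, so the order of entries within each topic's slice can vary between runs.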

                                                                                                                                                                                                                                                                                                                                                                                                                                                            type TxnOffsetCommitResponse

                                                                                                                                                                                                                                                                                                                                                                                                                                                            type TxnOffsetCommitResponse struct {
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	ThrottleTime time.Duration
                                                                                                                                                                                                                                                                                                                                                                                                                                                            	Topics       map[string][]*PartitionError
                                                                                                                                                                                                                                                                                                                                                                                                                                                            }

                                                                                                                                                                                                                                                                                                                                                                                                                                                            Directories

                                                                                                                                                                                                                                                                                                                                                                                                                                                            Path Synopsis
                                                                                                                                                                                                                                                                                                                                                                                                                                                            examples
                                                                                                                                                                                                                                                                                                                                                                                                                                                            interceptors Module
mocks Package mocks provides mocks that can be used for testing applications that use Sarama.
                                                                                                                                                                                                                                                                                                                                                                                                                                                            tools
                                                                                                                                                                                                                                                                                                                                                                                                                                                            tls